Browse Source

barrier for workers when added/removed/modified in/from a context

Andra Hugo 14 years ago
parent
commit
1b9bb42099

+ 5 - 5
examples/cholesky_and_lu/cholesky_and_lu.c

@@ -79,15 +79,15 @@ void cholesky_vs_cholesky(params *p1, params *p2, params *p3,
   for(i = gpu; i < gpu + gpu1; i++)
     {
       procs[k++] = i;
-      //      printf("%d ", i);
+      //printf("%d ", i);
     }
 
   for(i = 3; i < 3 + cpu1; i++)
     {
       procs[k++] = i;
-      //  printf("%d ", i);
+      //printf("%d ", i);
     }
-  //  printf("\n");
+  //printf("\n");
 
   p1->ctx = starpu_create_sched_ctx("heft", procs, ncpus1, "cholesky1");
   p2->the_other_ctx = (int)p1->ctx;
@@ -99,7 +99,7 @@ void cholesky_vs_cholesky(params *p1, params *p2, params *p3,
 
   for(i = 0; i < gpu; i++){
     procs2[k++] = i;
-    //printf("%d ", i);
+    //    printf("%d ", i);
   }
 
   for(i = gpu + gpu1; i < gpu + gpu1 + gpu2; i++){
@@ -112,7 +112,7 @@ void cholesky_vs_cholesky(params *p1, params *p2, params *p3,
     //    printf("%d ", i);
   }
 
-  //   printf("\n");
+  //  printf("\n");
 
   p2->ctx = starpu_create_sched_ctx("heft", procs2, ncpus2, "cholesky2");
   p1->the_other_ctx = (int)p2->ctx;

+ 7 - 0
src/common/barrier_counter.c

@@ -23,6 +23,13 @@ int _starpu_barrier_counter_init(struct _starpu_barrier_counter_t *barrier_c, in
 	return 0;
 }
 
+int _starpu_barrier_counter_update(struct _starpu_barrier_counter_t *barrier_c, int count)
+{ /* Reset the barrier in place: new target count, reached counter back to zero; always returns 0. NOTE(review): no lock is taken here -- confirm callers guarantee nobody is concurrently using this barrier. */
+	barrier_c->barrier.count = count; /* number of participants the barrier now expects */
+	barrier_c->barrier.reached = 0; /* restart counting from zero */
+	return 0;
+}
+
 int _starpu_barrier_counter_destroy(struct _starpu_barrier_counter_t *barrier_c)
 {
 	_starpu_barrier_destroy(&barrier_c->barrier);

+ 2 - 0
src/common/barrier_counter.h

@@ -23,6 +23,8 @@ struct _starpu_barrier_counter_t {
 
 int _starpu_barrier_counter_init(struct _starpu_barrier_counter_t *barrier_c, int count);
 
+int _starpu_barrier_counter_update(struct _starpu_barrier_counter_t *barrier_c, int count);
+
 int _starpu_barrier_counter_destroy(struct _starpu_barrier_counter_t *barrier_c);
 
 int _starpu_barrier_counter_wait_for_empty_counter(struct _starpu_barrier_counter_t *barrier_c);

+ 68 - 37
src/core/sched_ctx.c

@@ -1,12 +1,12 @@
 #include <core/sched_policy.h>
 #include <core/sched_ctx.h>
 
-static struct _starpu_barrier_counter_t workers_barrier;
+static struct _starpu_barrier_counter_t workers_barrier[STARPU_NMAX_SCHED_CTXS];
 
 static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config);
 static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker);
 static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkerids_in_ctx);
-int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx);
+int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx, unsigned sched_ctx_id);
 
 unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 				  int nworkerids_in_ctx, unsigned is_initial_sched,
@@ -15,9 +15,11 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
 	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS - 1);
 
-	struct starpu_sched_ctx *sched_ctx = &config->sched_ctxs[config->topology.nsched_ctxs];
+	unsigned id = _starpu_get_first_available_sched_ctx_id(config);
 
-	sched_ctx->sched_ctx_id = _starpu_get_first_available_sched_ctx_id(config);
+	struct starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
+
+	sched_ctx->sched_ctx_id = id;
 
 	int nworkers = config->topology.nworkers;
 	
@@ -34,7 +36,7 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 
 	int j;
-	/*all the workers are in this contex*/
+	/* if NULL, add all the workers to the context */
 	if(workerids_in_ctx == NULL)
 	  {
 		for(j = 0; j < nworkers; j++)
@@ -75,16 +77,21 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 			    int nworkerids_in_ctx, const char *sched_name)
 {
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+
+	unsigned id = _starpu_get_first_available_sched_ctx_id(config);
+
 	unsigned ret;
 	/* block the workers until the contex is switched */
-	set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+	set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx, id);
 	ret = _starpu_create_sched_ctx(policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
 	/* also wait the workers to wake up before using the context */
-	set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
+	set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx, id);
 	
 	return ret;
 }
 
+/* check if the worker already belongs to the context */
 static unsigned _starpu_worker_belongs_to_ctx(int workerid, struct starpu_sched_ctx *sched_ctx)
 {
 	int i;
@@ -94,6 +101,7 @@ static unsigned _starpu_worker_belongs_to_ctx(int workerid, struct starpu_sched_
 	return 0;
 }
 
+/* remove the context from the worker's list of contexts */
 static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
 {
 	unsigned i;
@@ -111,6 +119,7 @@ static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workera
 	return;
 }
 
+/* free all structures for the context */
 static void _starpu_manage_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 {
 	int nworkers = sched_ctx->nworkers_in_ctx;
@@ -156,18 +165,17 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
                                 workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
                           }
                   }
-		//                sched_ctx->nworkers_in_ctx = ntotal_workers;
 		n_added_workers = ntotal_workers;
           }
         else
           {
                 int i;
-		//		printf("%d redim worker:", nnew_workers);
+		//printf("%d redim worker:", nnew_workers);
                 for(i = 0; i < nnew_workers; i++)
                   {
-                        /*take care the user does not ask for a resource that does not exist*/
+                        /* take care the user does not ask for a resource that does not exist */
                         STARPU_ASSERT( new_workers[i] >= 0 &&  new_workers[i] <= ntotal_workers);
-			//	printf(" %d", new_workers[i]);
+			//printf(" %d", new_workers[i]);
 
 			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(new_workers[i]);
 			if(!_starpu_worker_belongs_to_ctx(new_workers[i], sched_ctx))
@@ -179,8 +187,7 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
 			    n_added_workers++;
 			  }
                   }
-		//                sched_ctx->nworkers_in_ctx += n_added_workers;
-		//		printf("\n");
+	       	//printf("\n");
           }
 
         sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, n_added_workers);
@@ -197,7 +204,7 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 		struct starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx(inheritor_sched_ctx_id);
 
 		/* block the workers until the contex is switched */
-		set_changing_ctx_flag(STATUS_CHANGING_CTX, sched_ctx->nworkers_in_ctx, sched_ctx->workerid);
+		set_changing_ctx_flag(STATUS_CHANGING_CTX, sched_ctx->nworkers_in_ctx, sched_ctx->workerid, sched_ctx_id);
 		_starpu_manage_delete_sched_ctx(sched_ctx);
 
 		/*if both of them have all the ressources is pointless*/
@@ -208,7 +215,7 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 		if(!(sched_ctx->nworkers_in_ctx == ntotal_workers && sched_ctx->nworkers_in_ctx == inheritor_sched_ctx->nworkers_in_ctx))
 		  _starpu_add_workers_to_sched_ctx(sched_ctx->workerid, sched_ctx->nworkers_in_ctx, inheritor_sched_ctx);
 		/* also wait the workers to wake up before using the context */
-		set_changing_ctx_flag(STATUS_UNKNOWN, sched_ctx->nworkers_in_ctx, sched_ctx->workerid);
+		set_changing_ctx_flag(STATUS_UNKNOWN, sched_ctx->nworkers_in_ctx, sched_ctx->workerid, sched_ctx_id);
 	  }		
 	return;	
 }
@@ -236,10 +243,10 @@ void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ct
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 
 	/* block the workers until the contex is switched */
-	set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+	set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
 	_starpu_add_workers_to_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
 	/* also wait the workers to wake up before using the context */
-	set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
+	set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
 
 	return;
 }
@@ -288,6 +295,8 @@ static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nwo
 		  }
 
 		sched_ctx->nworkers_in_ctx -= nworkerids_in_ctx;
+		/* reorder the worker's list of contexts in order to avoid 
+		   the holes in the list after removing some elements */
 		_starpu_rearange_sched_ctx_workerids(sched_ctx, nworkerids_already_in_ctx);
 	  }
 
@@ -304,12 +313,12 @@ void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 
 		/* block the workers until the contex is switched */
-		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
+		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
 		PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
 		_starpu_remove_workers_from_sched_ctx(workerids_in_ctx, nworkerids_in_ctx, sched_ctx);
 		PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 		/* also wait the workers to wake up before using the context */
-		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
+		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx, sched_ctx_id);
 	  }
 	return;
 }
@@ -319,7 +328,10 @@ void _starpu_init_all_sched_ctx(struct starpu_machine_config_s *config)
 {
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+	  {
 		config->sched_ctxs[i].sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
+		_starpu_barrier_counter_init(&workers_barrier[i], 0);
+	  }
 	return;
 }
 
@@ -330,7 +342,10 @@ void _starpu_init_sched_ctx_for_worker(unsigned workerid)
 	worker->sched_ctx = (struct starpu_sched_ctx**)malloc(STARPU_NMAX_SCHED_CTXS * sizeof(struct starpu_sched_ctx*));
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+	  {
 		worker->sched_ctx[i] = NULL;
+		worker->workers_barrier[i] = NULL;
+	  }
 	return;
 }
 
@@ -391,7 +406,7 @@ static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_
 }
 
 /* manage blocking and waking up threads when constructing/modifying contexts */
-int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx)
+int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx, unsigned sched_ctx_id)
 {
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 
@@ -403,10 +418,9 @@ int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_c
 	pthread_cond_t *changing_ctx_cond = NULL;
 	
 	int workerid = -1;
-
 	if(changing_ctx == STATUS_CHANGING_CTX)
-	  _starpu_barrier_counter_init(&workers_barrier, nworkers);
-	
+		_starpu_barrier_counter_update(&workers_barrier[sched_ctx_id], nworkers);
+
 	for(i = 0; i < nworkers; i++)
 	  {
 		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
@@ -415,6 +429,11 @@ int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_c
 		changing_ctx_mutex = &worker->changing_ctx_mutex;
 		changing_ctx_cond = &worker->changing_ctx_cond;
 		
+		if(changing_ctx == STATUS_CHANGING_CTX)
+		  {
+			worker->workers_barrier[sched_ctx_id] = &workers_barrier[sched_ctx_id];
+		  }
+
 		/*if the status is CHANGING_CTX let the thread know that it must block*/
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 		worker->blocking_status = changing_ctx;
@@ -427,20 +446,25 @@ int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_c
 			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 			PTHREAD_COND_SIGNAL(changing_ctx_cond);
 			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		  }
-
+		  }       
 	  }
 	
 	/* after letting know all the concerned threads about the change
 	   wait for them to take into account the info */
 	if(changing_ctx == STATUS_CHANGING_CTX)
-		_starpu_wait_for_all_threads_to_block();
+		_starpu_wait_for_all_threads_to_block(&workers_barrier[sched_ctx_id]);
 	else
+		_starpu_wait_for_all_threads_to_wake_up(&workers_barrier[sched_ctx_id]);
+
+	if(changing_ctx == STATUS_UNKNOWN)
 	  {
-		_starpu_wait_for_all_threads_to_wake_up();
-		_starpu_barrier_counter_destroy(&workers_barrier);
+		for(i = 0; i < nworkers; i++)
+		  {
+			 workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
+			 worker = _starpu_get_worker_struct(workerid);
+			 worker->workers_barrier[sched_ctx_id] = NULL;
+		  }
 	  }
-
 	return 0;
 }
 
@@ -494,24 +518,31 @@ void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
 	return;
 }
 
-void _starpu_decrement_nblocked_ths(void)
+void _starpu_decrement_nblocked_ths(struct _starpu_barrier_counter_t **barrier)
 {
-	_starpu_barrier_counter_decrement_until_empty_counter(&workers_barrier);
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		if(barrier[i] != NULL)
+			_starpu_barrier_counter_decrement_until_empty_counter(barrier[i]);
+	  
 }
 
-void _starpu_increment_nblocked_ths(void)
+void _starpu_increment_nblocked_ths(struct _starpu_barrier_counter_t **barrier)
 {
-	_starpu_barrier_counter_increment_until_full_counter(&workers_barrier);
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		if(barrier[i] != NULL)
+			_starpu_barrier_counter_increment_until_full_counter(barrier[i]);
 }
 
-int _starpu_wait_for_all_threads_to_block(void)
+int _starpu_wait_for_all_threads_to_block(struct _starpu_barrier_counter_t *barrier)
 {
-	return _starpu_barrier_counter_wait_for_full_counter(&workers_barrier);
+	return _starpu_barrier_counter_wait_for_full_counter(barrier);
 }
 
-int _starpu_wait_for_all_threads_to_wake_up(void)
+int _starpu_wait_for_all_threads_to_wake_up(struct _starpu_barrier_counter_t *barrier)
 {
-	return _starpu_barrier_counter_wait_for_empty_counter(&workers_barrier);
+	return _starpu_barrier_counter_wait_for_empty_counter(barrier);
 }
 
 int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)

+ 4 - 4
src/core/sched_ctx.h

@@ -75,10 +75,10 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerid, int nw
 void _starpu_delete_all_sched_ctxs();
 
 /* Workers are blocked when constructing or modifying a context */
-void _starpu_increment_nblocked_ths(void);
-void _starpu_decrement_nblocked_ths(void);
-int _starpu_wait_for_all_threads_to_block(void);
-int _starpu_wait_for_all_threads_to_wake_up(void);
+void _starpu_increment_nblocked_ths(struct _starpu_barrier_counter_t **barrier);
+  void _starpu_decrement_nblocked_ths(struct _starpu_barrier_counter_t **barrier);
+int _starpu_wait_for_all_threads_to_block(struct _starpu_barrier_counter_t *workers_barrier);
+int _starpu_wait_for_all_threads_to_wake_up(struct _starpu_barrier_counter_t *workers_barrier);
 
 /* Keeps track of the number of tasks currently submitted to a worker */
 void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid);

+ 3 - 0
src/core/workers.h

@@ -87,6 +87,9 @@ struct starpu_worker_s {
 	int nworkers_of_next_ctx;
 
 	struct _starpu_barrier_counter_t tasks_barrier; /* wait for the tasks submitted */
+
+	/* block workers to update their affiliation to a context */
+	struct _starpu_barrier_counter_t *workers_barrier[STARPU_NMAX_SCHED_CTXS];
        
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;

+ 2 - 2
src/drivers/cpu/driver_cpu.c

@@ -162,9 +162,9 @@ void *_starpu_cpu_worker(void *arg)
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 
 		if(cpu_arg->blocking_status == STATUS_CHANGING_CTX){
-			_starpu_increment_nblocked_ths();
+			_starpu_increment_nblocked_ths(cpu_arg->workers_barrier);
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
-			_starpu_decrement_nblocked_ths();
+			_starpu_decrement_nblocked_ths(cpu_arg->workers_barrier);
 		}
 
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);

+ 2 - 2
src/drivers/cuda/driver_cuda.c

@@ -289,9 +289,9 @@ void *_starpu_cuda_worker(void *arg)
                 PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 
                 if(args->blocking_status == STATUS_CHANGING_CTX){
-			_starpu_increment_nblocked_ths();
+			_starpu_increment_nblocked_ths(args->workers_barrier);
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
-			_starpu_decrement_nblocked_ths();
+			_starpu_decrement_nblocked_ths(args->workers_barrier);
                 }
 
                 PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);

+ 5 - 1
src/sched_policies/heft.c

@@ -64,7 +64,11 @@ static void heft_init_for_workers(unsigned sched_ctx_id, int nnew_workers)
 	    PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
 	    PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
 	  }
-	sched_ctx->nworkers_in_ctx = all_workers;
+
+	/* take into account the new number of threads at the next push */
+	PTHREAD_MUTEX_LOCK(&sched_ctx->changing_ctx_mutex);
+	sched_ctx->temp_nworkers_in_ctx = all_workers;
+	PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
 }
 static void heft_init(unsigned sched_ctx_id)
 {