Andra Hugo лет назад: 14
Родитель
Сommit
c2d309d071
6 измененных файлов с 246 добавлено и 285 удалено
  1. 1 0
      src/core/jobs.c
  2. 3 1
      src/core/perfmodel/perfmodel_history.c
  3. 221 264
      src/core/sched_ctx.c
  4. 17 10
      src/core/sched_ctx.h
  5. 2 5
      src/core/workers.c
  6. 2 5
      src/core/workers.h

+ 1 - 0
src/core/jobs.c

@@ -227,6 +227,7 @@ void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_lock
 		/* We reuse the same job structure */
 		int ret = _starpu_submit_job(j, 1);
 		STARPU_ASSERT(!ret);
+		printf("did not decrement\n");
 	}	
 	else {
 		_starpu_decrement_nsubmitted_tasks();

+ 3 - 1
src/core/perfmodel/perfmodel_history.c

@@ -662,11 +662,13 @@ double _starpu_history_based_job_expected_perf(struct starpu_perfmodel_t *model,
 	exp = entry?entry->mean:-1.0;
 
 	if (entry && entry->nsample < STARPU_CALIBRATION_MINIMUM)
+	  {
 		/* TODO: report differently if we've scheduled really enough
 		 * of that task and the scheduler should perhaps put it aside */
 		/* Not calibrated enough */
 		return -1.0;
-	  
+	  }
+
 	return exp;
 }
 

+ 221 - 264
src/core/sched_ctx.c

@@ -1,46 +1,12 @@
-#include <core/sched_ctx.h>
-#include <common/config.h>
-#include <common/utils.h>
 #include <core/sched_policy.h>
-#include <profiling/profiling.h>
+#include <core/sched_ctx.h>
 
-static pthread_cond_t blocking_ths_cond = PTHREAD_COND_INITIALIZER;
-static pthread_cond_t wakeup_ths_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
-static int nblocked_ths = 0;
+static struct _starpu_barrier_counter_t workers_barrier;
 
-void _starpu_init_sched_ctx(struct starpu_machine_config_s *config)
-{
-  /* unused sched_ctx have the id STARPU_NMAX_SCHED_CTXS */
-  int i;
-  for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-    config->sched_ctxs[i].sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
-}
-
-void _starpu_init_sched_ctx_for_worker(unsigned workerid)
-{
-  struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-  worker->sched_ctx = (struct starpu_sched_ctx**)malloc(STARPU_NMAX_SCHED_CTXS * sizeof(struct starpu_sched_ctx*));
-  int i;
-  for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-    worker->sched_ctx[i] = NULL;
-}
-
-static unsigned _starpu_get_first_free_sched_ctx(struct starpu_machine_config_s *config){
-  unsigned i;
-  for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-    if(config->sched_ctxs[i].sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
-      return i;
-  return STARPU_NMAX_SCHED_CTXS;
-}
-
-static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker){
-  unsigned i;
-  for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
-    if(worker->sched_ctx[i] == NULL)
-      return i;
-  return -1;
-}
+static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config);
+static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker);
+static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkerids_in_ctx);
+int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx);
 
 unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 			     int nworkerids_in_ctx, unsigned is_initial_sched,
@@ -51,7 +17,7 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 
 	struct starpu_sched_ctx *sched_ctx = &config->sched_ctxs[config->topology.nsched_ctxs];
 
-	sched_ctx->sched_ctx_id = _starpu_get_first_free_sched_ctx(config);
+	sched_ctx->sched_ctx_id = _starpu_get_first_available_sched_ctx_id(config);
 
 	int nworkers = config->topology.nworkers;
 	
@@ -62,9 +28,7 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 	sched_ctx->is_initial_sched = is_initial_sched;
 	sched_ctx->sched_name = sched_name;
 
-	PTHREAD_COND_INIT(&sched_ctx->submitted_cond, NULL);
-	PTHREAD_MUTEX_INIT(&sched_ctx->submitted_mutex, NULL);
-	sched_ctx->nsubmitted = 0;
+	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 
 	int j;
 	/*all the workers are in this contex*/
@@ -83,21 +47,16 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
 		int i;
 		for(i = 0; i < nworkerids_in_ctx; i++)
 		  {
-			/*take care the user does not ask for a resource that does not exist*/
+			/* the user should not ask for a resource that does not exist */
 			STARPU_ASSERT( workerids_in_ctx[i] >= 0 &&  workerids_in_ctx[i] <= nworkers);
 		    
 			sched_ctx->workerid[i] = workerids_in_ctx[i];
-			for(j = 0; j < nworkers; j++)
-			  {
-				if(sched_ctx->workerid[i] == j)
-				  {
-					struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
-					workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
-				  }
-			  }
+			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(sched_ctx->workerid[i]);
+			workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
 		  }
 	  }
 
+	/* initialise all sync structures because the number of workers can change */
 	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(STARPU_NMAXWORKERS* sizeof(pthread_mutex_t*));
 	sched_ctx->sched_cond = (pthread_cond_t**)malloc(STARPU_NMAXWORKERS *sizeof(pthread_cond_t*));
 
@@ -109,95 +68,6 @@ unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx
  	return sched_ctx->sched_ctx_id;
 }
 
-void _starpu_decrement_nblocked_ths(void)
-{
-	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
-
-	if(--nblocked_ths == 0)
-		PTHREAD_COND_BROADCAST(&wakeup_ths_cond);
-
-	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
-}
-
-void _starpu_increment_nblocked_ths(int nworkers)
-{
-	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
-	if (++nblocked_ths == nworkers)
-		PTHREAD_COND_BROADCAST(&blocking_ths_cond);
-
-	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
-}
-
-static int _starpu_wait_for_all_threads_to_block(int nworkers)
-{
-	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
-
-	while (nblocked_ths < nworkers)
-		PTHREAD_COND_WAIT(&blocking_ths_cond, &blocking_ths_mutex);
-
-	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
-	
-	return 0;
-}
-
-static int _starpu_wait_for_all_threads_to_wake_up(void)
-{
-	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
-	
-	while (nblocked_ths > 0)
-		PTHREAD_COND_WAIT(&wakeup_ths_cond, &blocking_ths_mutex);
-
-	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
-	
-	return 0;
-}
-
-static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx)
-{
-	struct starpu_machine_config_s *config = _starpu_get_machine_config();
-
-	int i;
-	int nworkers = nworkerids_in_ctx == -1 ? (int)config->topology.nworkers : nworkerids_in_ctx;
-  
-	struct starpu_worker_s *worker = NULL;
-	pthread_mutex_t *changing_ctx_mutex = NULL;
-	pthread_cond_t *changing_ctx_cond = NULL;
-	
-	int workerid = -1;
-
-	for(i = 0; i < nworkers; i++)
-	  {
-		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
-		worker = _starpu_get_worker_struct(workerid);
-
-		changing_ctx_mutex = &worker->changing_ctx_mutex;
-		changing_ctx_cond = &worker->changing_ctx_cond;
-		
-		/*if the status is CHANGING_CTX let the thread know that it must block*/
-		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-		worker->blocking_status = changing_ctx;
-		worker->nworkers_of_next_ctx = nworkers;
-		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-
-		/*if we have finished changing the ctx wake up the blocked threads*/
-		if(changing_ctx == STATUS_UNKNOWN)
-		  {
-			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-			PTHREAD_COND_SIGNAL(changing_ctx_cond);
-			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		  }
-
-	  }
-	
-	/*after letting know all the concerned threads about the change                        
-	  wait for them to take into account the info*/
-	if(changing_ctx == STATUS_CHANGING_CTX)
-		_starpu_wait_for_all_threads_to_block(nworkers);
-	else
-		_starpu_wait_for_all_threads_to_wake_up();
-
-  return 0;
-}
 
 unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx, 
 			    int nworkerids_in_ctx, const char *sched_name)
@@ -224,21 +94,17 @@ static unsigned _starpu_worker_belongs_to_ctx(int workerid, struct starpu_sched_
 static void _starpu_remove_sched_ctx_from_worker(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
 {
 	unsigned i;
-	unsigned to_remove = 0;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	  {
 		if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
 		    && workerarg->status != STATUS_JOINED)
 		  {
 			workerarg->sched_ctx[i] = NULL;
-			to_remove = 1;
+			workerarg->nctxs--;
 			break;
 		  }
 	  }
 	
-	/* if the the worker had belonged to the context it would have been found in the worker's list of sched_ctxs, so it can be removed */
-	if(to_remove)
-		workerarg->nctxs--;
 	return;
 }
 
@@ -288,15 +154,19 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
                                 workerarg->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(workerarg)] = sched_ctx;
                           }
                   }
-                sched_ctx->nworkers_in_ctx = ntotal_workers;
+		//                sched_ctx->nworkers_in_ctx = ntotal_workers;
+		n_added_workers = ntotal_workers;
           }
         else
           {
                 int i;
+		printf("%d redim worker:", nnew_workers);
                 for(i = 0; i < nnew_workers; i++)
                   {
                         /*take care the user does not ask for a resource that does not exist*/
                         STARPU_ASSERT( new_workers[i] >= 0 &&  new_workers[i] <= ntotal_workers);
+			printf(" %d", new_workers[i]);
+
 			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(new_workers[i]);
 			if(!_starpu_worker_belongs_to_ctx(new_workers[i], sched_ctx))
 			  {
@@ -307,7 +177,8 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
 			    n_added_workers++;
 			  }
                   }
-                sched_ctx->nworkers_in_ctx += n_added_workers;
+		//                sched_ctx->nworkers_in_ctx += n_added_workers;
+		printf("\n");
           }
 
         sched_ctx->sched_policy->init_sched_for_workers(sched_ctx->sched_ctx_id, n_added_workers);
@@ -342,8 +213,6 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 
 void _starpu_delete_all_sched_ctxs()
 {
-	struct starpu_machine_config_s *config = _starpu_get_machine_config();
-	unsigned nsched_ctxs = config->topology.nsched_ctxs;
 	unsigned i;
 
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
@@ -359,68 +228,6 @@ void _starpu_delete_all_sched_ctxs()
 	return;
 }
 
-int starpu_wait_for_all_tasks_of_worker(int workerid)
-{
-	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
-		return -EDEADLK;
-
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-	
-	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
-
-	while (worker->nsubmitted > 0)
-		PTHREAD_COND_WAIT(&worker->submitted_cond, &worker->submitted_mutex);
-
-	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
-	
-	return 0;
-}
-
-int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_in_ctx){
-	int ret_val = 0;
-	
-	struct starpu_machine_config_s *config = _starpu_get_machine_config();
-	int nworkers = nworkerids_in_ctx == -1 ? (int)config->topology.nworkers : nworkerids_in_ctx;
-	
-	int workerid = -1;
-	int i, n;
-	
-	for(i = 0; i < nworkers; i++)
-	  {
-		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
-		n = starpu_wait_for_all_tasks_of_worker(workerid);
-		ret_val = (ret_val && n);
-	  }
-	
-	return ret_val;
-}
-
-void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
-{
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-	
-	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
-
-	if (--worker->nsubmitted == 0)
-		PTHREAD_COND_BROADCAST(&worker->submitted_cond);
-
-	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
-	return;
-}
-
-void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
-{
-	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-
-	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
-
-	worker->nsubmitted++;
-	
-	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
-	return;
-}
-
-
 void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx,
 				     unsigned sched_ctx_id)
 {
@@ -435,38 +242,6 @@ void starpu_add_workers_to_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ct
 	return;
 }
 
-static int _starpu_get_first_free_space(int *workerids, int old_nworkerids_in_ctx)
-{
-  int i;
-  for(i = 0; i < old_nworkerids_in_ctx; i++)
-    if(workerids[i] == -1)
-      return i;
-  return -1;
-}
-
-/* rearange array of workerids in order not to have {-1, -1, 5, -1, 7}
-   and have instead {5, 7, -1, -1, -1} 
-   it is easier afterwards to iterate the array
-*/
-static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkerids_in_ctx)
-{
-	int first_free_id = -1;
-	int i;
-	for(i = 0; i < old_nworkerids_in_ctx; i++)
-	  {
-		if(sched_ctx->workerid[i] != -1)
-		  {
-			first_free_id = _starpu_get_first_free_space(sched_ctx->workerid, 
-								     old_nworkerids_in_ctx);
-			if(first_free_id != -1)
-			  {
-				sched_ctx->workerid[first_free_id] = sched_ctx->workerid[i];
-				sched_ctx->workerid[i] = -1;
-			  }
-		  }
-	  }
-}
-
 static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, 
 					  struct starpu_sched_ctx *sched_ctx)
 {
@@ -515,7 +290,6 @@ static void _starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nwo
 	  }
 
 	return;
-
 }
 
 void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_in_ctx, 
@@ -534,46 +308,229 @@ void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_
 		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
 	  }
 	return;
+}
+
+/* unused sched_ctx have the id STARPU_NMAX_SCHED_CTXS */
+void _starpu_init_all_sched_ctx(struct starpu_machine_config_s *config)
+{
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		config->sched_ctxs[i].sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
+	return;
+}
 
+/* unused sched_ctx pointers of a worker are NULL */
+void _starpu_init_sched_ctx_for_worker(unsigned workerid)
+{
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	worker->sched_ctx = (struct starpu_sched_ctx**)malloc(STARPU_NMAX_SCHED_CTXS * sizeof(struct starpu_sched_ctx*));
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		worker->sched_ctx[i] = NULL;
+	return;
 }
 
-int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
+/* sched_ctxs are not necessarily stored contiguously: */
+/* e.g. when one is removed, its slot becomes free, */
+/* and when a new one is added, that free slot is reused */
+static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config)
 {
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		if(config->sched_ctxs[i].sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
+			return i;
+
+	STARPU_ASSERT(0);
+	return STARPU_NMAX_SCHED_CTXS;
+}
+
+static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker)
+{
+	unsigned i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+		if(worker->sched_ctx[i] == NULL)
+			return i;
+	STARPU_ASSERT(0);
+	return STARPU_NMAX_SCHED_CTXS;
+}
+
+static int _starpu_get_first_free_worker_space(int *workerids, int old_nworkerids_in_ctx)
+{
+	int i;
+	for(i = 0; i < old_nworkerids_in_ctx; i++)
+		if(workerids[i] == -1)
+			return i;
+
+	return -1;
+}
+
+/* rearrange the array of workerids so that, instead of {-1, -1, 5, -1, 7},
+   we have {5, 7, -1, -1, -1};
+   this makes the array easier to iterate over afterwards
+*/
+static void _starpu_rearange_sched_ctx_workerids(struct starpu_sched_ctx *sched_ctx, int old_nworkerids_in_ctx)
+{
+	int first_free_id = -1;
+	int i;
+	for(i = 0; i < old_nworkerids_in_ctx; i++)
+	  {
+		if(sched_ctx->workerid[i] != -1)
+		  {
+			first_free_id = _starpu_get_first_free_worker_space(sched_ctx->workerid, old_nworkerids_in_ctx);
+			if(first_free_id != -1)
+			  {
+				sched_ctx->workerid[first_free_id] = sched_ctx->workerid[i];
+				sched_ctx->workerid[i] = -1;
+			  }
+		  }
+	  }
+}
+
+/* manage blocking and waking up threads when constructing/modifying contexts */
+int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx)
+{
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+
+	int i;
+	int nworkers = nworkerids_in_ctx == -1 ? (int)config->topology.nworkers : nworkerids_in_ctx;
+  
+	struct starpu_worker_s *worker = NULL;
+	pthread_mutex_t *changing_ctx_mutex = NULL;
+	pthread_cond_t *changing_ctx_cond = NULL;
 	
+	int workerid = -1;
+
+	if(changing_ctx == STATUS_CHANGING_CTX)
+	  _starpu_barrier_counter_init(&workers_barrier, nworkers);
+	
+	for(i = 0; i < nworkers; i++)
+	  {
+		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
+		worker = _starpu_get_worker_struct(workerid);
+
+		changing_ctx_mutex = &worker->changing_ctx_mutex;
+		changing_ctx_cond = &worker->changing_ctx_cond;
+		
+		/*if the status is CHANGING_CTX let the thread know that it must block*/
+		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+		worker->blocking_status = changing_ctx;
+		worker->nworkers_of_next_ctx = nworkers;
+		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+
+		/*if we have finished changing the ctx wake up the blocked threads*/
+		if(changing_ctx == STATUS_UNKNOWN)
+		  {
+			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+			PTHREAD_COND_SIGNAL(changing_ctx_cond);
+			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		  }
+
+	  }
+	
+	/* after notifying all the concerned threads of the change,
+	   wait for them to take it into account */
+	if(changing_ctx == STATUS_CHANGING_CTX)
+		_starpu_wait_for_all_threads_to_block();
+	else
+	  {
+		_starpu_wait_for_all_threads_to_wake_up();
+		_starpu_barrier_counter_destroy(&workers_barrier);
+	  }
+
+	return 0;
+}
+
+
+int starpu_wait_for_all_tasks_of_worker(int workerid)
+{
 	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
-	  return -EDEADLK;
+		return -EDEADLK;
+
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	
+	_starpu_barrier_counter_wait_for_empty_counter(&worker->tasks_barrier);
 	
-	PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
+	return 0;
+}
+
+int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_in_ctx){
+	int ret_val = 0;
 	
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	int nworkers = nworkerids_in_ctx == -1 ? (int)config->topology.nworkers : nworkerids_in_ctx;
 	
-	while (sched_ctx->nsubmitted > 0)
-	  PTHREAD_COND_WAIT(&sched_ctx->submitted_cond, &sched_ctx->submitted_mutex);
+	int workerid = -1;
+	int i, n;
 	
-	PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+	for(i = 0; i < nworkers; i++)
+	  {
+		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
+		n = starpu_wait_for_all_tasks_of_worker(workerid);
+		ret_val = (ret_val && n);
+	  }
 	
-	return 0;
+	return ret_val;
 }
 
-void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
 {
-	PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
-
-	if (--sched_ctx->nsubmitted == 0)
-		PTHREAD_COND_BROADCAST(&sched_ctx->submitted_cond);
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
 	
-	PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+	_starpu_barrier_counter_decrement_until_empty_counter(&worker->tasks_barrier);
+
+	return;
 }
 
-void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
+{
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+
+	_starpu_barrier_counter_increment(&worker->tasks_barrier);
+
+	return;
+}
+
+void _starpu_decrement_nblocked_ths(void)
+{
+	_starpu_barrier_counter_decrement_until_empty_counter(&workers_barrier);
+}
+
+void _starpu_increment_nblocked_ths(void)
+{
+	_starpu_barrier_counter_increment_until_full_counter(&workers_barrier);
+}
+
+int _starpu_wait_for_all_threads_to_block(void)
 {
-	PTHREAD_MUTEX_LOCK(&sched_ctx->submitted_mutex);
+	return _starpu_barrier_counter_wait_for_full_counter(&workers_barrier);
+}
+
+int _starpu_wait_for_all_threads_to_wake_up(void)
+{
+	return _starpu_barrier_counter_wait_for_empty_counter(&workers_barrier);
+}
 
-	sched_ctx->nsubmitted++;
+int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
+{
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
+	
+	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+	  return -EDEADLK;
 	
-	PTHREAD_MUTEX_UNLOCK(&sched_ctx->submitted_mutex);
+	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
+}
+
+void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+{
+	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier);
 }
 
+void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+{
+	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier);
+}
+
+
 int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx_id, unsigned workerid)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);

+ 17 - 10
src/core/sched_ctx.h

@@ -3,7 +3,7 @@
  * Copyright (C) 2010  Université de Bordeaux 1
  *
  * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
+ * it under the terms of the GNU Lesser General Public License as published by
  * the Free Software Foundation; either version 2.1 of the License, or (at
  * your option) any later version.
  *
@@ -19,6 +19,9 @@
 
 #include <starpu.h>
 #include <starpu_scheduler.h>
+#include <common/config.h>
+#include <common/barrier_counter.h>
+#include <profiling/profiling.h>
 
 struct starpu_sched_ctx {
 	/* id of the context used in user mode*/
@@ -42,14 +45,8 @@ struct starpu_sched_ctx {
 	/* we keep an initial sched which we never delete */
 	unsigned is_initial_sched; 
 
-	/* cond used for no of submitted tasks to a sched_ctx */
-	pthread_cond_t submitted_cond; 
-	
-	/* mut used for no of submitted tasks to a sched_ctx */
-	pthread_mutex_t submitted_mutex; 
-	
-	/* counter used for no of submitted tasks to a sched_ctx */
-	int nsubmitted;	 
+	/* wait for the tasks submitted to the context to be executed */
+	struct _starpu_barrier_counter_t tasks_barrier;
 
 	/* table of sched cond corresponding to each worker in this ctx */
 	pthread_cond_t **sched_cond;
@@ -58,14 +55,24 @@ struct starpu_sched_ctx {
 	pthread_mutex_t **sched_mutex;
 };
 
+struct starpu_machine_config_s;
+/* init the sched_ctx_id of all contexts */
+void _starpu_init_all_sched_ctx(struct starpu_machine_config_s *config);
+
+/* init the list of contexts of the worker */
 void _starpu_init_sched_ctx_for_worker(unsigned workerid);
 
+/* allocate all structures belonging to a context */
 unsigned _starpu_create_sched_ctx(const char *policy_name, int *workerid, int nworkerids, unsigned is_init_sched, const char *sched_name);
 
+/* delete all sched_ctx */
 void _starpu_delete_all_sched_ctxs();
 
-void _starpu_increment_nblocked_ths(int nworkers);
+/* Workers are blocked when constructing or modifying a context */
+void _starpu_increment_nblocked_ths(void);
 void _starpu_decrement_nblocked_ths(void);
+int _starpu_wait_for_all_threads_to_block(void);
+int _starpu_wait_for_all_threads_to_wake_up(void);
 
 /* Keeps track of the number of tasks currently submitted to a worker */
 void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid);

+ 2 - 5
src/core/workers.c

@@ -151,10 +151,7 @@ static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 		PTHREAD_MUTEX_INIT(&workerarg->changing_ctx_mutex, NULL);
 		PTHREAD_COND_INIT(&workerarg->changing_ctx_cond, NULL);
 
-		workerarg->nsubmitted = 0;
-		PTHREAD_COND_INIT(&workerarg->submitted_cond, NULL);
-		PTHREAD_MUTEX_INIT(&workerarg->submitted_mutex, NULL);
-
+		_starpu_barrier_counter_init(&workerarg->tasks_barrier, 0);
 
 		PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
 		PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
@@ -348,7 +345,7 @@ int starpu_init(struct starpu_conf *user_conf)
 	 * initialization */
 	config.user_conf = user_conf;
 
-	_starpu_init_sched_ctx(&config);
+	_starpu_init_all_sched_ctx(&config);
 	ret = _starpu_build_topology(&config);
 	if (ret) {
 		PTHREAD_MUTEX_LOCK(&init_mutex);

+ 2 - 5
src/core/workers.h

@@ -86,11 +86,8 @@ struct starpu_worker_s {
 	pthread_cond_t changing_ctx_cond;
 	int nworkers_of_next_ctx;
 
-	long int nsubmitted; /* submitted tasks to worker */
-	pthread_cond_t submitted_cond; /* cond for nsubmitted */
-	pthread_mutex_t submitted_mutex; /* mutex for nsubmitted */
-
-
+	struct _starpu_barrier_counter_t tasks_barrier; /* wait for the tasks submitted */
+       
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;
 	cpu_set_t current_cpu_set;