Browse Source

When changing context block only concerned threads

Andra Hugo 14 years ago
parent
commit
951d9a611a

+ 10 - 4
include/starpu_scheduler.h

@@ -101,10 +101,10 @@ struct starpu_sched_policy_s {
 };
 };
 
 
 struct starpu_sched_ctx {
 struct starpu_sched_ctx {
-	struct starpu_sched_policy_s *sched_policy;
-	int workerid[STARPU_NMAXWORKERS];
-	int nworkers_in_ctx;
-	unsigned is_init_sched; /*we keep an init sched which we never delete*/
+	struct starpu_sched_policy_s *sched_policy; /*policy of the contex */
+	int workerid[STARPU_NMAXWORKERS]; /*list of indices of workers */
+	int nworkers_in_ctx; /*number of threads in contex */
+	unsigned is_init_sched; /*we keep an init sched which we never delete */
 };
 };
 
 
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
 void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx);
@@ -185,4 +185,10 @@ double starpu_data_expected_transfer_time(starpu_data_handle handle, unsigned me
 /* Returns expected power consumption in J */
 /* Returns expected power consumption in J */
 double starpu_task_expected_power(struct starpu_task *task, enum starpu_perf_archtype arch);
 double starpu_task_expected_power(struct starpu_task *task, enum starpu_perf_archtype arch);
 
 
+/* Waits until all the tasks of a worker, already submitted, have been executed */
+int starpu_wait_for_all_tasks_of_worker(int workerid);
+
+/* Waits until all the tasks of a bunch of workers have been executed */
+int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_in_ctx);
+
 #endif // __STARPU_SCHEDULER_H__
 #endif // __STARPU_SCHEDULER_H__

+ 1 - 1
simple_ex/exemple.c

@@ -77,7 +77,7 @@ int main(int argc, char **argv)
 
 
   struct starpu_sched_ctx sched_ctx2;
   struct starpu_sched_ctx sched_ctx2;
   int procs2[]={3, 4, 5, 6, 7};
   int procs2[]={3, 4, 5, 6, 7};
-  starpu_create_sched_ctx(&sched_ctx2, "random", procs2, 4);
+  starpu_create_sched_ctx(&sched_ctx2, "random", procs2, 5);
 
 
   struct starpu_task *task3 = starpu_task_create();
   struct starpu_task *task3 = starpu_task_create();
   task3->cl = &cl;
   task3->cl = &cl;

+ 124 - 46
src/core/sched_policy.c

@@ -27,6 +27,7 @@
 
 
 static int use_prefetch = 0;
 static int use_prefetch = 0;
 static pthread_cond_t blocking_ths_cond = PTHREAD_COND_INITIALIZER;
 static pthread_cond_t blocking_ths_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t wakeup_ths_cond = PTHREAD_COND_INITIALIZER;
 static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
 static int nblocked_ths = 0;
 static int nblocked_ths = 0;
 
 
@@ -414,21 +415,12 @@ int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
 	return _starpu_push_local_task(worker, task, back);
 	return _starpu_push_local_task(worker, task, back);
 }
 }
 
 
-static int starpu_get_ctx_id(struct starpu_sched_ctx *sched_ctx, struct starpu_worker_s *worker){
-	unsigned i;
-	for(i = 0; i < worker->nctxs; i++)
-		if(worker->sched_ctx[i] == sched_ctx)
-			return i;
-	return -1;
-}
-
 void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_init_sched)
 void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_init_sched)
 {
 {
 	sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
 	sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
 	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
 	sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
 	sched_ctx->is_init_sched = is_init_sched;
 	sched_ctx->is_init_sched = is_init_sched;
 
 
-
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 	int nworkers = config->topology.nworkers;
 	int nworkers = config->topology.nworkers;
        
        
@@ -437,7 +429,7 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 	if(workerids_in_ctx == NULL){
 	if(workerids_in_ctx == NULL){
 		for(j = 0; j < nworkers; j++){
 		for(j = 0; j < nworkers; j++){
 			sched_ctx->workerid[j] = j;
 			sched_ctx->workerid[j] = j;
-			struct starpu_worker_s *workerarg = &config->workers[j];
+			struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
 			workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
 			workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
 		}
 		}
 		sched_ctx->nworkers_in_ctx = nworkers;
 		sched_ctx->nworkers_in_ctx = nworkers;
@@ -447,7 +439,7 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 			sched_ctx->workerid[i] = workerids_in_ctx[i];
 			sched_ctx->workerid[i] = workerids_in_ctx[i];
 			for(j = 0; j < nworkers; j++){
 			for(j = 0; j < nworkers; j++){
 				if(sched_ctx->workerid[i] == j){
 				if(sched_ctx->workerid[i] == j){
-					struct starpu_worker_s *workerarg = &config->workers[j];
+					struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
 					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
 					workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
 				}
 				}
 			}
 			}
@@ -459,38 +451,51 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 	return;
 	return;
 }
 }
 
 
-static int _starpu_wait_for_all_threads_to_block(int nworkers)
+void _starpu_decrement_nblocked_ths(void)
 {
 {
 	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 
 
-	while (nblocked_ths < nworkers)
-		PTHREAD_COND_WAIT(&blocking_ths_cond, &blocking_ths_mutex);
-       
+	if(--nblocked_ths == 0)
+		PTHREAD_COND_BROADCAST(&wakeup_ths_cond);
+
 	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
 	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+}
 
 
-	return 0;
+void _starpu_increment_nblocked_ths(int nworkers)
+{
+	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
+	if (++nblocked_ths == nworkers)
+		PTHREAD_COND_BROADCAST(&blocking_ths_cond);
+
+	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
 }
 }
 
 
-static void _starpu_decrement_nblocked_ths(void)
+static int _starpu_wait_for_all_threads_to_block(int nworkers)
 {
 {
 	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 
 
-	nblocked_ths--;
-	
+	while (nblocked_ths < nworkers)
+		PTHREAD_COND_WAIT(&blocking_ths_cond, &blocking_ths_mutex);
+       
 	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
 	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+
+	return 0;
 }
 }
 
 
-void _starpu_increment_nblocked_ths(int nworkers)
+static int _starpu_wait_for_all_threads_to_wake_up(void)
 {
 {
 	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);
 
 
-	if (++nblocked_ths == nworkers)
-		PTHREAD_COND_BROADCAST(&blocking_ths_cond);
+	while (nblocked_ths > 0)
+		PTHREAD_COND_WAIT(&wakeup_ths_cond, &blocking_ths_mutex);
 
 
 	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
 	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
+	
+	return 0;
 }
 }
 
 
-int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx){
+static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx)
+{
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 	struct starpu_machine_config_s *config = _starpu_get_machine_config();
 
 
 	int i;
 	int i;
@@ -502,9 +507,10 @@ int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_c
 
 
 	int workerid = -1;
 	int workerid = -1;
 	
 	
-	for(i = 0; i < nworkers; i++){
+	for(i = 0; i < nworkers; i++)
+	  {
 		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
 		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
-		worker = &config->workers[workerid];
+		worker = _starpu_get_worker_struct(workerid);
 		
 		
 		changing_ctx_mutex = &worker->changing_ctx_mutex;
 		changing_ctx_mutex = &worker->changing_ctx_mutex;
 		changing_ctx_cond = &worker->changing_ctx_cond;
 		changing_ctx_cond = &worker->changing_ctx_cond;
@@ -516,33 +522,41 @@ int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_c
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 		
 		
 		/*if we have finished changing the ctx wake up the blocked threads*/
 		/*if we have finished changing the ctx wake up the blocked threads*/
-		if(changing_ctx == STATUS_UNKNOWN){
+		if(changing_ctx == STATUS_UNKNOWN)
+		  {
 			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 			PTHREAD_COND_SIGNAL(changing_ctx_cond);
 			PTHREAD_COND_SIGNAL(changing_ctx_cond);
 			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-			_starpu_decrement_nblocked_ths();
-		}
-	}
+		  }
+	  }
 
 
-	/*after letting know all the concered threads about the change
+	/*after letting know all the concerned threads about the change
 	  wait for them to take into account the info*/
 	  wait for them to take into account the info*/
 	if(changing_ctx == STATUS_CHANGING_CTX)
 	if(changing_ctx == STATUS_CHANGING_CTX)
 		_starpu_wait_for_all_threads_to_block(nworkers);
 		_starpu_wait_for_all_threads_to_block(nworkers);
+	else
+		_starpu_wait_for_all_threads_to_wake_up();
 
 
 	return 0;
 	return 0;
 }
 }
 
 
-void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx){
-    	  if(!starpu_task_wait_for_all()){
-		/*block the workers until the contex is switched*/
+void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx)
+{
+	/* wait for the workers concerned by the change of contex
+	 * to finish their work in the previous context */
+	if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
+	  {
+		/* block the workers until the contex is switched */
 		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
 		set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
 		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
 		_starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
+		/* also wait the workers to wake up before using the context */
 		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
 		set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
 	  }
 	  }
 	  return;
 	  return;
 }
 }
 
 
-int worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx){
+int worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
+{
 	unsigned i;
 	unsigned i;
 	for(i = 0; i < workerarg->nctxs; i++)
 	for(i = 0; i < workerarg->nctxs; i++)
 		if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx 
 		if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx 
@@ -557,32 +571,96 @@ void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
 	int nworkers = config->topology.nworkers;
 	int nworkers = config->topology.nworkers;
        
        
 	int i;
 	int i;
-	for(i = 0; i < nworkers; i++){
-		struct starpu_worker_s *workerarg = &config->workers[i];
+	for(i = 0; i < nworkers; i++)
+	  {
+		struct starpu_worker_s *workerarg = _starpu_get_worker_struct(i);
 		if(worker_belongs_to_ctx(workerarg, sched_ctx))
 		if(worker_belongs_to_ctx(workerarg, sched_ctx))
 			workerarg->nctxs--;
 			workerarg->nctxs--;
-	}
+	  }
 
 
 	free(sched_ctx->sched_policy);
 	free(sched_ctx->sched_policy);
 	sched_ctx->sched_policy = NULL;
 	sched_ctx->sched_policy = NULL;
 }
 }
 
 
-void _starpu_delete_all_sched_ctxs(){
+void _starpu_delete_all_sched_ctxs()
+{
   	struct starpu_machine_config_s *config = _starpu_get_machine_config();
   	struct starpu_machine_config_s *config = _starpu_get_machine_config();
-	int nworkers = config->topology.nworkers;
+	unsigned nworkers = config->topology.nworkers;
 
 
 	unsigned i, j;
 	unsigned i, j;
 	struct starpu_sched_ctx *sched_ctx = NULL;
 	struct starpu_sched_ctx *sched_ctx = NULL;
 	struct starpu_worker_s *workerarg = NULL;
 	struct starpu_worker_s *workerarg = NULL;
-	for(i = 0; i < nworkers; i++){
-		workerarg = &config->workers[i];
-		for(j = 0; j < workerarg->nctxs; j++){
+	for(i = 0; i < nworkers; i++)
+	  {
+		workerarg = _starpu_get_worker_struct(i);
+		for(j = 0; j < workerarg->nctxs; j++)
+		  {
 			sched_ctx = workerarg->sched_ctx[j];
 			sched_ctx = workerarg->sched_ctx[j];
-			if(sched_ctx != NULL && !sched_ctx->is_init_sched){
+			if(sched_ctx != NULL && !sched_ctx->is_init_sched)
+			  {
 				free(sched_ctx->sched_policy);
 				free(sched_ctx->sched_policy);
 				sched_ctx->sched_policy = NULL;
 				sched_ctx->sched_policy = NULL;
 				workerarg->nctxs--;
 				workerarg->nctxs--;
-			}
-		}
-	}
+			  }
+		  }
+	  }
+}
+
+int starpu_wait_for_all_tasks_of_worker(int workerid)
+{
+	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
+		return -EDEADLK;
+
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+
+	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+	
+	while (worker->nsubmitted > 0)
+		PTHREAD_COND_WAIT(&worker->submitted_cond, &worker->submitted_mutex);
+
+	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
+
+	return 0;
+}
+
+int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_in_ctx){
+	int ret_val = 0;
+
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+	int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;
+
+	int workerid = -1;
+	int i, n;
+
+	for(i = 0; i < nworkers; i++)
+	  {
+		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
+		n = starpu_wait_for_all_tasks_of_worker(workerid);
+		ret_val = ret_val && n;
+	  }
+
+	return ret_val;
+}
+
+void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
+{
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+
+	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+
+	if (--worker->nsubmitted == 0)
+		PTHREAD_COND_BROADCAST(&worker->submitted_cond);
+
+	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
+}
+
+void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
+{
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	
+	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);
+
+	worker->nsubmitted++;
+	
+	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
 }
 }

+ 5 - 0
src/core/sched_policy.h

@@ -42,5 +42,10 @@ void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *po
 void _starpu_delete_all_sched_ctxs();
 void _starpu_delete_all_sched_ctxs();
 
 
 void _starpu_increment_nblocked_ths(int nworkers);
 void _starpu_increment_nblocked_ths(int nworkers);
+void _starpu_decrement_nblocked_ths(void);
+
+/* Keeps track of the number of tasks currently submitted to a worker */
+void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid);
+void _starpu_increment_nsubmitted_tasks_of_worker(int workerid);
 
 
 #endif // __SCHED_POLICY_H__
 #endif // __SCHED_POLICY_H__

+ 3 - 2
src/core/task.c

@@ -198,8 +198,9 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted)
 
 
 	j->terminated = 0;
 	j->terminated = 0;
 
 
-	if (!do_not_increment_nsubmitted)
+	if (!do_not_increment_nsubmitted){
 		_starpu_increment_nsubmitted_tasks();
 		_starpu_increment_nsubmitted_tasks();
+	}
 
 
 	PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	
 	
@@ -285,7 +286,7 @@ int starpu_task_submit_to_ctx(struct starpu_task *task, struct starpu_sched_ctx
 }
 }
 
 
 int starpu_task_submit(struct starpu_task *task){
 int starpu_task_submit(struct starpu_task *task){
-  starpu_task_submit_to_ctx(task, _starpu_get_initial_sched_ctx());
+  return starpu_task_submit_to_ctx(task, _starpu_get_initial_sched_ctx());
 }
 }
 
 
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl)
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl)

+ 5 - 0
src/core/workers.c

@@ -149,6 +149,11 @@ static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
 		PTHREAD_MUTEX_INIT(&workerarg->changing_ctx_mutex, NULL);
 		PTHREAD_MUTEX_INIT(&workerarg->changing_ctx_mutex, NULL);
 		PTHREAD_COND_INIT(&workerarg->changing_ctx_cond, NULL);
 		PTHREAD_COND_INIT(&workerarg->changing_ctx_cond, NULL);
 
 
+		workerarg->nsubmitted = 0;
+		PTHREAD_COND_INIT(&workerarg->submitted_cond, NULL);
+		PTHREAD_MUTEX_INIT(&workerarg->submitted_mutex, NULL);
+
+
 		PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
 		PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
 		PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
 		PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
 
 

+ 5 - 0
src/core/workers.h

@@ -85,6 +85,11 @@ struct starpu_worker_s {
 	pthread_cond_t changing_ctx_cond;
 	pthread_cond_t changing_ctx_cond;
 	int nworkers_of_next_ctx;
 	int nworkers_of_next_ctx;
 
 
+	long int nsubmitted; /* submitted tasks to worker */
+	pthread_cond_t submitted_cond; /* cond for nsubmitted */
+	pthread_mutex_t submitted_mutex; /* mutex for nsubmitted */
+
+
 #ifdef __GLIBC__
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;
 	cpu_set_t initial_cpu_set;
 	cpu_set_t current_cpu_set;
 	cpu_set_t current_cpu_set;

+ 5 - 1
src/drivers/cpu/driver_cpu.c

@@ -158,9 +158,11 @@ void *_starpu_cpu_worker(void *arg)
 
 
 		/*when contex is changing block the threads belonging to it*/
 		/*when contex is changing block the threads belonging to it*/
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
 		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+
 		if(cpu_arg->status == STATUS_CHANGING_CTX){
 		if(cpu_arg->status == STATUS_CHANGING_CTX){
 			_starpu_increment_nblocked_ths(cpu_arg->nworkers_of_next_ctx);
 			_starpu_increment_nblocked_ths(cpu_arg->nworkers_of_next_ctx);
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
 			_starpu_block_worker(workerid, changing_ctx_cond, changing_ctx_mutex);
+			_starpu_decrement_nblocked_ths();
 		}
 		}
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
   
   
@@ -238,8 +240,10 @@ void *_starpu_cpu_worker(void *arg)
 			}
 			}
 		}
 		}
 
 
-		if (rank == 0)
+		if (rank == 0){
 			_starpu_handle_job_termination(j, 0);
 			_starpu_handle_job_termination(j, 0);
+			_starpu_decrement_nsubmitted_tasks_of_worker(cpu_arg->workerid);
+		}
         }
         }
 
 
 	STARPU_TRACE_WORKER_DEINIT_START
 	STARPU_TRACE_WORKER_DEINIT_START

+ 24 - 4
src/sched_policies/eager_central_policy.c

@@ -57,6 +57,13 @@ static void deinitialize_eager_center_policy(struct starpu_sched_ctx *sched_ctx)
 
 
 static int push_task_eager_policy(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 static int push_task_eager_policy(struct starpu_task *task, struct starpu_sched_ctx *sched_ctx)
 {
 {
+	int i;
+	int workerid;
+	for(i = 0; i < sched_ctx->nworkers_in_ctx; i++){
+		workerid = sched_ctx->workerid[i]; 
+		_starpu_increment_nsubmitted_tasks_of_worker(workerid);
+	}
+
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 }
 
 
@@ -67,14 +74,27 @@ static int push_prio_task_eager_policy(struct starpu_task *task, struct starpu_s
 
 
 static struct starpu_task *pop_every_task_eager_policy(void)
 static struct starpu_task *pop_every_task_eager_policy(void)
 {
 {
-        int workerid = starpu_worker_get_id();
-        struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, workerid);
+	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, starpu_worker_get_id());
 }
 }
 
 
 static struct starpu_task *pop_task_eager_policy(void)
 static struct starpu_task *pop_task_eager_policy(void)
 {
 {
-	return _starpu_fifo_pop_task(fifo, starpu_worker_get_id());
+        unsigned workerid = starpu_worker_get_id();
+	struct starpu_task *task =  _starpu_fifo_pop_task(fifo, workerid);
+
+	if(task)
+	  {
+		struct starpu_sched_ctx *sched_ctx = task->sched_ctx;
+
+		int i;
+		for(i = 0; i <sched_ctx->nworkers_in_ctx; i++)
+		  {
+			workerid = sched_ctx->workerid[i]; 
+			_starpu_decrement_nsubmitted_tasks_of_worker(workerid);
+		  }
+	  }
+
+	return task;
 }
 }
 
 
 struct starpu_sched_policy_s _starpu_sched_eager_policy = {
 struct starpu_sched_policy_s _starpu_sched_eager_policy = {

+ 1 - 0
src/sched_policies/random_policy.c

@@ -63,6 +63,7 @@ static int _random_push_task(struct starpu_task *task, unsigned prio, struct sta
 	}
 	}
 
 
 	/* we should now have the best worker in variable "selected" */
 	/* we should now have the best worker in variable "selected" */
+	_starpu_increment_nsubmitted_tasks_of_worker(selected);
 	return starpu_push_local_task(selected, task, prio);
 	return starpu_push_local_task(selected, task, prio);
 }
 }