Overlapping ctxs: workers execute tasks from one context for a period of
time, then switch to another. Only implemented for the heft scheduler for
the moment (a usage sketch follows the commit metadata below).

Andra Hugo 13 years ago
parent
commit
44d7201986
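
For context, a minimal usage sketch, not part of this commit: the time slice is read once at initialization (see _starpu_init_all_sched_ctxs below, which parses STARPU_MAX_TIME_ON_CTX with atof()), so the variable must be set before starpu_init(). Context creation and task submission are elided because their APIs lie outside this diff, and the value 5000 is purely illustrative.

#include <stdlib.h>
#include <starpu.h>

int main(void)
{
	/* let each worker stay on one ctx for at most this much predicted
	 * time before its turn is handed to an overlapping ctx (the unit
	 * follows the scheduler's time predictions; value is illustrative) */
	setenv("STARPU_MAX_TIME_ON_CTX", "5000", 1);

	if (starpu_init(NULL) != 0)
		return 1;

	/* ... create two sched_ctxs that share workers (heft policy) and
	 * submit tasks to both; workers then alternate between them ... */

	starpu_shutdown();
	return 0;
}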

+ 8 - 0
include/starpu_scheduler.h

@@ -210,6 +210,14 @@ unsigned starpu_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id
 
 unsigned starpu_worker_belongs_to_sched_ctx(int workerid, unsigned sched_ctx_id);
 
+unsigned starpu_are_overlapping_ctxs_on_worker(int workerid);
+
+unsigned starpu_is_ctxs_turn(int workerid, unsigned sched_ctx_id);
+
+void starpu_set_turn_to_other_ctx(int workerid, unsigned sched_ctx_id);
+
+double starpu_get_max_time_worker_on_ctx(void);
+
 void starpu_stop_task_submission(unsigned sched_ctx);
 
 /* Check if the worker specified by workerid can execute the codelet. */
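
A sketch of how a scheduling policy might combine the new calls; the helper name can_pick_worker is hypothetical, only the starpu_* functions come from this header:

/* consider a worker only if it is this ctx's turn on it; when time
 * slicing is disabled starpu_is_ctxs_turn() always returns 1 */
static unsigned can_pick_worker(int workerid, unsigned sched_ctx_id)
{
	if (!starpu_are_overlapping_ctxs_on_worker(workerid))
		return 1; /* a single ctx uses this worker: no turn to respect */
	return starpu_is_ctxs_turn(workerid, sched_ctx_id);
}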

+ 5 - 2
include/starpu_task.h

@@ -209,6 +209,8 @@ struct starpu_task
 	int hypervisor_tag;
 	
 	double flops;
+
+	unsigned already_pushed;
 };
 
 /* It is possible to initialize statically allocated tasks with this value.
@@ -237,8 +239,9 @@ struct starpu_task
 	.magic = 42,                  			\
 	.sched_ctx = 0,					\
 	.control_task = 0,				\
-		.hypervisor_tag = 0,			\
-		.flops = 0.0				\
+	.hypervisor_tag = 0,				\
+	.flops = 0.0,					\
+	.already_pushed = 0				\
 };
 
 /*

+ 80 - 22
src/core/sched_ctx.c

@@ -23,6 +23,7 @@ static pthread_mutex_t sched_ctx_manag = PTHREAD_MUTEX_INITIALIZER;
 struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
 pthread_key_t sched_ctx_key;
 unsigned with_hypervisor = 0;
+double max_time_worker_on_ctx = -1.0;
 
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
 static unsigned _starpu_worker_get_first_free_sched_ctx(struct _starpu_worker *worker);
@@ -43,6 +44,7 @@ static void change_worker_sched_ctx(unsigned sched_ctx_id)
 		/* add context to worker */
 		worker->sched_ctx[worker_sched_ctx_id] = sched_ctx;
 		worker->nsched_ctxs++;	
+		worker->active_ctx = sched_ctx_id;
 	}
 	else 
 	{
@@ -51,6 +53,7 @@ static void change_worker_sched_ctx(unsigned sched_ctx_id)
 			worker->sched_ctx[worker_sched_ctx_id]->sched_policy->remove_workers(sched_ctx_id, &worker->workerid, 1);
 		worker->sched_ctx[worker_sched_ctx_id] = NULL;
 		worker->nsched_ctxs--;
+		starpu_set_turn_to_other_ctx(worker->workerid, sched_ctx_id);
 	}
 }
 
@@ -104,12 +107,10 @@ static void _starpu_update_workers(int *workerids, int nworkers, int sched_ctx_i
 	}
 }
 
-void starpu_stop_task_submission(unsigned sched_ctx)
+void starpu_stop_task_submission(unsigned sched_ctx_id)
 {
 	_starpu_exclude_task_from_dag(&stop_submission_task);
-	
 	_starpu_task_submit_internally(&stop_submission_task);
-
 }
 
 static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, 
@@ -409,7 +410,7 @@ void starpu_set_perf_counters(unsigned sched_ctx_id, struct starpu_performance_c
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	sched_ctx->perf_counters = perf_counters;
-	return sched_ctx_id;
+	return;
 }
 #endif
 
@@ -488,6 +489,31 @@ static void _starpu_check_workers(int *workerids, int nworkers)
 	}		
 }
 
+void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx)
+{
+	unsigned unlocked = 0;
+	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
+	{
+		if(unlocked)
+			_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
+		unlocked = 1;
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+		
+		if(old_task == &stop_submission_task)
+			break;
+
+		struct _starpu_job *old_j = _starpu_get_job_associated_to_task(old_task);
+		int ret = _starpu_push_task(old_j);
+		/* a -1 from the push means the task was deferred again: stop popping from the empty ctx list */
+		if(ret == -1) break;
+	}
+	if(!unlocked)
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+	return;
+
+}
 void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -506,25 +532,9 @@ void starpu_add_workers_to_sched_ctx(int *workers_to_add, int nworkers_to_add, u
 	}
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->changing_ctx_mutex);
-
-	unsigned unlocked = 0;
-	_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
-	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
-	{
-		if(unlocked)
-			_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
-		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
-		unlocked = 1;
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
-
-		if(old_task == &stop_submission_task)
-			break;
-		struct _starpu_job *old_j = _starpu_get_job_associated_to_task(old_task);
-		_starpu_push_task(old_j);
-	}
-	if(!unlocked)
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
 	
+	_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
+
 	return;
 }
 
@@ -555,6 +565,10 @@ void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config)
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 		config->sched_ctxs[i].id = STARPU_NMAX_SCHED_CTXS;
 
+	char* max_time_on_ctx = getenv("STARPU_MAX_TIME_ON_CTX");
+	if (max_time_on_ctx != NULL)
+		max_time_worker_on_ctx = atof(max_time_on_ctx);
+
 	return;
 }
 
@@ -854,6 +868,50 @@ unsigned starpu_worker_belongs_to_sched_ctx(int workerid, unsigned sched_ctx_id)
 	return 0;
 }
 
+unsigned starpu_are_overlapping_ctxs_on_worker(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	return worker->nsched_ctxs > 1;
+}
+
+unsigned starpu_is_ctxs_turn(int workerid, unsigned sched_ctx_id)
+{
+	if(max_time_worker_on_ctx == -1.0) return 1;
+
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	return worker->active_ctx == sched_ctx_id;
+}
+
+void starpu_set_turn_to_other_ctx(int workerid, unsigned sched_ctx_id)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+
+	struct _starpu_sched_ctx *other_sched_ctx = NULL;
+	struct _starpu_sched_ctx *active_sched_ctx = NULL;
+	int i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+	{
+		other_sched_ctx = worker->sched_ctx[i];
+		if(other_sched_ctx != NULL && other_sched_ctx->id != STARPU_NMAX_SCHED_CTXS && 
+		   other_sched_ctx->id != 0 && other_sched_ctx->id != sched_ctx_id)
+		{
+			worker->active_ctx = other_sched_ctx->id;
+			active_sched_ctx = other_sched_ctx;
+			break;
+		}
+	}		
+
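+	/* if the turn moved to another ctx, drain the tasks it parked on its empty-ctx list */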
+	if(worker->active_ctx != sched_ctx_id)
+	{
+		_starpu_fetch_tasks_from_empty_ctx_list(active_sched_ctx);
+	}
+}
+
+double starpu_get_max_time_worker_on_ctx(void)
+{
+	return max_time_worker_on_ctx;	
+}
+
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 
 void starpu_call_poped_task_cb(int workerid, unsigned sched_ctx_id, double flops)
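
Taken together, these functions support the following rotation protocol, shown here as a hedged sketch (the time_on_ctx accumulator is hypothetical; the heft.c changes below implement the same idea with a current_time array):

/* accumulate each task's predicted length per (worker, ctx) and hand
 * the turn over once the STARPU_MAX_TIME_ON_CTX budget is spent */
static double time_on_ctx[STARPU_NMAXWORKERS][STARPU_NMAX_SCHED_CTXS];

static void account_predicted_time(int workerid, unsigned sched_ctx_id, double predicted)
{
	double budget = starpu_get_max_time_worker_on_ctx();
	if (budget == -1.0 || !starpu_are_overlapping_ctxs_on_worker(workerid))
		return; /* time slicing disabled, or a single ctx on this worker */

	time_on_ctx[workerid][sched_ctx_id] += predicted;
	if (time_on_ctx[workerid][sched_ctx_id] >= budget)
	{
		time_on_ctx[workerid][sched_ctx_id] = 0.0;
		starpu_set_turn_to_other_ctx(workerid, sched_ctx_id);
	}
}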

+ 16 - 1
src/core/sched_policy.c

@@ -317,7 +317,7 @@ static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struc
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
-		if (starpu_worker_can_execute_task(worker, task, 0))
+		if (starpu_worker_can_execute_task(worker, task, 0) && starpu_is_ctxs_turn(worker, sched_ctx->id))
 			nworkers++;
 	}
 	
@@ -327,6 +327,7 @@ static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struc
 }
 
 /* the generic interface that calls the proper underlying implementation */
+
 int _starpu_push_task(struct _starpu_job *j)
 {
 	struct starpu_task *task = j->task;
@@ -340,12 +341,25 @@ int _starpu_push_task(struct _starpu_job *j)
 		  we consider the ctx empty */
 		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
 		
+
 		if(nworkers == 0)
 		{
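+			/* a task deferred once already goes to the back of the list and the
+			 * caller is told to stop draining (-1); a first deferral goes to the front */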
+			if(task->already_pushed)
+			{
 				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+				starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
+				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+				return -1;
+				
+			}
+			else
+			{
+				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+				task->already_pushed = 1;
 				starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
 				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
 				return 0;
+			}
 		}
 	}
 
@@ -377,6 +391,7 @@ int _starpu_push_task(struct _starpu_job *j)
 		if(ret == -1)
 		{
 			fprintf(stderr, "repush task \n");
+			_starpu_decrement_nready_tasks();
 			ret = _starpu_push_task(j);
 		}
 	}

+ 2 - 0
src/core/task.c

@@ -90,6 +90,8 @@ void starpu_task_init(struct starpu_task *task)
 	task->hypervisor_tag = 0;
 	
 	task->flops = 0.0;
+	
+	task->already_pushed = 0;
 }
 
 /* Free all the ressources allocated for a task, without deallocating the task

+ 5 - 0
src/core/workers.h

@@ -88,6 +88,11 @@ struct _starpu_worker
 	struct starpu_task *tasks[STARPU_NMAX_SCHED_CTXS];
        
 	unsigned has_prev_init; /* has already been initialized in another ctx */
+
+	/* indicates which ctx the worker is currently allowed to pop
+	   tasks from; used when overlapping ctxs share the worker */
+	unsigned active_ctx;
 #ifdef __GLIBC__
 	cpu_set_t initial_cpu_set;
 	cpu_set_t current_cpu_set;

+ 125 - 100
src/sched_policies/heft.c

@@ -42,6 +42,8 @@ static double exp_end[STARPU_NMAXWORKERS];   /* of the set of queued tasks */
 static double exp_len[STARPU_NMAXWORKERS];   /* of the last queued task */
 static double ntasks[STARPU_NMAXWORKERS];
 
+static double current_time[STARPU_NMAXWORKERS][STARPU_NMAX_SCHED_CTXS];
+
 typedef struct {
 	double alpha;
 	double beta;
@@ -87,6 +89,7 @@ static void heft_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nwo
 		   therefore the synchronisation mechanisms of the strategy
 		   are the global ones */
 		starpu_worker_set_sched_condition(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
+		current_time[workerid][sched_ctx_id] = 0.0;
 	}
 }
 
@@ -98,6 +101,7 @@ static void heft_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned
 	{
 		workerid = workerids[i];
 		starpu_worker_set_sched_condition(sched_ctx_id, workerid, NULL, NULL);
+		current_time[workerid][sched_ctx_id] = 0.0;
 	}
 }
 
@@ -156,7 +160,7 @@ static void heft_pre_exec_hook(struct starpu_task *task)
 	exp_len[workerid] -= model + transfer_model;
 	exp_start[workerid] = starpu_timing_now() + model;
 	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
-	ntasks[workerid]--;
+	ntasks[workerid]--; 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
@@ -269,6 +273,17 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_prefetch_task_input_on_node(task, memory_node);
 	}
 
+	double max_time_on_ctx = starpu_get_max_time_worker_on_ctx();
+	if(max_time_on_ctx != -1.0 && starpu_are_overlapping_ctxs_on_worker(best_workerid) && starpu_is_ctxs_turn(best_workerid, sched_ctx_id))
+	{
+		current_time[best_workerid][sched_ctx_id] += predicted;
+		
+		if(current_time[best_workerid][sched_ctx_id] >= max_time_on_ctx)
+		{
+			current_time[best_workerid][sched_ctx_id] = 0.0;
+			starpu_set_turn_to_other_ctx(best_workerid, sched_ctx_id);
+		}
+	}
 
 	//_STARPU_DEBUG("Heft : pushing local task\n");
 	return starpu_push_local_task(best_workerid, task, prio);
@@ -302,87 +317,91 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
-		for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) 
+		if(starpu_is_ctxs_turn(worker, sched_ctx_id) || sched_ctx_id == 0)
 		{
-			/* Sometimes workers didn't take the tasks as early as we expected */
-			pthread_mutex_t *sched_mutex;
-			pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(sched_ctx_id, worker, &sched_mutex, &sched_cond);
-			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-			exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
-			exp_end[worker_ctx][nimpl] = exp_start[worker] + exp_len[worker];
-			if (exp_end[worker_ctx][nimpl] > max_exp_end)
- 				max_exp_end = exp_end[worker_ctx][nimpl];
-			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-			if (!starpu_worker_can_execute_task(worker, task, nimpl))
-			{
-				/* no one on that queue may execute this task */
-//				worker_ctx++;
-				continue;
-			}
-
-			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-			unsigned memory_node = starpu_worker_get_memory_node(worker);
-
-			if (bundle)
-			{
-				/* TODO : conversion time */
-				local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
-				local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
-				local_power[worker_ctx][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch, nimpl);
-				//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
-			}
-			else 
-			{
-				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
-				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
-				local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);
-				double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
-				if (conversion_time > 0.0)
-					local_task_length[worker_ctx][nimpl] += conversion_time;
-				//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
-			}
-
-			double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
 
-			if (ntasks_best == -1
-			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
-			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
-			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
-				)
+			for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) 
 			{
-				ntasks_best_end = ntasks_end;
-				ntasks_best = worker;
-				nimpl_best = nimpl;
-			}
-
-			if (isnan(local_task_length[worker_ctx][nimpl]))
-				/* we are calibrating, we want to speed-up calibration time
-				 * so we privilege non-calibrated tasks (but still
-				 * greedily distribute them to avoid dumb schedules) */
-				calibrating = 1;
-
-			if (isnan(local_task_length[worker_ctx][nimpl])
-				|| _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
-				/* there is no prediction available for that task
-				 * with that arch (yet or at all), so switch to a greedy strategy */
-				unknown = 1;
-
-			if (unknown)
-				continue;
-
-			exp_end[worker_ctx][nimpl] = exp_start[worker] + exp_len[worker] + local_task_length[worker_ctx][nimpl];
+				/* Sometimes workers didn't take the tasks as early as we expected */
+				pthread_mutex_t *sched_mutex;
+				pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(sched_ctx_id, worker, &sched_mutex, &sched_cond);
+				_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+				exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
+				exp_end[worker_ctx][nimpl] = exp_start[worker] + exp_len[worker];
+				if (exp_end[worker_ctx][nimpl] > max_exp_end)
+					max_exp_end = exp_end[worker_ctx][nimpl];
+				_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+				if (!starpu_worker_can_execute_task(worker, task, nimpl))
+				{
+					/* no one on that queue may execute this task */
+//				worker_ctx++;
+					continue;
+				}
+				
+				enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+				unsigned memory_node = starpu_worker_get_memory_node(worker);
+				
+				if (bundle)
+				{
+					/* TODO : conversion time */
+					local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
+					local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
+					local_power[worker_ctx][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch, nimpl);
+					//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
+				}
+				else 
+				{
+					local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
+					local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+					local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);
+					double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
+					if (conversion_time > 0.0)
+						local_task_length[worker_ctx][nimpl] += conversion_time;
+					//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
+				}
+				
+				double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
+				
+				if (ntasks_best == -1
+				    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
+				    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
+				    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
+					)
+				{
+					ntasks_best_end = ntasks_end;
+					ntasks_best = worker;
+					nimpl_best = nimpl;
+				}
+				
+				if (isnan(local_task_length[worker_ctx][nimpl]))
+					/* we are calibrating, we want to speed-up calibration time
+					 * so we privilege non-calibrated tasks (but still
+					 * greedily distribute them to avoid dumb schedules) */
+					calibrating = 1;
+				
+				if (isnan(local_task_length[worker_ctx][nimpl])
+				    || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
+					/* there is no prediction available for that task
+					 * with that arch (yet or at all), so switch to a greedy strategy */
+					unknown = 1;
+				
+				if (unknown)
+					continue;
+				
+				exp_end[worker_ctx][nimpl] = exp_start[worker] + exp_len[worker] + local_task_length[worker_ctx][nimpl];
 			
-			if (exp_end[worker_ctx][nimpl] < best_exp_end)
-			{
-				/* a better solution was found */
-				best_exp_end = exp_end[worker_ctx][nimpl];
-				nimpl_best = nimpl;
+				if (exp_end[worker_ctx][nimpl] < best_exp_end)
+				{
+					/* a better solution was found */
+					best_exp_end = exp_end[worker_ctx][nimpl];
+					nimpl_best = nimpl;
+				}
+				
+				if (isnan(local_power[worker_ctx][nimpl]))
+					local_power[worker_ctx][nimpl] = 0.;
+				
 			}
-
-			if (isnan(local_power[worker_ctx][nimpl]))
-				local_power[worker_ctx][nimpl] = 0.;
-
 		}
 		worker_ctx++;
 	}
@@ -500,38 +519,44 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
-		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+		if(starpu_is_ctxs_turn(worker, sched_ctx_id) || sched_ctx_id == 0)
 		{
-			if (!starpu_worker_can_execute_task(worker, task, nimpl))
+			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
-				/* no one on that queue may execute this task */
-				//worker_ctx++;
-				continue;
-			}
-
-
-			fitness[worker_ctx][nimpl] = hd->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end) 
-						+ hd->beta*(local_data_penalty[worker_ctx][nimpl])
-						+ hd->_gamma*(local_power[worker_ctx][nimpl]);
-
-			if (exp_end[worker_ctx][nimpl] > max_exp_end)
-				/* This placement will make the computation
-				 * longer, take into account the idle
+				if (!starpu_worker_can_execute_task(worker, task, nimpl))
+				{
+					/* no one on that queue may execute this task */
+					//worker_ctx++;
+					continue;
+				}
+				
+				
+				fitness[worker_ctx][nimpl] = hd->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end) 
+					+ hd->beta*(local_data_penalty[worker_ctx][nimpl])
+					+ hd->_gamma*(local_power[worker_ctx][nimpl]);
+				
+				if (exp_end[worker_ctx][nimpl] > max_exp_end)
+					/* This placement will make the computation
+					 * longer, take into account the idle
 				 * consumption of other cpus */
-				fitness[worker_ctx][nimpl] += hd->_gamma * hd->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
+					fitness[worker_ctx][nimpl] += hd->_gamma * hd->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
 			
-			if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
-			{
-				/* we found a better solution */
-				best_fitness = fitness[worker_ctx][nimpl];
-				best = worker;
-				best_in_ctx = worker_ctx;
-				selected_impl = nimpl;
+				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
+				{
+					/* we found a better solution */
+					best_fitness = fitness[worker_ctx][nimpl];
+					best = worker;
+					best_in_ctx = worker_ctx;
+					selected_impl = nimpl;
+				}
 			}
 		}
 		worker_ctx++;
 	}
 
+	if(best == -1)
+		return -1;
+
 	/* By now, we must have found a solution */
 	STARPU_ASSERT(best != -1);