瀏覽代碼

Fix the trouble I created in r15580. Also add a default mode for _get_next_sched_ctx_to_pop_into()

Terry Cojean 10 年之前
父節點
當前提交
0248b236d2

+ 12 - 0
include/starpu_sched_ctx.h

@@ -116,6 +116,18 @@ int starpu_sched_ctx_get_nready_tasks(unsigned sched_ctx_id);
 
 double starpu_sched_ctx_get_nready_flops(unsigned sched_ctx_id);
 
+void starpu_sched_ctx_list_task_counters_increment(unsigned sched_ctx_id, int workerid);
+
+void starpu_sched_ctx_list_task_counters_decrement(unsigned sched_ctx_id, int workerid);
+
+void starpu_sched_ctx_list_task_counters_reset(unsigned sched_ctx_id, int workerid);
+
+void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task, unsigned sched_ctx_id);
+
+void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task, unsigned sched_ctx_id);
+
+void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, unsigned sched_ctx_id);
+
 void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority);
 
 void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority);

+ 120 - 0
src/core/sched_ctx.c

@@ -482,6 +482,7 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->waiting_tasks_mutex, NULL);
 	starpu_task_list_init(&sched_ctx->waiting_tasks);
 
+	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->sched_ctx_list_mutex, NULL);
 
 	sched_ctx->sched_policy = policy ? (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy)) : NULL;
 	sched_ctx->is_initial_sched = is_initial_sched;
@@ -840,6 +841,7 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->waiting_tasks_mutex);
+	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->sched_ctx_list_mutex);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
 #ifdef STARPU_HAVE_HWLOC
 	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
@@ -1993,6 +1995,124 @@ void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_
 	_starpu_repush_task(j);
 }
 
+void starpu_sched_ctx_list_task_counters_increment(unsigned sched_ctx_id, int workerid)
+{
+	/* Note : often we don't have any sched_mutex taken here but we
+	    should, so take it */
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	if (worker->nsched_ctxs > 1)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+		_starpu_sched_ctx_list_push_event(worker->sched_ctx_list, sched_ctx_id);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+	}
+}
+
+void starpu_sched_ctx_list_task_counters_decrement(unsigned sched_ctx_id, int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	if (worker->nsched_ctxs > 1)
+		_starpu_sched_ctx_list_pop_event(worker->sched_ctx_list, sched_ctx_id);
+}
+
+void starpu_sched_ctx_list_task_counters_reset(unsigned sched_ctx_id, int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	if (worker->nsched_ctxs > 1)
+		_starpu_sched_ctx_list_pop_all_event(worker->sched_ctx_list, sched_ctx_id);
+}
+
+void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task, unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	unsigned worker = 0;
+	struct starpu_sched_ctx_iterator it;
+
+	workers->init_iterator_for_parallel_tasks(workers, &it, task);
+	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+
+		starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, worker);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);
+}
+
+void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task, unsigned sched_ctx_id)
+{
+	int curr_workerid = starpu_worker_get_id();
+	if(curr_workerid != -1)
+	{
+		struct _starpu_worker *worker_str = _starpu_get_worker_struct(curr_workerid);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&worker_str->sched_mutex);
+	}
+
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	unsigned worker = 0;
+	struct starpu_sched_ctx_iterator it;
+	workers->init_iterator_for_parallel_tasks(workers, &it, task);
+	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+
+		struct _starpu_worker *worker_str = _starpu_get_worker_struct(worker);
+		if (worker_str->nsched_ctxs > 1)
+		{
+			STARPU_PTHREAD_MUTEX_LOCK(&worker_str->sched_mutex);
+			starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, worker);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker_str->sched_mutex);
+		}
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);
+
+	if(curr_workerid != -1)
+	{
+		struct _starpu_worker *worker_str = _starpu_get_worker_struct(curr_workerid);
+		STARPU_PTHREAD_MUTEX_LOCK(&worker_str->sched_mutex);
+	}
+}
+
+void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, unsigned sched_ctx_id)
+{
+	struct _starpu_worker *worker_str;
+	int curr_workerid = starpu_worker_get_id();
+	if(curr_workerid != -1)
+	{
+		worker_str = _starpu_get_worker_struct(curr_workerid);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&worker_str->sched_mutex);
+	}
+
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	unsigned worker = 0;
+	struct starpu_sched_ctx_iterator it;
+	workers->init_iterator_for_parallel_tasks(workers, &it, task);
+	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+
+		worker_str = _starpu_get_worker_struct(worker);
+		if (worker_str->nsched_ctxs > 1)
+		{
+			STARPU_PTHREAD_MUTEX_LOCK(&worker_str->sched_mutex);
+			starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, worker);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker_str->sched_mutex);
+		}
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);
+
+	if(curr_workerid != -1)
+	{
+		worker_str = _starpu_get_worker_struct(curr_workerid);
+		STARPU_PTHREAD_MUTEX_LOCK(&worker_str->sched_mutex);
+	}
+}
+
 static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
 {
 	int s;

+ 3 - 0
src/core/sched_ctx.h

@@ -83,6 +83,9 @@ struct _starpu_sched_ctx
 	/* mutext protecting waiting_tasks list */
 	starpu_pthread_mutex_t waiting_tasks_mutex;
 
+	/* mutext protecting write to all worker's sched_ctx_list structure for this sched_ctx */
+	starpu_pthread_mutex_t sched_ctx_list_mutex;
+
 	/* min CPUs to execute*/
 	int min_ncpus;
 

+ 3 - 6
src/core/sched_ctx_list.c

@@ -46,6 +46,7 @@ void _starpu_sched_ctx_elt_init(struct _starpu_sched_ctx_elt *elt, unsigned sche
 {
 	elt->sched_ctx = sched_ctx;
 	elt->task_number = 0;
+	elt->last_poped = 0;
 	elt->parent = NULL;
 	elt->next = NULL;
 	elt->prev = NULL;
@@ -390,10 +391,7 @@ int _starpu_sched_ctx_list_push_event(struct _starpu_sched_ctx_list *list, unsig
 	if (elt == NULL)
 		return -1;
 
-	if (elt->task_number < 0)
-		elt->task_number = 1;
-	else
-		elt->task_number++;
+	elt->task_number++;
 
 	return 0;
 }
@@ -405,8 +403,7 @@ int _starpu_sched_ctx_list_pop_event(struct _starpu_sched_ctx_list *list, unsign
 	if (elt == NULL)
 		return -1;
 
-	if (elt->task_number > 0)
-		elt->task_number--;
+	elt->task_number--;
 
 	/** Balance circular lists **/
 	elt->parent->head = elt->next;

+ 1 - 0
src/core/sched_ctx_list.h

@@ -35,6 +35,7 @@ struct _starpu_sched_ctx_elt
 	struct _starpu_sched_ctx_list *parent;
 	unsigned sched_ctx;
 	long task_number;
+	unsigned last_poped;
 };
 
 struct _starpu_sched_ctx_list_iterator

+ 25 - 4
src/core/sched_policy.c

@@ -675,7 +675,7 @@ struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker
 {
 	struct _starpu_sched_ctx_elt *e = NULL;
 	struct _starpu_sched_ctx_list_iterator list_it;
-	unsigned first_sched_ctx = _starpu_get_initial_sched_ctx()->id;
+	int found = 0;
 
 	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
 	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
@@ -684,7 +684,27 @@ struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker
 		if (e->task_number > 0)
 			return _starpu_get_sched_ctx_struct(e->sched_ctx);
 	}
-	return _starpu_get_sched_ctx_struct(STARPU_GLOBAL_SCHED_CTX);
+
+	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
+	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
+	{
+		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
+		if (e->last_poped)
+		{
+			e->last_poped = 0;
+			if (_starpu_sched_ctx_list_iterator_has_next(&list_it))
+			{
+				e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
+				found = 1;
+			}
+			break;
+		}
+	}
+	if (!found)
+		e = worker->sched_ctx_list->head;
+	e->last_poped = 1;
+
+	return _starpu_get_sched_ctx_struct(e->sched_ctx);
 }
 
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
@@ -727,9 +747,10 @@ pick:
 					/** Caution
 					 * If you use multiple contexts your scheduler *needs*
 					 * to update the variable task_number of the ctx list.
+					 * In order to get the best performances.
 					 * This is done using functions :
-					 *   _starpu_sched_ctx_list_pop_event(...)
-					 *   _starpu_sched_ctx_list_push_event(...)
+					 *   starpu_sched_ctx_list_task_counters_increment...(...)
+					 *   starpu_sched_ctx_list_task_counters_decrement...(...)
 					**/
 					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
 

+ 4 - 14
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -23,7 +23,6 @@
 #include <starpu_scheduler.h>
 
 #include <common/fxt.h>
-#include <core/workers.h>
 #include <core/task.h>
 
 #include <sched_policies/fifo_queues.h>
@@ -186,7 +185,6 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
-	struct _starpu_sched_ctx_list *ctx_list;
 
 	int workerid = starpu_worker_get_id();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
@@ -214,8 +212,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 			
 		}
 
-		ctx_list = _starpu_get_worker_struct(workerid)->sched_ctx_list;
-		_starpu_sched_ctx_list_pop_event(ctx_list, sched_ctx_id);
+		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 
 #ifdef STARPU_VERBOSE
 		if (task->cl)
@@ -237,7 +234,6 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
-	struct _starpu_sched_ctx_list *ctx_list;
 
 	int workerid = starpu_worker_get_id();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
@@ -267,8 +263,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 
 		}
 
-		ctx_list = _starpu_get_worker_struct(workerid)->sched_ctx_list;
-		_starpu_sched_ctx_list_pop_event(ctx_list, sched_ctx_id);
+		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
 		  
 #ifdef STARPU_VERBOSE
 		if (task->cl)
@@ -288,7 +283,6 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 {
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	struct _starpu_sched_ctx_list *ctx_list;
 
 	struct starpu_task *new_list;
 
@@ -302,8 +296,7 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 	new_list = _starpu_fifo_pop_every_task(fifo, workerid);
 	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
-	ctx_list = _starpu_get_worker_struct(workerid)->sched_ctx_list;
-	_starpu_sched_ctx_list_pop_all_event(ctx_list, sched_ctx_id);
+	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
 
 	while (new_list)
 	{
@@ -338,7 +331,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	/* make sure someone could execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 	unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(best_workerid, sched_ctx_id);
-	struct _starpu_sched_ctx_list *ctx_list;
 
         if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
         {
@@ -461,8 +453,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 
-	ctx_list = _starpu_get_worker_struct(best_workerid)->sched_ctx_list;
-	_starpu_sched_ctx_list_push_event(ctx_list, sched_ctx_id);
+	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, best_workerid);
 
 	return ret;
 }
@@ -1104,7 +1095,6 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
 
 static void dmda_push_task_notify(struct starpu_task *task, int workerid, int perf_workerid, unsigned sched_ctx_id)
 {
-	struct _starpu_sched_ctx_list *ctx_list;
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	/* Compute the expected penality */

+ 5 - 48
src/sched_policies/eager_central_policy.c

@@ -25,7 +25,6 @@
 #include <sched_policies/fifo_queues.h>
 #include <common/thread.h>
 #include <starpu_bitmap.h>
-#include <core/workers.h>
 
 struct _starpu_eager_center_policy_data
 {
@@ -118,14 +117,6 @@ static int push_task_eager_policy(struct starpu_task *task)
 #else
 			dowake[worker] = 1;
 #endif
-
-			/* If we have only one sched_ctx we don't need to use these. */
-			/* We will use the global one anyway. */
-			if (_starpu_get_nsched_ctxs() > 1)
-			{
-				ctx_list = _starpu_get_worker_struct(worker)->sched_ctx_list;
-				_starpu_sched_ctx_list_push_event(ctx_list, sched_ctx_id);
-			}
 		}
 	}
 	/* Let the task free */
@@ -144,35 +135,20 @@ static int push_task_eager_policy(struct starpu_task *task)
 	}
 #endif
 
+	starpu_sched_ctx_list_task_counters_increment_all(task, sched_ctx_id);
+
 	return 0;
 }
 
 static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	int workerid = starpu_worker_get_id();
-	struct _starpu_sched_ctx_list *ctx_list;
-	
+	int workerid = starpu_worker_get_id();;
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-	if (_starpu_get_nsched_ctxs() > 1) {
-		unsigned worker = 0;
-		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-		struct starpu_sched_ctx_iterator it;
-		workers->init_iterator_for_parallel_tasks(workers, &it, task);
-		while(workers->has_next(workers, &it))
-		{
-			worker = workers->get_next(workers, &it);
-
-			unsigned nimpl;
-			if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl)) {
-				ctx_list = _starpu_get_worker_struct(workerid)->sched_ctx_list;
-				_starpu_sched_ctx_list_pop_all_event(ctx_list, sched_ctx_id);
-			}
-		}
-	}
+	starpu_sched_ctx_list_task_counters_reset_all(task, sched_ctx_id);
 
 	return task;
 }
@@ -183,7 +159,6 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *task = NULL;
-	struct _starpu_sched_ctx_list *ctx_list;
 
 	/* block until some event happens */
 	/* Here helgrind would shout that this is unprotected, this is just an
@@ -209,25 +184,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 
 	if(task)
 	{
-		/* If we have only one sched_ctx we don't need to use these. */
-		/* We will use the global one anyway. */
-		if (_starpu_get_nsched_ctxs() > 1)
-		{
-			unsigned worker = 0;
-			struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-			struct starpu_sched_ctx_iterator it;
-			workers->init_iterator_for_parallel_tasks(workers, &it, task);
-			while(workers->has_next(workers, &it))
-			{
-				worker = workers->get_next(workers, &it);
-
-				unsigned nimpl;
-				if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl)) {
-					ctx_list = _starpu_get_worker_struct(workerid)->sched_ctx_list;
-					_starpu_sched_ctx_list_pop_event(ctx_list, sched_ctx_id);
-				}
-			}
-		}
+		starpu_sched_ctx_list_task_counters_decrement_all(task, sched_ctx_id);
 
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)

+ 3 - 27
src/sched_policies/eager_central_priority_policy.c

@@ -26,7 +26,6 @@
 #include <starpu_bitmap.h>
 
 #include <common/fxt.h>
-#include <core/workers.h>
 
 #define DEFAULT_MIN_LEVEL	(-5)
 #define DEFAULT_MAX_LEVEL	(+5)
@@ -125,7 +124,6 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_priority_taskq *taskq = data->taskq;
-	struct _starpu_sched_ctx_list *ctx_list;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
@@ -165,11 +163,6 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 #else
 			dowake[worker] = 1;
 #endif
-			if (_starpu_get_nsched_ctxs() > 1)
-			{
-				ctx_list = _starpu_get_worker_struct(worker)->sched_ctx_list;
-				_starpu_sched_ctx_list_push_event(ctx_list, sched_ctx_id);
-			}
 		}
 	}
 	/* Let the task free */
@@ -188,6 +181,8 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	}
 #endif
 
+	starpu_sched_ctx_list_task_counters_increment_all(task, sched_ctx_id);
+
 	return 0;
 }
 
@@ -200,7 +195,6 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct _starpu_priority_taskq *taskq = data->taskq;
-	struct _starpu_sched_ctx_list *ctx_list;
 
 	/* block until some event happens */
 	/* Here helgrind would shout that this is unprotected, this is just an
@@ -293,25 +287,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 
 	if(chosen_task)
 	{
-		/* If we have only one sched_ctx we don't need to use these. */
-		/* We will use the global one anyway. */
-		if (_starpu_get_nsched_ctxs() > 1)
-		{
-			unsigned worker = 0;
-			struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-			struct starpu_sched_ctx_iterator it;
-			workers->init_iterator_for_parallel_tasks(workers, &it, chosen_task);
-			while(workers->has_next(workers, &it))
-			{
-				worker = workers->get_next(workers, &it);
-
-				unsigned nimpl;
-				if (starpu_worker_can_execute_task_first_impl(workerid, chosen_task, &nimpl)) {
-					ctx_list = _starpu_get_worker_struct(workerid)->sched_ctx_list;
-					_starpu_sched_ctx_list_pop_event(ctx_list, sched_ctx_id);
-				}
-			}
-		}
+		starpu_sched_ctx_list_task_counters_decrement_all(chosen_task, sched_ctx_id);
 
                 unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)