
fix synchronisation issue (not yet for ws, parallel_heft and random); there is still a synchronisation issue with delete_ctx (segfault from time to time) -> to be fixed

Andra Hugo 12 years ago
commit d56457109a

+ 8 - 0
include/starpu_scheduler.h

@@ -120,6 +120,14 @@ struct starpu_sched_policy
 
 struct starpu_sched_policy **starpu_sched_get_predefined_policies();
 
+/* When there is no available task for a worker, StarPU blocks this worker on a
+condition variable. This function returns the condition variable (and the
+associated mutex) used to block (and to wake up) a worker. Note that multiple
+workers may share the same condition variable: for instance, a scheduling
+strategy with a single task queue would use one condition variable to block
+and wake up all of its workers. */
+void starpu_worker_get_sched_condition(int workerid, pthread_mutex_t **sched_mutex, pthread_cond_t **sched_cond);
+
 /* Check if the worker specified by workerid can execute the codelet. */
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
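
For illustration, a minimal sketch of how a scheduling policy could use this new getter when pushing a task; my_policy_push() and my_queue_push() are hypothetical helpers, not StarPU API:

	#include <pthread.h>
	#include <starpu.h>
	#include <starpu_scheduler.h>

	/* Hypothetical push hook: queue the task for a chosen worker, then wake
	 * that worker through the condition variable StarPU blocks it on. */
	extern void my_queue_push(int workerid, struct starpu_task *task); /* illustrative */

	static int my_policy_push(struct starpu_task *task, int workerid)
	{
		pthread_mutex_t *sched_mutex;
		pthread_cond_t *sched_cond;

		starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);

		pthread_mutex_lock(sched_mutex);
		my_queue_push(workerid, task);    /* policy-private queue */
		pthread_cond_signal(sched_cond);  /* wake the worker if it is blocked */
		pthread_mutex_unlock(sched_mutex);
		return 0;
	}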
 

+ 2 - 13
src/core/sched_policy.c

@@ -519,17 +519,14 @@ struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
 		_starpu_clock_gettime(&pop_start_time);
 
 pick:
-	_STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 	/* perhaps there is some local task to be executed first */
 	task = _starpu_pop_local_task(worker);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 
 
 	/* get tasks from the stacks of the strategy */
 	if(!task)
 	{
 		struct _starpu_sched_ctx *sched_ctx;
-		_starpu_pthread_mutex_t *sched_ctx_mutex;
 
 		int been_here[STARPU_NMAX_SCHED_CTXS];
 		int i;
@@ -545,17 +542,9 @@ pick:
 
 			if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 			{
-				sched_ctx_mutex = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
-				if(sched_ctx_mutex != NULL)
-				{
-					_STARPU_PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
-
-					if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
-						task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
-
-					_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
 
-				}
+				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
+					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
 			}
 
 			if((!task && sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
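
With the per-context mutex gone, the locking that used to happen here now lives in the caller (_starpu_get_worker_task() in driver_common.c, below), so a policy's pop_task callback may assume the calling worker's sched_mutex is already held. A hedged sketch, with my_fifo_pop()/my_queue() as illustrative stand-ins for a policy's private queue:

	struct my_fifo;                                          /* illustrative */
	extern struct starpu_task *my_fifo_pop(struct my_fifo *q);
	extern struct my_fifo *my_queue(unsigned sched_ctx_id);

	static struct starpu_task *my_policy_pop(unsigned sched_ctx_id)
	{
		/* no lock taken here: the driver already holds the calling
		 * worker's sched_mutex across the whole pop path */
		return my_fifo_pop(my_queue(sched_ctx_id));
	}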

+ 6 - 0
src/core/workers.c

@@ -1239,6 +1239,12 @@ void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
 	config.workers[workerid].status = status;
 }
 
+void starpu_worker_get_sched_condition(int workerid, _starpu_pthread_mutex_t **sched_mutex, _starpu_pthread_cond_t **sched_cond)
+{
+	*sched_cond = &config.workers[workerid].sched_cond;
+	*sched_mutex = &config.workers[workerid].sched_mutex;
+}
+
 int starpu_worker_get_nids_by_type(enum starpu_archtype type, int *workerids, int maxsize)
 {
 	unsigned nworkers = starpu_worker_get_count();

+ 1 - 1
src/core/workers.h

@@ -68,7 +68,7 @@ struct _starpu_worker
 	int combined_workerid; /* combined worker currently using this worker */
 	int current_rank; /* current rank in case the worker is used in a parallel fashion */
 	int worker_size; /* size of the worker in case we use a combined worker */
-        _starpu_pthread_cond_t ready_cond; /* indicate when the worker is ready */
+	_starpu_pthread_cond_t ready_cond; /* indicate when the worker is ready */
 	unsigned memory_node; /* which memory node is the worker associated with ? */
 	_starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
 	_starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */

+ 3 - 5
src/drivers/driver_common/driver_common.c

@@ -156,19 +156,15 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 {
 	struct starpu_task *task;
 
+	_STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 	task = _starpu_pop_task(args);
 
 	if (task == NULL)
 	{
-#ifdef STARPU_DEVEL
-#warning TODO: check this out after the merge (atomicity issue)
-#endif
-
 		/* Note: we need to keep the sched condition mutex all along the path
 		 * from popping a task from the scheduler to blocking. Otherwise the
 		 * driver may go block just after the scheduler got a new task to be
 		 * executed, and thus hanging. */
-		_STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 
 		if (_starpu_worker_get_status(workerid) != STATUS_SLEEPING)
 		{
@@ -185,6 +181,8 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 		return NULL;
 	}
 
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
+
 	if (_starpu_worker_get_status(workerid) == STATUS_SLEEPING)
 	{
 		_STARPU_TRACE_WORKER_SLEEP_END
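
To make the atomicity argument in that comment concrete, here is a simplified sketch of the pop-or-sleep protocol the driver now implements (the worker structure is reduced to the two fields involved; this is not verbatim StarPU code):

	#include <pthread.h>

	struct starpu_task; /* opaque here */

	struct worker /* stand-in for struct _starpu_worker */
	{
		pthread_mutex_t sched_mutex; /* protects the scheduling state */
		pthread_cond_t sched_cond;   /* the worker sleeps here when idle */
	};

	static struct starpu_task *pop_or_sleep(struct worker *w,
			struct starpu_task *(*pop)(struct worker *))
	{
		struct starpu_task *task;

		pthread_mutex_lock(&w->sched_mutex);
		task = pop(w); /* must not take sched_mutex itself */
		if (!task)
			/* atomically releases sched_mutex and blocks, so a
			 * concurrent push cannot slip in between the failed
			 * pop and the wait */
			pthread_cond_wait(&w->sched_cond, &w->sched_mutex);
		pthread_mutex_unlock(&w->sched_mutex);
		return task;
	}

After the wait returns, the caller loops back and pops again (the pick: label above plays that role in the real code).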

+ 29 - 19
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -200,11 +200,11 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 	if (task)
 	{
 		double model = task->predicted;
-
+		
 		fifo->exp_len -= model;
 		fifo->exp_start = starpu_timing_now() + model;
 		fifo->exp_end = fifo->exp_start + fifo->exp_len;
-
+			
 #ifdef STARPU_VERBOSE
 		if (task->cl)
 		{
@@ -212,7 +212,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 			if (non_ready == 0)
 				dt->ready_task_cnt++;
 		}
-
+		
 		dt->total_task_cnt++;
 #endif
 	}
@@ -231,9 +231,10 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 
 	_starpu_pthread_mutex_t *sched_mutex;
 	_starpu_pthread_cond_t *sched_cond;
-	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
-	new_list = _starpu_fifo_pop_every_task(fifo, sched_mutex, workerid);
-
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	new_list = _starpu_fifo_pop_every_task(fifo, workerid);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	while (new_list)
 	{
 		double model = new_list->predicted;
@@ -260,7 +261,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	_starpu_pthread_mutex_t *sched_mutex;
 	_starpu_pthread_cond_t *sched_cond;
-	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
+	starpu_worker_get_sched_condition(best_workerid, &sched_mutex, &sched_cond);
 
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
@@ -317,12 +318,23 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		AYU_event(AYU_ADDTASKTOQUEUE, _starpu_get_job_associated_to_task(task)->job_id, &id);
 	}
 #endif
+	int ret = 0;
 	if (prio)
-		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
-			sched_mutex, sched_cond, task);
+	{
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+		ret = _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid], task);
+		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
 	else
-		return _starpu_fifo_push_task(dt->queue_array[best_workerid],
-			sched_mutex, sched_cond, task);
+	{
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+		ret = _starpu_fifo_push_task(dt->queue_array[best_workerid], task);
+		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
+
+	return ret;
 }
 
 /* TODO: factorize with dmda!! */
@@ -369,7 +381,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 			double exp_end;
 			_starpu_pthread_mutex_t *sched_mutex;
 			_starpu_pthread_cond_t *sched_cond;
-			starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, worker, &sched_mutex, &sched_cond);
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
@@ -487,7 +499,8 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			_starpu_pthread_mutex_t *sched_mutex;
 			_starpu_pthread_cond_t *sched_cond;
-			starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, worker, &sched_mutex, &sched_cond);
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+
 			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
 			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
@@ -761,10 +774,7 @@ static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nwo
 		/* if the worker already belonged to this context,
 		   the queue and the synchronization variables have already been initialized */
 		if(dt->queue_array[workerid] == NULL)
-		{
 			dt->queue_array[workerid] = _starpu_create_fifo();
-			starpu_sched_ctx_init_worker_mutex_and_cond(sched_ctx_id, workerid);
-		}
 	}
 }
 
@@ -779,7 +789,6 @@ static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned
 		workerid = workerids[i];
 		_starpu_destroy_fifo(dt->queue_array[workerid]);
 		dt->queue_array[workerid] = NULL;
-		starpu_sched_ctx_deinit_worker_mutex_and_cond(sched_ctx_id, workerid);
 	}
 }
 
@@ -862,7 +871,8 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
 
 	_starpu_pthread_mutex_t *sched_mutex;
 	_starpu_pthread_cond_t *sched_cond;
-	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+
 	/* Once the task is executing, we can update the predicted amount
 	 * of work. */
 	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
@@ -886,7 +896,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, unsign
 	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
 	_starpu_pthread_mutex_t *sched_mutex;
 	_starpu_pthread_cond_t *sched_cond;
-	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 
 
 	/* Update the predictions */
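
The recurring pattern throughout this file is now: fetch the target worker's mutex and condition with starpu_worker_get_sched_condition(), lock, update the per-worker fifo and its exp_* load estimates (or push and signal), unlock. As a hedged illustration of the estimate refresh done under that lock (fields reduced to the ones used above):

	struct fifo_estimates /* reduced stand-in for struct _starpu_fifo_taskq */
	{
		double exp_start; /* expected start date of the next task */
		double exp_len;   /* expected duration of the queued work */
		double exp_end;   /* expected completion date */
	};

	/* Called with the worker's sched_mutex held: workers sometimes start
	 * tasks later than predicted, so clamp exp_start to the current time. */
	static void refresh_estimates(struct fifo_estimates *f, double now)
	{
		if (f->exp_start < now)
			f->exp_start = now; /* STARPU_MAX(exp_start, starpu_timing_now()) */
		f->exp_end = f->exp_start + f->exp_len;
	}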

+ 41 - 35
src/sched_policies/eager_central_policy.c

@@ -27,33 +27,8 @@
 struct _starpu_eager_center_policy_data
 {
 	struct _starpu_fifo_taskq *fifo;
-	_starpu_pthread_mutex_t sched_mutex;
-	_starpu_pthread_cond_t sched_cond;
 };
 
-static void eager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	unsigned i;
-	int workerid;
-	for (i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, &data->sched_mutex, &data->sched_cond);
-	}
-}
-
-static void eager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	unsigned i;
-	int workerid;
-	for (i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, NULL, NULL);
-	}
-}
-
 static void initialize_eager_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, WORKER_LIST);
@@ -65,9 +40,6 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 	/* there is only a single queue in that trivial design */
 	data->fifo =  _starpu_create_fifo();
 
-	_STARPU_PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
-	_STARPU_PTHREAD_COND_INIT(&data->sched_cond, NULL);
-
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }
 
@@ -80,9 +52,6 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 	/* deallocate the job queue */
 	_starpu_destroy_fifo(data->fifo);
 
-	_STARPU_PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
-	_STARPU_PTHREAD_COND_DESTROY(&data->sched_cond);
-
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
 
 	free(data);
@@ -104,7 +73,38 @@ static int push_task_eager_policy(struct starpu_task *task)
 		return ret_val;
 	}
 
-	ret_val = _starpu_fifo_push_task(data->fifo, &data->sched_mutex, &data->sched_cond, task);
+	unsigned worker = 0;
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	}
+
+	ret_val = _starpu_fifo_push_task(data->fifo, task);
+
+	/* rewind the cursor before iterating over the workers again (init_cursor is assumed to reset the iteration) */
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
+		
+	if (workers->deinit_cursor)
+		workers->deinit_cursor(workers);
+
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return ret_val;
 }
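
Since the shared fifo is no longer guarded by a policy-wide mutex, the push above takes every member worker's mutex before touching the queue, then signals and releases them all. A hedged sketch of that broadcast over a plain array of worker ids (the real code walks the context's worker collection instead); note that every caller must lock in the same order, or two concurrent pushes could deadlock:

	#include <pthread.h>
	#include <starpu_scheduler.h>

	static void lock_all(int *workerids, unsigned n)
	{
		unsigned i;
		for (i = 0; i < n; i++)
		{
			pthread_mutex_t *sched_mutex;
			pthread_cond_t *sched_cond;
			starpu_worker_get_sched_condition(workerids[i], &sched_mutex, &sched_cond);
			pthread_mutex_lock(sched_mutex);
		}
	}

	static void signal_and_unlock_all(int *workerids, unsigned n)
	{
		unsigned i;
		for (i = 0; i < n; i++)
		{
			pthread_mutex_t *sched_mutex;
			pthread_cond_t *sched_cond;
			starpu_worker_get_sched_condition(workerids[i], &sched_mutex, &sched_cond);
			pthread_cond_signal(sched_cond);
			pthread_mutex_unlock(sched_mutex);
		}
	}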
@@ -112,7 +109,16 @@ static int push_task_eager_policy(struct starpu_task *task)
 static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	return _starpu_fifo_pop_every_task(data->fifo, &data->sched_mutex, starpu_worker_get_id());
+	int workerid = starpu_worker_get_id();
+
+	_starpu_pthread_mutex_t *sched_mutex;
+	_starpu_pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	return task;
 }
 
 static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
@@ -127,8 +133,8 @@ struct starpu_sched_policy _starpu_sched_eager_policy =
 {
 	.init_sched = initialize_eager_center_policy,
 	.deinit_sched = deinitialize_eager_center_policy,
-	.add_workers = eager_add_workers,
-	.remove_workers = eager_remove_workers,
+	.add_workers = NULL,
+	.remove_workers = NULL,
 	.push_task = push_task_eager_policy,
 	.pop_task = pop_task_eager_policy,
 	.pre_exec_hook = NULL,

+ 65 - 53
src/sched_policies/eager_central_priority_policy.c

@@ -45,8 +45,6 @@ struct _starpu_priority_taskq
 struct _starpu_eager_central_prio_data
 {
 	struct _starpu_priority_taskq *taskq;
-	_starpu_pthread_mutex_t sched_mutex;
-	_starpu_pthread_cond_t sched_cond;
 };
 
 /*
@@ -75,30 +73,6 @@ static void _starpu_destroy_priority_taskq(struct _starpu_priority_taskq *priori
 	free(priority_queue);
 }
 
-static void eager_priority_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-
-	unsigned i;
-	int workerid;
-	for (i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, &data->sched_mutex, &data->sched_cond);
-	}
-}
-
-static void eager_priority_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	unsigned i;
-	int workerid;
-	for (i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, NULL, NULL);
-	}
-}
-
 static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, WORKER_LIST);
@@ -111,9 +85,6 @@ static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	/* only a single queue (even though there are several internally) */
 	data->taskq = _starpu_create_priority_taskq();
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
-
-	_STARPU_PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
-	_STARPU_PTHREAD_COND_INIT(&data->sched_cond, NULL);
 }
 
 static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
@@ -124,11 +95,8 @@ static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	/* deallocate the task queue */
 	_starpu_destroy_priority_taskq(data->taskq);
 
-	_STARPU_PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
-        _STARPU_PTHREAD_COND_DESTROY(&data->sched_cond);
-
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
-        free(data);
+	free(data);
 }
 
 static int _starpu_priority_push_task(struct starpu_task *task)
@@ -140,21 +108,33 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 
 	/* if the context has no workers return */
 	_starpu_pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
-        unsigned nworkers;
-        int ret_val = -1;
-
-        _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-        nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-        if(nworkers == 0)
-        {
-                _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-                return ret_val;
-        }
+	unsigned nworkers;
+	int ret_val = -1;
+	
+	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
+	if(nworkers == 0)
+	{
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+		return ret_val;
+	}
 
 
 	/* if there are no tasks, the workers block on their condition variables, */
 	/* so wake anyone waiting for a task */
-	_STARPU_PTHREAD_MUTEX_LOCK(&data->sched_mutex);
+	unsigned worker = 0;
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	}
 
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
 
@@ -162,11 +142,24 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	taskq->ntasks[priolevel]++;
 	taskq->total_ntasks++;
 
-	_STARPU_PTHREAD_COND_SIGNAL(&data->sched_cond);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->sched_mutex);
-
-        _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-        return 0;
+	/* rewind the cursor before iterating over the workers again (init_cursor is assumed to reset the iteration) */
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
+
+	if (workers->deinit_cursor)
+		workers->deinit_cursor(workers);
+
+	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+	return 0;
 }
 
 static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
@@ -186,7 +176,11 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 		return NULL;
 #else
-		_STARPU_PTHREAD_COND_WAIT(&data->sched_cond, &data->sched_mutex);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+
+		_STARPU_PTHREAD_COND_WAIT(sched_cond, sched_mutex);
 #endif
 	}
 
@@ -221,8 +215,28 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	}
 
 	if (!chosen_task && skipped)
+	{
 		/* Notify another worker to do that task */
-		_STARPU_PTHREAD_COND_SIGNAL(&data->sched_cond);
+		unsigned worker = 0;
+		struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+		if(workers->init_cursor)
+			workers->init_cursor(workers);
+		
+		while(workers->has_next(workers))
+		{
+			worker = workers->get_next(workers);
+			if(worker != workerid)
+			{
+				_starpu_pthread_mutex_t *sched_mutex;
+				_starpu_pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+				_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+			}
+		}
+
+		if (workers->deinit_cursor)
+			workers->deinit_cursor(workers);
+	}
 
 	return chosen_task;
 }
@@ -231,8 +245,6 @@ struct starpu_sched_policy _starpu_sched_prio_policy =
 {
 	.init_sched = initialize_eager_center_priority_policy,
 	.deinit_sched = deinitialize_eager_center_priority_policy,
-        .add_workers = eager_priority_add_workers,
-        .remove_workers = eager_priority_remove_workers,
 	/* we always use priorities in that policy */
 	.push_task = _starpu_priority_push_task,
 	.pop_task = _starpu_priority_pop_task,
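
With one condition variable per worker instead of one per context, the blocking pop above waits on the calling worker's own sched_cond, and any push has to signal every worker of the context. The pop side follows the standard condition-variable idiom; a hedged sketch (taskq reduced to the counter checked above):

	#include <pthread.h>

	struct prio_taskq_counts { unsigned total_ntasks; }; /* reduced stand-in */

	/* Called with this worker's sched_mutex held (see driver_common.c):
	 * re-check the predicate after every wake-up, since pushes signal all
	 * workers and another worker may have taken the task first. */
	static void wait_for_task(struct prio_taskq_counts *taskq,
			pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond)
	{
		while (taskq->total_ntasks == 0)
			pthread_cond_wait(sched_cond, sched_mutex);
	}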

+ 4 - 21
src/sched_policies/fifo_queues.c

@@ -53,15 +53,10 @@ int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo)
 }
 
 int
-_starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue,
-			      _starpu_pthread_mutex_t *sched_mutex,
-			      _starpu_pthread_cond_t *sched_cond,
-			      struct starpu_task *task)
+_starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
 {
 	struct starpu_task_list *list = &fifo_queue->taskq;
 
-	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-
 	if (list->head == NULL)
 	{
 		list->head = task;
@@ -115,31 +110,23 @@ _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue,
 	fifo_queue->ntasks++;
 	fifo_queue->nprocessed++;
 
-	_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
 	return 0;
 }
 
 /* TODO: revert front/back? */
-int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, _starpu_pthread_mutex_t *sched_mutex, _starpu_pthread_cond_t *sched_cond, struct starpu_task *task)
+int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
 {
 
 	if (task->priority > 0)
 	{
-		_starpu_fifo_push_sorted_task(fifo_queue, sched_mutex,
-					      sched_cond, task);
+		_starpu_fifo_push_sorted_task(fifo_queue, task);
 	}
 	else
 	{
-		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 		starpu_task_list_push_front(&fifo_queue->taskq, task);
 
 		fifo_queue->ntasks++;
 		fifo_queue->nprocessed++;
-
-		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 
 	return 0;
@@ -188,7 +175,7 @@ struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo_
 }
 
 /* pop every task that can be executed on the calling driver */
-struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_queue, _starpu_pthread_mutex_t *sched_mutex, int workerid)
+struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_queue, int workerid)
 {
 	struct starpu_task_list *old_list;
 	unsigned size;
@@ -196,8 +183,6 @@ struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_
 	struct starpu_task *new_list = NULL;
 	struct starpu_task *new_list_tail = NULL;
 
-	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-
 	size = fifo_queue->ntasks;
 
 	if (size > 0)
@@ -246,7 +231,5 @@ struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_
 		fifo_queue->ntasks -= new_list_size;
 	}
 
-	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
 	return new_list;
 }

+ 3 - 6
src/sched_policies/fifo_queues.h

@@ -45,15 +45,12 @@ void _starpu_destroy_fifo(struct _starpu_fifo_taskq *fifo);
 
 int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo);
 
-int _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue,
-				  _starpu_pthread_mutex_t *sched_mutex,
-				  _starpu_pthread_cond_t *sched_cond,
-				  struct starpu_task *task);
+int _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task);
 
-int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo, _starpu_pthread_mutex_t *sched_mutex, _starpu_pthread_cond_t *sched_cond, struct starpu_task *task);
+int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo, struct starpu_task *task);
 
 struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo, int workerid);
 struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo);
-struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo, _starpu_pthread_mutex_t *sched_mutex, int workerid);
+struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo, int workerid);
 
 #endif // __FIFO_QUEUES_H__
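
These queue operations are now oblivious to synchronisation: holding the right worker's sched_mutex, and signalling after a push, is entirely the caller's job. A hedged sketch of the new calling convention, using the internal wrappers the way the policies above do:

	static int push_and_wake(struct _starpu_fifo_taskq *fifo,
			struct starpu_task *task, int workerid)
	{
		int ret;
		_starpu_pthread_mutex_t *sched_mutex;
		_starpu_pthread_cond_t *sched_cond;

		starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);

		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
		ret = _starpu_fifo_push_task(fifo, task); /* no locking inside anymore */
		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);  /* the queue no longer signals */
		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
		return ret;
	}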

+ 54 - 36
src/sched_policies/parallel_eager.c

@@ -27,12 +27,6 @@ struct _starpu_peager_data
 	struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
 
 	int master_id[STARPU_NMAXWORKERS];
-
-	_starpu_pthread_cond_t sched_cond;
-	_starpu_pthread_mutex_t sched_mutex;
-
-	_starpu_pthread_cond_t master_sched_cond[STARPU_NMAXWORKERS];
-	_starpu_pthread_mutex_t master_sched_mutex[STARPU_NMAXWORKERS];
 };
 
 /* XXX instead of 10, we should use some "MAX combination .."*/
@@ -94,31 +88,17 @@ static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned n
 		}
 	}
 
-	for(i = 0; i < nworkers; i++)
-        {
-		workerid = workerids[i];
-		_STARPU_PTHREAD_MUTEX_INIT(&data->master_sched_mutex[workerid], NULL);
-		_STARPU_PTHREAD_COND_INIT(&data->master_sched_cond[workerid], NULL);
-	}
 
 	for(i = 0; i < nworkers; i++)
-        {
+	{
 		workerid = workerids[i];
-
+		
 		/* slaves pick up tasks from their local queue, their master
 		 * will put tasks directly in that local list when a parallel
 		 * task comes. */
 		data->local_fifo[workerid] = _starpu_create_fifo();
-
-		unsigned master = data->master_id[workerid];
-
-		/* All masters use the same condition/mutex */
-		if (master == workerid)
-			starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, &data->sched_mutex, &data->sched_cond);
-		else
-			starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, &data->master_sched_mutex[master], &data->master_sched_cond[master]);
 	}
-
+	
 #if 0
 	for(i = 0; i < nworkers; i++)
         {
@@ -138,9 +118,6 @@ static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigne
         {
 		workerid = workerids[i];
 		_starpu_destroy_fifo(data->local_fifo[workerid]);
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, NULL, NULL);
-		_STARPU_PTHREAD_MUTEX_DESTROY(&data->master_sched_mutex[workerid]);
-		_STARPU_PTHREAD_COND_DESTROY(&data->master_sched_cond[workerid]);
 	}
 }
 
@@ -152,9 +129,6 @@ static void initialize_peager_policy(unsigned sched_ctx_id)
 	/* masters pick tasks from that queue */
 	data->fifo = _starpu_create_fifo();
 
-	_STARPU_PTHREAD_MUTEX_INIT(&data->sched_mutex, NULL);
-	_STARPU_PTHREAD_COND_INIT(&data->sched_cond, NULL);
-
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }
 
@@ -166,9 +140,6 @@ static void deinitialize_peager_policy(unsigned sched_ctx_id)
 	/* deallocate the job queue */
 	_starpu_destroy_fifo(data->fifo);
 
-	_STARPU_PTHREAD_MUTEX_DESTROY(&data->sched_mutex);
-	_STARPU_PTHREAD_COND_DESTROY(&data->sched_cond);
-
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
 
 	free(data);
@@ -191,7 +162,49 @@ static int push_task_peager_policy(struct starpu_task *task)
 		return ret_val;
 	}
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	ret_val = _starpu_fifo_push_task(data->fifo, &data->sched_mutex, &data->sched_cond, task);
+	int worker = 0;
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		int master = data->master_id[worker];
+		/* Only non-CPU workers and CPU masters pick from the shared fifo, so only their mutexes need to be taken */
+		if (starpu_worker_get_type(worker) != STARPU_CPU_WORKER || master == worker)
+		{
+			_starpu_pthread_mutex_t *sched_mutex;
+			_starpu_pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+		}
+	}
+
+
+	ret_val = _starpu_fifo_push_task(data->fifo, task);
+
+	/* rewind the cursor before iterating over the workers again (init_cursor is assumed to reset the iteration) */
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+	while(workers->has_next(workers))
+	{
+		worker = workers->get_next(workers);
+		int master = data->master_id[worker];
+		/* Signal and unlock exactly the workers that were locked above */
+		if (starpu_worker_get_type(worker) != STARPU_CPU_WORKER || master == worker)
+		{
+			_starpu_pthread_mutex_t *sched_mutex;
+			_starpu_pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+			_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+		}
+	}
+
+	if (workers->deinit_cursor)
+		workers->deinit_cursor(workers);
+
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 
 	return ret_val;
@@ -273,10 +283,18 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 			{
 				struct starpu_task *alias = _starpu_create_task_alias(task);
 				int local_worker = combined_workerid[i];
+				
+				_starpu_pthread_mutex_t *sched_mutex;
+				_starpu_pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(local_worker, &sched_mutex, &sched_cond);
+
+				_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+
+				_starpu_fifo_push_task(data->local_fifo[local_worker], alias);
+
+				_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+				_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
-				_starpu_fifo_push_task(data->local_fifo[local_worker],
-						       &data->master_sched_mutex[master],
-						       &data->master_sched_cond[master], alias);
 			}
 
 			/* The master also manipulated an alias */
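
On the slave side, each CPU slave now blocks on its own condition variable and drains its local fifo under its own mutex, which is why the master above locks each slave's per-worker mutex instead of a shared master mutex. A hedged sketch of the matching slave pop (it runs with the slave's sched_mutex already held by the driver):

	static struct starpu_task *slave_pop(struct _starpu_peager_data *data, int workerid)
	{
		/* the master pushed aliases into this local fifo while holding
		 * this same per-worker mutex, so no extra locking is needed */
		return _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
	}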