ソースを参照

Start a major cleanup of the scheduler, which aims to make it possible to write
scheduling policies outside the core of StarPU.
- Each worker is associated with a condition variable and a mutex (multiple
workers may share the same condition). The starpu_worker_set_sched_condition
function makes it possible to specify which condition is associated with a worker.
- The push/pop methods do not take a queue as parameter anymore.
- There is no global lock / condition variable anymore (in addition to the
per-worker locking facilities), so that a worker waiting for a task can only
block on the condition variable that was associated with that worker.
- The implementation of the ws (work-stealing) strategy is completely naive for
now (a single global lock); it needs to be rewritten in a better way.

Cédric Augonnet 14 年 前
コミット
6b9ea2fa8d

+ 1 - 0
src/Makefile.am

@@ -131,6 +131,7 @@ libstarpu_la_SOURCES = 						\
 	core/dependencies/htable.c				\
 	core/dependencies/data_concurrency.c			\
 	core/mechanisms/queues.c				\
+	core/mechanisms/stack_queues.c				\
 	core/mechanisms/deque_queues.c				\
 	core/mechanisms/priority_queues.c			\
 	core/mechanisms/fifo_queues.c				\

+ 3 - 71
src/core/mechanisms/deque_queues.c

@@ -19,30 +19,12 @@
 #include <errno.h>
 #include <common/utils.h>
 
-/* keep track of the total number of jobs to be scheduled to avoid infinite 
- * polling when there are really few jobs in the overall queue */
-static unsigned total_number_of_jobs;
-
-static pthread_cond_t *sched_cond;
-static pthread_mutex_t *sched_mutex;
-
 void _starpu_init_deque_queues_mechanisms(void)
 {
-	total_number_of_jobs = 0;
-
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-
-	/* to access them more easily, we keep their address in local variables */
-	sched_cond = &sched->sched_activity_cond;
-	sched_mutex = &sched->sched_activity_mutex;
 }
 
 void _starpu_deinit_deque_queues_mechanisms(void)
 {
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-
-	PTHREAD_MUTEX_DESTROY(&sched->sched_activity_mutex);
-	PTHREAD_COND_DESTROY(&sched->sched_activity_cond);
 }
 
 struct starpu_jobq_s *_starpu_create_deque(void)
@@ -50,9 +32,6 @@ struct starpu_jobq_s *_starpu_create_deque(void)
 	struct starpu_jobq_s *jobq;
 	jobq = malloc(sizeof(struct starpu_jobq_s));
 
-	PTHREAD_MUTEX_INIT(&jobq->activity_mutex, NULL);
-	PTHREAD_COND_INIT(&jobq->activity_cond, NULL);
-
 	struct starpu_deque_jobq_s *deque;
 	deque = malloc(sizeof(struct starpu_deque_jobq_s));
 
@@ -82,11 +61,6 @@ void _starpu_destroy_deque(struct starpu_jobq_s *jobq)
 	free(jobq);
 }
 
-unsigned _starpu_get_total_njobs_deques(void)
-{
-	return total_number_of_jobs;
-}
-
 unsigned _starpu_get_deque_njobs(struct starpu_jobq_s *q)
 {
 	STARPU_ASSERT(q);
@@ -105,36 +79,6 @@ unsigned _starpu_get_deque_nprocessed(struct starpu_jobq_s *q)
 	return deque_queue->nprocessed;
 }
 
-int _starpu_deque_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
-{
-	return _starpu_deque_push_task(q, task);
-}
-
-int _starpu_deque_push_task(struct starpu_jobq_s *q, starpu_job_t task)
-{
-	STARPU_ASSERT(q);
-	struct starpu_deque_jobq_s *deque_queue = q->queue;
-
-	/* if anyone is blocked on the entire machine, wake it up */
-	PTHREAD_MUTEX_LOCK(sched_mutex);
-	total_number_of_jobs++;
-	PTHREAD_COND_SIGNAL(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
-	/* wake people waiting locally */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
-
-	STARPU_TRACE_JOB_PUSH(task, 0);
-	starpu_job_list_push_front(deque_queue->jobq, task);
-	deque_queue->njobs++;
-	deque_queue->nprocessed++;
-
-	PTHREAD_COND_SIGNAL(&q->activity_cond);
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
-
-	return 0;
-}
-
 starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q)
 {
 	starpu_job_t j = NULL;
@@ -156,18 +100,12 @@ starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q)
 		deque_queue->njobs--;
 		
 		STARPU_TRACE_JOB_POP(j, 0);
-
-		/* we are sure that we got it now, so at worst, some people thought 
-		 * there remained some work and will soon discover it is not true */
-		PTHREAD_MUTEX_LOCK(sched_mutex);
-		total_number_of_jobs--;
-		PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 	
 	return j;
 }
 
-struct starpu_job_list_s * _starpu_deque_pop_every_task(struct starpu_jobq_s *q, uint32_t where)
+struct starpu_job_list_s * _starpu_deque_pop_every_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, uint32_t where)
 {
 	struct starpu_job_list_s *new_list, *old_list;
 
@@ -175,7 +113,7 @@ struct starpu_job_list_s * _starpu_deque_pop_every_task(struct starpu_jobq_s *q,
 	struct starpu_deque_jobq_s *deque_queue = q->queue;
 
 	/* block until some task is available in that queue */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	if (deque_queue->njobs == 0)
 	{
@@ -217,16 +155,10 @@ struct starpu_job_list_s * _starpu_deque_pop_every_task(struct starpu_jobq_s *q,
 		else
 		{
 			deque_queue->njobs -= new_list_size;
-	
-			/* we are sure that we got it now, so at worst, some people thought
-			 * there remained some work and will soon discover it is not true */
-			PTHREAD_MUTEX_LOCK(sched_mutex);
-			total_number_of_jobs -= new_list_size;
-			PTHREAD_MUTEX_UNLOCK(sched_mutex);
 		}
 	}
 	
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	return new_list;
 }

+ 0 - 2
src/core/mechanisms/deque_queues.h

@@ -46,8 +46,6 @@ starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q);
 void _starpu_init_deque_queues_mechanisms(void);
 void _starpu_deinit_deque_queues_mechanisms(void);
 
-unsigned _starpu_get_total_njobs_deques(void);
-
 unsigned _starpu_get_deque_njobs(struct starpu_jobq_s *q);
 unsigned _starpu_get_deque_nprocessed(struct starpu_jobq_s *q);
 

+ 9 - 40
src/core/mechanisms/fifo_queues.c

@@ -19,30 +19,11 @@
 #include <errno.h>
 #include <common/utils.h>
 
-static pthread_cond_t *sched_cond;
-static pthread_mutex_t *sched_mutex;
-
-void _starpu_init_fifo_queues_mechanisms(void)
-{
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-
-	/* to access them more easily, we keep their address in local variables */
-	sched_cond = &sched->sched_activity_cond;
-	sched_mutex = &sched->sched_activity_mutex;
-}
-
-void _starpu_deinit_fifo_queues_mechanisms(void)
-{
-}
-
 struct starpu_jobq_s *_starpu_create_fifo(void)
 {
 	struct starpu_jobq_s *jobq;
 	jobq = malloc(sizeof(struct starpu_jobq_s));
 
-	PTHREAD_MUTEX_INIT(&jobq->activity_mutex, NULL);
-	PTHREAD_COND_INIT(&jobq->activity_cond, NULL);
-
 	struct starpu_fifo_jobq_s *fifo;
 	fifo = malloc(sizeof(struct starpu_fifo_jobq_s));
 
@@ -73,50 +54,38 @@ void _starpu_destroy_fifo(struct starpu_jobq_s *jobq)
 	free(jobq);
 }
 
-int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
+int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
 	STARPU_ASSERT(q);
 	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
 
-	/* if anyone is blocked on the entire machine, wake it up */
 	PTHREAD_MUTEX_LOCK(sched_mutex);
-	pthread_cond_signal(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-	
-	/* wake people waiting locally */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_back(fifo_queue->jobq, task);
 	fifo_queue->njobs++;
 	fifo_queue->nprocessed++;
 
-	pthread_cond_signal(&q->activity_cond);
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+	pthread_cond_signal(sched_cond);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	return 0;
 }
 
-int _starpu_fifo_push_task(struct starpu_jobq_s *q, starpu_job_t task)
+int _starpu_fifo_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
 	STARPU_ASSERT(q);
 	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
 
-	/* if anyone is blocked on the entire machine, wake it up */
 	PTHREAD_MUTEX_LOCK(sched_mutex);
-	pthread_cond_signal(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-	
-	/* wake people waiting locally */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_front(fifo_queue->jobq, task);
 	fifo_queue->njobs++;
 	fifo_queue->nprocessed++;
 
-	pthread_cond_signal(&q->activity_cond);
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+	pthread_cond_signal(sched_cond);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	return 0;
 }
@@ -146,7 +115,7 @@ starpu_job_t _starpu_fifo_pop_task(struct starpu_jobq_s *q)
 }
 
 /* pop every task that can be executed on the calling driver */
-struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q, uint32_t where)
+struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, uint32_t where)
 {
 	struct starpu_job_list_s *new_list, *old_list;
 	unsigned size;
@@ -154,7 +123,7 @@ struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q,
 	STARPU_ASSERT(q);
 	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
 
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	size = fifo_queue->njobs;
 
@@ -199,7 +168,7 @@ struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q,
 		}
 	}
 
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	return new_list;
 }

+ 3 - 6
src/core/mechanisms/fifo_queues.h

@@ -38,13 +38,10 @@ struct starpu_fifo_jobq_s {
 struct starpu_jobq_s *_starpu_create_fifo(void);
 void _starpu_destroy_fifo(struct starpu_jobq_s *jobq);
 
-int _starpu_fifo_push_task(struct starpu_jobq_s *q, starpu_job_t task);
-int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task);
+int _starpu_fifo_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
+int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 
 starpu_job_t _starpu_fifo_pop_task(struct starpu_jobq_s *q);
-struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q, uint32_t where);
-
-void _starpu_init_fifo_queues_mechanisms(void);
-void _starpu_deinit_fifo_queues_mechanisms(void);
+struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, uint32_t where);
 
 #endif // __FIFO_QUEUES_H__

+ 0 - 87
src/core/mechanisms/priority_queues.c

@@ -23,25 +23,6 @@
  * Centralized queue with priorities 
  */
 
-
-/* keep track of the total number of jobs to be scheduled to avoid infinite 
- * polling when there are really few jobs in the overall queue */
-static pthread_cond_t *sched_cond;
-static pthread_mutex_t *sched_mutex;
-
-void _starpu_init_priority_queues_mechanisms(void)
-{
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-
-	/* to access them more easily, we keep their address in local variables */
-	sched_cond = &sched->sched_activity_cond;
-	sched_mutex = &sched->sched_activity_mutex;
-}
-
-void _starpu_deinit_priority_queues_mechanisms(void)
-{
-}
-
 struct starpu_jobq_s *_starpu_create_priority_jobq(void)
 {
 	struct starpu_jobq_s *q;
@@ -53,9 +34,6 @@ struct starpu_jobq_s *_starpu_create_priority_jobq(void)
 	central_queue = malloc(sizeof(struct starpu_priority_jobq_s));
 	q->queue = central_queue;
 
-	PTHREAD_MUTEX_INIT(&q->activity_mutex, NULL);
-	PTHREAD_COND_INIT(&q->activity_cond, NULL);
-
 	central_queue->total_njobs = 0;
 
 	unsigned prio;
@@ -82,68 +60,3 @@ void _starpu_destroy_priority_jobq(struct starpu_jobq_s *jobq)
 
 	free(jobq);
 }
-
-int _starpu_priority_push_task(struct starpu_jobq_s *q, starpu_job_t j)
-{
-	STARPU_ASSERT(q);
-	struct starpu_priority_jobq_s *queue = q->queue;
-
-	/* if anyone is blocked on the entire machine, wake it up */
-	PTHREAD_MUTEX_LOCK(sched_mutex);
-	PTHREAD_COND_SIGNAL(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
-	/* wake people waiting locally */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
-
-	STARPU_TRACE_JOB_PUSH(j, 1);
-	
-	unsigned priolevel = j->task->priority - STARPU_MIN_PRIO;
-
-	starpu_job_list_push_front(queue->jobq[priolevel], j);
-	queue->njobs[priolevel]++;
-	queue->total_njobs++;
-
-	PTHREAD_COND_SIGNAL(&q->activity_cond);
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
-
-	return 0;
-}
-
-starpu_job_t _starpu_priority_pop_task(struct starpu_jobq_s *q)
-{
-	starpu_job_t j = NULL;
-
-	STARPU_ASSERT(q);
-	struct starpu_priority_jobq_s *queue = q->queue;
-
-	/* block until some event happens */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
-
-	if ((queue->total_njobs == 0) && _starpu_machine_is_running())
-	{
-#ifdef STARPU_NON_BLOCKING_DRIVERS
-		_starpu_datawizard_progress(q->memory_node, 1);
-#else
-		PTHREAD_COND_WAIT(&q->activity_cond, &q->activity_mutex);
-#endif
-	}
-
-	if (queue->total_njobs > 0)
-	{
-		unsigned priolevel = NPRIO_LEVELS - 1;
-		do {
-			if (queue->njobs[priolevel] > 0) {
-				/* there is some task that we can grab */
-				j = starpu_job_list_pop_back(queue->jobq[priolevel]);
-				queue->njobs[priolevel]--;
-				queue->total_njobs--;
-				STARPU_TRACE_JOB_POP(j, 0);
-			}
-		} while (!j && priolevel-- > 0);
-	}
-
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
-
-	return j;
-}

+ 0 - 5
src/core/mechanisms/priority_queues.h

@@ -36,10 +36,5 @@ struct starpu_jobq_s *_starpu_create_priority_jobq(void);
 void _starpu_destroy_priority_jobq(struct starpu_jobq_s *jobq);
 
 void _starpu_init_priority_queues_mechanisms(void);
-void _starpu_deinit_priority_queues_mechanisms(void);
-
-int _starpu_priority_push_task(struct starpu_jobq_s *q, starpu_job_t task);
-
-starpu_job_t _starpu_priority_pop_task(struct starpu_jobq_s *q);
 
 #endif // __PRIORITY_QUEUES_H__

+ 0 - 35
src/core/mechanisms/queues.c

@@ -59,38 +59,3 @@ void _starpu_deinit_queues(void (*deinit_queue_design)(void),
 	if (deinit_queue_design)
 		deinit_queue_design();
 }
-
-
-
-/* this may return NULL for an "anonymous thread" */
-struct starpu_jobq_s *_starpu_get_local_queue(void)
-{
-	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-
-	return pthread_getspecific(policy->local_queue_key);
-}
-
-/* XXX how to retrieve policy ? that may be given in the machine config ? */
-void _starpu_set_local_queue(struct starpu_jobq_s *jobq)
-{
-	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-
-	pthread_setspecific(policy->local_queue_key, jobq);
-}
-
-void _starpu_jobq_lock(struct starpu_jobq_s *jobq)
-{
-//	_starpu_check_mutex_deadlock(&jobq->activity_mutex);
-
-        PTHREAD_MUTEX_LOCK(&jobq->activity_mutex);
-}
-
-void _starpu_jobq_unlock(struct starpu_jobq_s *jobq)
-{
-	PTHREAD_MUTEX_UNLOCK(&jobq->activity_mutex);
-}
-
-int _starpu_jobq_trylock(struct starpu_jobq_s *jobq)
-{
-	return pthread_mutex_trylock(&jobq->activity_mutex);	
-}

+ 4 - 12
src/core/mechanisms/queues.h

@@ -26,10 +26,10 @@ struct starpu_jobq_s {
 	/* a pointer to some queue structure */
 	void *queue; 
 
-	/* in case workers are blocked on the queue, signaling on that 
-	  condition must unblock them, even if there is no available task */
-	pthread_cond_t activity_cond;
-	pthread_mutex_t activity_mutex;
+//	/* in case workers are blocked on the queue, signaling on that 
+//	  condition must unblock them, even if there is no available task */
+//	pthread_cond_t activity_cond;
+//	pthread_mutex_t activity_mutex;
 };
 
 struct starpu_machine_config_s;
@@ -41,12 +41,4 @@ void _starpu_deinit_queues(void (*deinit_queue_design)(void),
 		  void (*func_deinit_queue)(struct starpu_jobq_s *q), 
 		  struct starpu_machine_config_s *config);
 
-
-struct starpu_jobq_s *_starpu_get_local_queue(void);
-void _starpu_set_local_queue(struct starpu_jobq_s *jobq);
-
-void _starpu_jobq_lock(struct starpu_jobq_s *jobq);
-void _starpu_jobq_unlock(struct starpu_jobq_s *jobq);
-int _starpu_jobq_trylock(struct starpu_jobq_s *jobq);
-
 #endif // __QUEUES_H__

+ 11 - 35
src/core/mechanisms/stack_queues.c

@@ -23,18 +23,9 @@
  * polling when there are really few jobs in the overall queue */
 static unsigned total_number_of_jobs;
 
-static pthread_cond_t *sched_cond;
-static pthread_mutex_t *sched_mutex;
-
 void _starpu_init_stack_queues_mechanisms(void)
 {
 	total_number_of_jobs = 0;
-
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-
-	/* to access them more easily, we keep their address in local variables */
-	sched_cond = &sched->sched_activity_cond;
-	sched_mutex = &sched->sched_activity_mutex;
 }
 
 struct starpu_jobq_s *_starpu_create_stack(void)
@@ -45,9 +36,6 @@ struct starpu_jobq_s *_starpu_create_stack(void)
 	struct starpu_stack_jobq_s *stack;
 	stack = malloc(sizeof(struct starpu_stack_jobq_s));
 
-	PTHREAD_MUTEX_INIT(&jobq->activity_mutex, NULL);
-	PTHREAD_COND_INIT(&jobq->activity_cond, NULL);
-
 	/* note that not all mechanisms (eg. the semaphore) have to be used */
 	stack->jobq = starpu_job_list_new();
 	stack->njobs = 0;
@@ -85,53 +73,41 @@ unsigned _starpu_get_stack_nprocessed(struct starpu_jobq_s *q)
 	return stack_queue->nprocessed;
 }
 
-void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
+void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
 	STARPU_ASSERT(q);
 	struct starpu_stack_jobq_s *stack_queue = q->queue;
 
-	/* if anyone is blocked on the entire machine, wake it up */
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 	total_number_of_jobs++;
-	PTHREAD_COND_SIGNAL(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
-	/* wake people waiting locally */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_back(stack_queue->jobq, task);
-	deque_queue->njobs++;
-	deque_queue->nprocessed++;
+	stack_queue->njobs++;
+	stack_queue->nprocessed++;
 
-	PTHREAD_COND_SIGNAL(&q->activity_cond);
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+	PTHREAD_COND_SIGNAL(sched_cond);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-void _starpu_stack_push_task(struct starpu_jobq_s *q, starpu_job_t task)
+void _starpu_stack_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
 	STARPU_ASSERT(q);
 	struct starpu_stack_jobq_s *stack_queue = q->queue;
 
-	/* if anyone is blocked on the entire machine, wake it up */
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 	total_number_of_jobs++;
-	PTHREAD_COND_SIGNAL(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
-	/* wake people waiting locally */
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
 	starpu_job_list_push_front(stack_queue->jobq, task);
-	deque_queue->njobs++;
-	deque_queue->nprocessed++;
+	stack_queue->njobs++;
+	stack_queue->nprocessed++;
 
-	PTHREAD_COND_SIGNAL(&q->activity_cond);
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+	PTHREAD_COND_SIGNAL(sched_cond);
+	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q)
+starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex)
 {
 	starpu_job_t j = NULL;
 

+ 3 - 4
src/core/mechanisms/stack_queues.h

@@ -37,11 +37,10 @@ struct starpu_stack_jobq_s {
 
 struct starpu_jobq_s *_starpu_create_stack(void);
 
-void _starpu_stack_push_task(struct starpu_jobq_s *q, starpu_job_t task);
+void _starpu_stack_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
+void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 
-void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task);
-
-starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q);
+starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex);
 
 void _starpu_init_stack_queues_mechanisms(void);
 

+ 31 - 28
src/core/policies/deque_modeling_policy.c

@@ -21,10 +21,17 @@
 static unsigned nworkers;
 static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
-static starpu_job_t dm_pop_task(struct starpu_jobq_s *q)
+static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
+static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
+
+static starpu_job_t dm_pop_task(void)
 {
 	struct starpu_job_s *j;
 
+	int workerid = starpu_worker_get_id();
+
+	struct starpu_jobq_s *q = queue_array[workerid];
+
 	j = _starpu_fifo_pop_task(q);
 	if (j) {
 		struct starpu_fifo_jobq_s *fifo = q->queue;
@@ -38,11 +45,15 @@ static starpu_job_t dm_pop_task(struct starpu_jobq_s *q)
 	return j;
 }
 
-static struct starpu_job_list_s *dm_pop_every_task(struct starpu_jobq_s *q, uint32_t where)
+static struct starpu_job_list_s *dm_pop_every_task(uint32_t where)
 {
 	struct starpu_job_list_s *new_list;
 
-	new_list = _starpu_fifo_pop_every_task(q, where);
+	int workerid = starpu_worker_get_id();
+
+	struct starpu_jobq_s *q = queue_array[workerid];
+
+	new_list = _starpu_fifo_pop_every_task(queue_array[workerid], &sched_mutex[workerid], where);
 	if (new_list) {
 		starpu_job_itor_t i;
 		for(i = starpu_job_list_begin(new_list);
@@ -63,7 +74,7 @@ static struct starpu_job_list_s *dm_pop_every_task(struct starpu_jobq_s *q, uint
 
 
 
-static int _dm_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), starpu_job_t j, unsigned prio)
+static int _dm_push_task(starpu_job_t j, unsigned prio)
 {
 	/* find the queue */
 	struct starpu_fifo_jobq_s *fifo;
@@ -135,23 +146,23 @@ static int _dm_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), starp
 		_starpu_prefetch_task_input_on_node(task, memory_node);
 
 	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[best], j);
+		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
 	} else {
-		return _starpu_fifo_push_task(queue_array[best], j);
+		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
 	}
 }
 
-static int dm_push_prio_task(struct starpu_jobq_s *q, starpu_job_t j)
+static int dm_push_prio_task(starpu_job_t j)
 {
-	return _dm_push_task(q, j, 1);
+	return _dm_push_task(j, 1);
 }
 
-static int dm_push_task(struct starpu_jobq_s *q, starpu_job_t j)
+static int dm_push_task(starpu_job_t j)
 {
 	if (j->task->priority == STARPU_MAX_PRIO)
-		return _dm_push_task(q, j, 1);
+		return _dm_push_task(j, 1);
 
-	return _dm_push_task(q, j, 0);
+	return _dm_push_task(j, 0);
 }
 
 static struct starpu_jobq_s *init_dm_fifo(void)
@@ -160,7 +171,14 @@ static struct starpu_jobq_s *init_dm_fifo(void)
 
 	q = _starpu_create_fifo();
 
-	queue_array[nworkers++] = q;
+	int workerid = nworkers++;
+
+	queue_array[workerid] = q;
+
+	PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
+	PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
+
+	starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
 
 	return q;
 }
@@ -170,7 +188,7 @@ static void initialize_dm_policy(struct starpu_machine_config_s *config,
 {
 	nworkers = 0;
 
-	_starpu_setup_queues(_starpu_init_fifo_queues_mechanisms, init_dm_fifo, config);
+	_starpu_setup_queues(NULL, init_dm_fifo, config);
 }
 
 static void deinitialize_dm_policy(struct starpu_machine_config_s *config, 
@@ -179,24 +197,9 @@ static void deinitialize_dm_policy(struct starpu_machine_config_s *config,
 	_starpu_deinit_queues(NULL, _starpu_destroy_fifo, config);
 }
 
-static struct starpu_jobq_s *get_local_queue_dm(struct starpu_sched_policy_s *policy __attribute__ ((unused)))
-{
-	struct starpu_jobq_s *queue;
-	queue = pthread_getspecific(policy->local_queue_key);
-
-	if (!queue)
-	{
-		/* take one randomly as this *must* be for a push anyway XXX */
-		queue = queue_array[0];
-	}
-
-	return queue;
-}
-
 struct starpu_sched_policy_s _starpu_sched_dm_policy = {
 	.init_sched = initialize_dm_policy,
 	.deinit_sched = deinitialize_dm_policy,
-	.get_local_queue = get_local_queue_dm,
 	.push_task = dm_push_task, 
 	.push_prio_task = dm_push_prio_task,
 	.pop_task = dm_pop_task,

+ 24 - 28
src/core/policies/deque_modeling_policy_data_aware.c

@@ -20,13 +20,19 @@
 static unsigned nworkers;
 static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
+static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
+static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
+
 static double alpha = 1.0;
 static double beta = 1.0;
 
-static starpu_job_t dmda_pop_task(struct starpu_jobq_s *q)
+static starpu_job_t dmda_pop_task(void)
 {
 	struct starpu_job_s *j;
 
+	int workerid = starpu_worker_get_id();
+	struct starpu_jobq_s *q = queue_array[workerid];
+
 	j = _starpu_fifo_pop_task(q);
 	if (j) {
 		struct starpu_fifo_jobq_s *fifo = q->queue;
@@ -53,7 +59,7 @@ static void update_data_requests(uint32_t memory_node, struct starpu_task *task)
 	}
 }
 
-static int _dmda_push_task(struct starpu_jobq_s *q __attribute__ ((unused)) , starpu_job_t j, unsigned prio)
+static int _dmda_push_task(starpu_job_t j, unsigned prio)
 {
 	/* find the queue */
 	struct starpu_fifo_jobq_s *fifo;
@@ -172,23 +178,23 @@ static int _dmda_push_task(struct starpu_jobq_s *q __attribute__ ((unused)) , st
 		_starpu_prefetch_task_input_on_node(task, memory_node);
 
 	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[best], j);
+		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
 	} else {
-		return _starpu_fifo_push_task(queue_array[best], j);
+		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
 	}
 }
 
-static int dmda_push_prio_task(struct starpu_jobq_s *q, starpu_job_t j)
+static int dmda_push_prio_task(starpu_job_t j)
 {
-	return _dmda_push_task(q, j, 1);
+	return _dmda_push_task(j, 1);
 }
 
-static int dmda_push_task(struct starpu_jobq_s *q, starpu_job_t j)
+static int dmda_push_task(starpu_job_t j)
 {
 	if (j->task->priority == STARPU_MAX_PRIO)
-		return _dmda_push_task(q, j, 1);
+		return _dmda_push_task(j, 1);
 
-	return _dmda_push_task(q, j, 0);
+	return _dmda_push_task(j, 0);
 }
 
 static struct starpu_jobq_s *init_dmda_fifo(void)
@@ -197,7 +203,14 @@ static struct starpu_jobq_s *init_dmda_fifo(void)
 
 	q = _starpu_create_fifo();
 
-	queue_array[nworkers++] = q;
+	int workerid = nworkers++;
+
+	queue_array[workerid] = q;
+
+	PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
+	PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
+
+	starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
 
 	return q;
 }
@@ -215,7 +228,7 @@ static void initialize_dmda_policy(struct starpu_machine_config_s *config,
 	if (strval_beta)
 		beta = atof(strval_beta);
 
-	_starpu_setup_queues(_starpu_init_fifo_queues_mechanisms, init_dmda_fifo, config);
+	_starpu_setup_queues(NULL, init_dmda_fifo, config);
 }
 
 static void deinitialize_dmda_policy(struct starpu_machine_config_s *config, 
@@ -224,26 +237,9 @@ static void deinitialize_dmda_policy(struct starpu_machine_config_s *config,
 	_starpu_deinit_queues(NULL, _starpu_destroy_fifo, config);
 }
 
-
-
-static struct starpu_jobq_s *get_local_queue_dmda(struct starpu_sched_policy_s *policy __attribute__ ((unused)))
-{
-	struct starpu_jobq_s *queue;
-	queue = pthread_getspecific(policy->local_queue_key);
-
-	if (!queue)
-	{
-		/* take one randomly as this *must* be for a push anyway XXX */
-		queue = queue_array[0];
-	}
-
-	return queue;
-}
-
 struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
 	.init_sched = initialize_dmda_policy,
 	.deinit_sched = deinitialize_dmda_policy,
-	.get_local_queue = get_local_queue_dmda,
 	.push_task = dmda_push_task, 
 	.push_prio_task = dmda_push_prio_task, 
 	.pop_task = dmda_pop_task,

+ 24 - 22
src/core/policies/eager_central_policy.c

@@ -24,57 +24,59 @@
 /* the former is the actual queue, the latter some container */
 static struct starpu_jobq_s *jobq;
 
-static void init_central_queue_design(void)
+static pthread_cond_t sched_cond;
+static pthread_mutex_t sched_mutex;
+
+static void initialize_eager_center_policy(struct starpu_machine_config_s *config, 
+		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
 	/* there is only a single queue in that trivial design */
 	jobq = _starpu_create_fifo();
 
-	_starpu_init_fifo_queues_mechanisms();
+	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
+	PTHREAD_COND_INIT(&sched_cond, NULL);
 
+	int workerid;
+	for (workerid = 0; workerid < STARPU_NMAXWORKERS; workerid++)
+		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
 }
 
-static void deinit_central_queue_design(void)
+static void deinitialize_eager_center_policy(struct starpu_machine_config_s *config, 
+		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
 	/* TODO check that there is no task left in the queue */
-	_starpu_deinit_fifo_queues_mechanisms();
 
 	/* deallocate the job queue */
 	_starpu_destroy_fifo(jobq);
 }
 
-static struct starpu_jobq_s *func_init_central_queue(void)
+static int push_task_eager_policy(starpu_job_t task)
 {
-	/* once again, this is trivial */
-	return jobq;
+	return _starpu_fifo_push_task(jobq, &sched_mutex, &sched_cond, task);
 }
 
-static void initialize_eager_center_policy(struct starpu_machine_config_s *config, 
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static int push_prio_task_eager_policy(starpu_job_t task)
 {
-	_starpu_setup_queues(init_central_queue_design, func_init_central_queue, config);
+	return _starpu_fifo_push_prio_task(jobq, &sched_mutex, &sched_cond, task);
 }
 
-static void deinitialize_eager_center_policy(struct starpu_machine_config_s *config, 
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
+static struct starpu_job_list_s *pop_every_task_eager_policy(uint32_t where)
 {
-	_starpu_deinit_queues(deinit_central_queue_design, NULL, config);
+	return _starpu_fifo_pop_every_task(jobq, &sched_mutex, where);
 }
 
-static struct starpu_jobq_s *get_local_queue_eager(struct starpu_sched_policy_s *policy 
-						__attribute__ ((unused)))
+static starpu_job_t pop_task_eager_policy(void)
 {
-	/* this is trivial for that strategy :) */
-	return jobq;
+	return _starpu_fifo_pop_task(jobq);
 }
 
 struct starpu_sched_policy_s _starpu_sched_eager_policy = {
 	.init_sched = initialize_eager_center_policy,
 	.deinit_sched = deinitialize_eager_center_policy,
-	.get_local_queue = get_local_queue_eager,
-	.push_task = _starpu_fifo_push_task,
-	.push_prio_task = _starpu_fifo_push_prio_task,
-	.pop_task = _starpu_fifo_pop_task,
-	.pop_every_task = _starpu_fifo_pop_every_task,
+	.push_task = push_task_eager_policy,
+	.push_prio_task = push_prio_task_eager_policy,
+	.pop_task = pop_task_eager_policy,
+	.pop_every_task = pop_every_task_eager_policy,
 	.policy_name = "eager",
 	.policy_description = "greedy policy"
 };

+ 65 - 20
src/core/policies/eager_central_priority_policy.c

@@ -19,50 +19,95 @@
 /* the former is the actual queue, the latter some container */
 static struct starpu_jobq_s *jobq;
 
-static void init_priority_queue_design(void)
+/* all the workers sleep on a single global condition variable; the mutex
+ * protects both that condition and the central priority queue */
+static pthread_cond_t global_sched_cond;
+static pthread_mutex_t global_sched_mutex;
+
+static void initialize_eager_center_priority_policy(struct starpu_machine_config_s *config, 
+			__attribute__ ((unused))	struct starpu_sched_policy_s *_policy) 
 {
 	/* only a single queue (even though there are several internaly) */
 	jobq = _starpu_create_priority_jobq();
 
-	_starpu_init_priority_queues_mechanisms();
+	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
+	PTHREAD_COND_INIT(&global_sched_cond, NULL);
+
+	int workerid;
+	for (workerid = 0; workerid < STARPU_NMAXWORKERS; workerid++)
+		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
 }
 
-static void deinit_priority_queue_design(void)
+static void deinitialize_eager_center_priority_policy(struct starpu_machine_config_s *config, 
+		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
 	/* TODO check that there is no task left in the queue */
-	_starpu_deinit_priority_queues_mechanisms();
 
 	/* deallocate the job queue */
 	_starpu_destroy_priority_jobq(jobq);
+
+	/* release the synchronization objects created in the init method */
+	PTHREAD_MUTEX_DESTROY(&global_sched_mutex);
+	PTHREAD_COND_DESTROY(&global_sched_cond);
 }
 
-static struct starpu_jobq_s *func_init_priority_queue(void)
+static int _starpu_priority_push_task(starpu_job_t j)
 {
-	return jobq;
-}
+	struct starpu_priority_jobq_s *queue = jobq->queue;
 
-static void initialize_eager_center_priority_policy(struct starpu_machine_config_s *config, 
-			__attribute__ ((unused))	struct starpu_sched_policy_s *_policy) 
-{
-	_starpu_setup_queues(init_priority_queue_design, func_init_priority_queue, config);
-}
+	/* wake people waiting for a task */
+	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
-static void deinitialize_eager_center_priority_policy(struct starpu_machine_config_s *config, 
-		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
-{
-	_starpu_deinit_queues(deinit_priority_queue_design, NULL, config);
+	STARPU_TRACE_JOB_PUSH(j, 1);
+	
+	unsigned priolevel = j->task->priority - STARPU_MIN_PRIO;
+
+	starpu_job_list_push_front(queue->jobq[priolevel], j);
+	queue->njobs[priolevel]++;
+	queue->total_njobs++;
+
+	PTHREAD_COND_SIGNAL(&global_sched_cond);
+	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+
+	return 0;
 }
 
-static struct starpu_jobq_s *get_local_queue_eager_priority(struct starpu_sched_policy_s *policy __attribute__ ((unused)))
+static starpu_job_t _starpu_priority_pop_task(void)
 {
-	/* this is trivial for that strategy */
-	return jobq;
+	starpu_job_t j = NULL;
+
+	struct starpu_priority_jobq_s *queue = jobq->queue;
+
+	/* block until some event happens */
+	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
+
+	if ((queue->total_njobs == 0) && _starpu_machine_is_running())
+	{
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+		_starpu_datawizard_progress(_starpu_get_local_memory_node(), 1);
+#else
+		PTHREAD_COND_WAIT(&global_sched_cond, &global_sched_mutex);
+#endif
+	}
+
+	if (queue->total_njobs > 0)
+	{
+		unsigned priolevel = NPRIO_LEVELS - 1;
+		do {
+			if (queue->njobs[priolevel] > 0) {
+				/* there is some task that we can grab */
+				j = starpu_job_list_pop_back(queue->jobq[priolevel]);
+				queue->njobs[priolevel]--;
+				queue->total_njobs--;
+				STARPU_TRACE_JOB_POP(j, 0);
+			}
+		} while (!j && priolevel-- > 0);
+	}
+
+	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+
+	return j;
 }
 
 struct starpu_sched_policy_s _starpu_sched_prio_policy = {
 	.init_sched = initialize_eager_center_priority_policy,
 	.deinit_sched = deinitialize_eager_center_priority_policy,
-	.get_local_queue = get_local_queue_eager_priority,
 	/* we always use priorities in that policy */
 	.push_task = _starpu_priority_push_task,
 	.push_prio_task = _starpu_priority_push_task,

+ 19 - 9
src/core/policies/no_prio_policy.c

@@ -24,12 +24,20 @@
 /* the former is the actual queue, the latter some container */
 static struct starpu_jobq_s *jobq;
 
+static pthread_cond_t sched_cond;
+static pthread_mutex_t sched_mutex;
+
 static void init_no_prio_design(void)
 {
 	/* there is only a single queue in that trivial design */
 	jobq = _starpu_create_fifo();
 
-	_starpu_init_fifo_queues_mechanisms();
+	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
+	PTHREAD_COND_INIT(&sched_cond, NULL);
+
+	int workerid;
+	for (workerid = 0; workerid < STARPU_NMAXWORKERS; workerid++)
+		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
 }
 
 static struct starpu_jobq_s *func_init_central_queue(void)
@@ -44,20 +52,22 @@ static void initialize_no_prio_policy(struct starpu_machine_config_s *config,
 	_starpu_setup_queues(init_no_prio_design, func_init_central_queue, config);
 }
 
-static struct starpu_jobq_s *get_local_queue_no_prio(struct starpu_sched_policy_s *policy 
-					__attribute__ ((unused)))
+static int push_task_no_prio_policy(starpu_job_t task)
 {
-	/* this is trivial for that strategy :) */
-	return jobq;
+        return _starpu_fifo_push_task(jobq, &sched_mutex, &sched_cond, task);
+}
+
+static starpu_job_t pop_task_no_prio_policy(void)
+{
+	return _starpu_fifo_pop_task(jobq);
 }
 
 struct starpu_sched_policy_s _starpu_sched_no_prio_policy = {
 	.init_sched = initialize_no_prio_policy,
 	.deinit_sched = NULL,
-	.get_local_queue = get_local_queue_no_prio,
-	.push_task = _starpu_fifo_push_task,
-	.push_prio_task = _starpu_fifo_push_task,
-	.pop_task = _starpu_fifo_pop_task,
+	.push_task = push_task_no_prio_policy,
+	.push_prio_task = push_task_no_prio_policy,
+	.pop_task = pop_task_no_prio_policy,
 	.policy_name = "no-prio",
 	.policy_description = "eager without priority"
 };

+ 25 - 27
src/core/policies/random_policy.c

@@ -19,16 +19,22 @@
 static unsigned nworkers;
 static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
-static starpu_job_t random_pop_task(struct starpu_jobq_s *q)
+static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
+static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
+
+static starpu_job_t random_pop_task(void)
 {
 	struct starpu_job_s *j;
 
-	j = _starpu_fifo_pop_task(q);
+	int workerid = starpu_worker_get_id();
+	struct starpu_jobq_s *jobq = queue_array[workerid];
+
+	j = _starpu_fifo_pop_task(jobq);
 
 	return j;
 }
 
-static int _random_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), starpu_job_t task, unsigned prio)
+static int _random_push_task(starpu_job_t task, unsigned prio)
 {
 	/* find the queue */
 	unsigned worker;
@@ -59,22 +65,22 @@ static int _random_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), s
 		alpha += worker_alpha;
 	}
 
-	/* we should now have the best worker in variable "best" */
+	/* we should now have the best worker in variable "selected" */
 	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[selected], task);
+		return _starpu_fifo_push_prio_task(queue_array[selected], &sched_mutex[selected], &sched_cond[selected], task);
 	} else {
-		return _starpu_fifo_push_task(queue_array[selected], task);
+		return _starpu_fifo_push_task(queue_array[selected], &sched_mutex[selected], &sched_cond[selected], task);
 	}
 }
 
-static int random_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task)
+static int random_push_prio_task(starpu_job_t task)
 {
-	return _random_push_task(q, task, 1);
+	return _random_push_task(task, 1);
 }
 
-static int random_push_task(struct starpu_jobq_s *q, starpu_job_t task)
+static int random_push_task(starpu_job_t task)
 {
-	return _random_push_task(q, task, 0);
+	return _random_push_task(task, 0);
 }
 
 static struct starpu_jobq_s *init_random_fifo(void)
@@ -83,7 +89,14 @@ static struct starpu_jobq_s *init_random_fifo(void)
 
 	q = _starpu_create_fifo();
 
-	queue_array[nworkers++] = q;
+	int workerid = nworkers++;
+
+	queue_array[workerid] = q;
+
+	PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
+	PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
+
+	starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
 
 	return q;
 }
@@ -95,27 +108,12 @@ static void initialize_random_policy(struct starpu_machine_config_s *config,
 
 	starpu_srand48(time(NULL));
 
-	_starpu_setup_queues(_starpu_init_fifo_queues_mechanisms, init_random_fifo, config);
-}
-
-static struct starpu_jobq_s *get_local_queue_random(struct starpu_sched_policy_s *policy __attribute__ ((unused)))
-{
-	struct starpu_jobq_s *queue;
-	queue = pthread_getspecific(policy->local_queue_key);
-
-	if (!queue)
-	{
-		/* take one randomly as this *must* be for a push anyway XXX */
-		queue = queue_array[0];
-	}
-
-	return queue;
+	_starpu_setup_queues(NULL, init_random_fifo, config);
 }
 
 struct starpu_sched_policy_s _starpu_sched_random_policy = {
 	.init_sched = initialize_random_policy,
 	.deinit_sched = NULL,
-	.get_local_queue = get_local_queue_random,
 	.push_task = random_push_task,
 	.push_prio_task = random_push_prio_task,
 	.pop_task = random_pop_task,

+ 10 - 39
src/core/policies/sched_policy.c

@@ -83,15 +83,10 @@ static void load_sched_policy(struct starpu_sched_policy_s *sched_policy)
 
 	policy.init_sched = sched_policy->init_sched;
 	policy.deinit_sched = sched_policy->deinit_sched;
-	policy.get_local_queue = sched_policy->get_local_queue;
 	policy.push_task = sched_policy->push_task;
 	policy.push_prio_task = sched_policy->push_prio_task;
 	policy.pop_task = sched_policy->pop_task;
 	policy.pop_every_task = sched_policy->pop_every_task;
-
-	PTHREAD_COND_INIT(&policy.sched_activity_cond, NULL);
-	PTHREAD_MUTEX_INIT(&policy.sched_activity_mutex, NULL);
-	pthread_key_create(&policy.local_queue_key, NULL);
 }
 
 static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
@@ -200,17 +195,11 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config)
 {
 	if (policy.deinit_sched)
 		policy.deinit_sched(config, &policy);
-
-	pthread_key_delete(policy.local_queue_key);
-	PTHREAD_MUTEX_DESTROY(&policy.sched_activity_mutex);
-	PTHREAD_COND_DESTROY(&policy.sched_activity_cond);
 }
 
 /* the generic interface that call the proper underlying implementation */
 int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 {
-	struct starpu_jobq_s *queue = policy.get_local_queue(&policy);
-
 	struct starpu_task *task = j->task;
 
 	task->status = STARPU_TASK_READY;
@@ -240,57 +229,39 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 	else {
 		STARPU_ASSERT(policy.push_task);
 
-		return policy.push_task(queue, j);
+		return policy.push_task(j);
 	}
 }
 
-struct starpu_job_s * _starpu_pop_task_from_queue(struct starpu_jobq_s *queue)
-{
-	STARPU_ASSERT(policy.pop_task);
-
-	struct starpu_job_s *j = policy.pop_task(queue);
-
-	return j;
-}
-
 struct starpu_job_s * _starpu_pop_task(void)
 {
-	struct starpu_jobq_s *queue = policy.get_local_queue(&policy);
-
-	return _starpu_pop_task_from_queue(queue);
+	return policy.pop_task();
 }
 
-struct starpu_job_list_s * _starpu_pop_every_task_from_queue(struct starpu_jobq_s *queue, uint32_t where)
+/* pop every task that can be executed on "where" (eg. GORDON) */
+struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where)
 {
 	STARPU_ASSERT(policy.pop_every_task);
 
-	struct starpu_job_list_s *list = policy.pop_every_task(queue, where);
+	struct starpu_job_list_s *list = policy.pop_every_task(where);
 
 	return list;
 }
 
-/* pop every task that can be executed on "where" (eg. GORDON) */
-struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where)
-{
-	struct starpu_jobq_s *queue = policy.get_local_queue(&policy);
-
-	return _starpu_pop_every_task_from_queue(queue, where);
-}
-
 void _starpu_wait_on_sched_event(void)
 {
-	struct starpu_jobq_s *q = policy.get_local_queue(&policy);
+	struct starpu_worker_s *worker = _starpu_get_local_worker_key();
 
-	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
 
-	_starpu_handle_all_pending_node_data_requests(_starpu_get_local_memory_node());
+	_starpu_handle_all_pending_node_data_requests(worker->memory_node);
 
 	if (_starpu_machine_is_running())
 	{
 #ifndef STARPU_NON_BLOCKING_DRIVERS
-		pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
+		pthread_cond_wait(worker->sched_cond, worker->sched_mutex);
 #endif
 	}
 
-	PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 }

+ 4 - 13
src/core/policies/sched_policy.h

@@ -34,13 +34,10 @@ struct starpu_sched_policy_s {
 	/* cleanup method at termination */
 	void (*deinit_sched)(struct starpu_machine_config_s *, struct starpu_sched_policy_s *);
 
-	/* anyone can request which queue it is associated to */
-	struct starpu_jobq_s *(*get_local_queue)(struct starpu_sched_policy_s *);
-
 	/* some methods to manipulate the previous queue */
-	int (*push_task)(struct starpu_jobq_s *, starpu_job_t);
-	int (*push_prio_task)(struct starpu_jobq_s *, starpu_job_t);
-	struct starpu_job_s* (*pop_task)(struct starpu_jobq_s *);
+	int (*push_task)(starpu_job_t);
+	int (*push_prio_task)(starpu_job_t);
+	struct starpu_job_s* (*pop_task)(void);
 
 	/* returns the number of tasks that were retrieved 
  	 * the function is reponsible for allocating the output but the driver
@@ -48,19 +45,13 @@ struct starpu_sched_policy_s {
  	 *
  	 * NB : this function is non blocking
  	 * */
-	struct starpu_job_list_s *(*pop_every_task)(struct starpu_jobq_s *, uint32_t);
+	struct starpu_job_list_s *(*pop_every_task)(uint32_t where);
 
 	/* name of the policy (optionnal) */
 	const char *policy_name;
 
 	/* description of the policy (optionnal) */
 	const char *policy_description;
-
-	/* some worker may block until some activity happens in the machine */
-	pthread_cond_t sched_activity_cond;
-	pthread_mutex_t sched_activity_mutex;
-
-	pthread_key_t local_queue_key;
 };
 
 struct starpu_sched_policy_s *_starpu_get_sched_policy(void);

+ 46 - 24
src/core/policies/work_stealing_policy.c

@@ -23,11 +23,13 @@ static unsigned nworkers;
 static unsigned rr_worker;
 static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
+static pthread_mutex_t global_sched_mutex;
+static pthread_cond_t global_sched_cond;
+
 /* keep track of the work performed from the beginning of the algorithm to make
  * better decisions about which queue to select when stealing or deferring work
  */
 static unsigned performed_total = 0;
-//static unsigned performed_local[16];
 
 #ifdef USE_OVERLOAD
 static float overload_metric(unsigned id)
@@ -119,7 +121,6 @@ static struct starpu_jobq_s *select_victimq(void)
  * we need to select a queue where to dispose them */
 static struct starpu_jobq_s *select_workerq(void)
 {
-
 	struct starpu_jobq_s *q;
 
 	q = queue_array[rr_worker];
@@ -131,14 +132,22 @@ static struct starpu_jobq_s *select_workerq(void)
 
 #endif
 
-static starpu_job_t ws_pop_task(struct starpu_jobq_s *q)
+#warning TODO rewrite ... this will not scale at all now
+static starpu_job_t ws_pop_task(void)
 {
 	starpu_job_t j;
 
+	int workerid = starpu_worker_get_id();
+
+	struct starpu_jobq_s *q = queue_array[workerid];
+
+	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
+
 	j = _starpu_deque_pop_task(q);
 	if (j) {
 		/* there was a local task */
 		performed_total++;
+		PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
 		return j;
 	}
 	
@@ -146,18 +155,39 @@ static starpu_job_t ws_pop_task(struct starpu_jobq_s *q)
 	struct starpu_jobq_s *victimq;
 	victimq = select_victimq();
 
-	if (!_starpu_jobq_trylock(victimq))
-	{
-		j = _starpu_deque_pop_task(victimq);
-		_starpu_jobq_unlock(victimq);
-
+	j = _starpu_deque_pop_task(victimq);
+	if (j) {
 		STARPU_TRACE_WORK_STEALING(q, j);
 		performed_total++;
 	}
 
+	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+
 	return j;
 }
 
+int ws_push_task(starpu_job_t task)
+{
+	/* NOTE(review): this assumes the push comes from a worker thread;
+	 * starpu_worker_get_id() may return -1 when called from the
+	 * application context — TODO confirm how that case is handled */
+	int workerid = starpu_worker_get_id();
+
+        struct starpu_deque_jobq_s *deque_queue;
+	deque_queue = queue_array[workerid]->queue;
+
+        PTHREAD_MUTEX_LOCK(&global_sched_mutex);
+	// XXX reuse ?
+        //total_number_of_jobs++;
+
+        STARPU_TRACE_JOB_PUSH(task, 0);
+        starpu_job_list_push_front(deque_queue->jobq, task);
+        deque_queue->njobs++;
+        deque_queue->nprocessed++;
+
+        PTHREAD_COND_SIGNAL(&global_sched_cond);
+        PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
+
+        return 0;
+}
+
 static struct starpu_jobq_s *init_ws_deque(void)
 {
 	struct starpu_jobq_s *q;
@@ -177,29 +207,21 @@ static void initialize_ws_policy(struct starpu_machine_config_s *config,
 
 	//machineconfig = config;
 
-	_starpu_setup_queues(_starpu_init_deque_queues_mechanisms, init_ws_deque, config);
-}
+	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
+	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 
-static struct starpu_jobq_s *get_local_queue_ws(struct starpu_sched_policy_s *policy __attribute__ ((unused)))
-{
-	struct starpu_jobq_s *queue;
-	queue = pthread_getspecific(policy->local_queue_key);
-
-	if (!queue) {
-		queue = select_workerq();
-	}
+	int workerid;
+	for (workerid = 0; workerid < STARPU_NMAXWORKERS; workerid++)
+		starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
 
-	STARPU_ASSERT(queue);
-
-	return queue;
+	_starpu_setup_queues(_starpu_init_deque_queues_mechanisms, init_ws_deque, config);
 }
 
 struct starpu_sched_policy_s _starpu_sched_ws_policy = {
 	.init_sched = initialize_ws_policy,
 	.deinit_sched = NULL,
-	.get_local_queue = get_local_queue_ws,
-	.push_task = _starpu_deque_push_task,
-	.push_prio_task = _starpu_deque_push_prio_task,
+	.push_task = ws_push_task,
+	.push_prio_task = ws_push_task,
 	.pop_task = ws_pop_task,
 	.policy_name = "ws",
 	.policy_description = "work stealing"

+ 9 - 55
src/core/workers.c

@@ -81,9 +81,8 @@ static struct starpu_worker_set_s gordon_worker_set;
 
 static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
 {
-	struct starpu_jobq_s *jobq = workerarg->jobq;
-	pthread_cond_t *cond = &jobq->activity_cond;
-	pthread_mutex_t *mutex = &jobq->activity_mutex;
+	pthread_cond_t *cond = workerarg->sched_cond;
+	pthread_mutex_t *mutex = workerarg->sched_mutex;
 
 	unsigned memory_node = workerarg->memory_node;
 
@@ -363,62 +362,11 @@ unsigned _starpu_worker_can_block(unsigned memnode)
 	return can_block;
 }
 
-typedef enum {
-	BROADCAST,
-	LOCK,
-	UNLOCK
-} queue_op;
-
-static void _starpu_operate_on_all_conditions(queue_op op)
-{
-	unsigned cond_id;
-	struct _cond_and_mutex *condition;
-
-	starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
-
-	PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
-
-	unsigned nconds = descr->total_condition_count;
-
-	for (cond_id = 0; cond_id < nconds; cond_id++)
-	{
-		condition = &descr->conditions_all[cond_id];
-		switch (op) {
-			case BROADCAST:
-				PTHREAD_COND_BROADCAST(condition->cond);
-				break;
-			case LOCK:
-				PTHREAD_MUTEX_LOCK(condition->mutex);
-				break;
-			case UNLOCK:
-				PTHREAD_MUTEX_UNLOCK(condition->mutex);
-				break;
-		}
-	}
-
-	PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
-}
-
 static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
 {
-	/* lock all workers and the scheduler (in the proper order) to make
-	   sure everyone will notice the termination */
-	/* WARNING: here we make the asumption that a queue is not attached to
- 	 * different memory nodes ! */
-
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-
-	_starpu_operate_on_all_conditions(LOCK);
-	PTHREAD_MUTEX_LOCK(&sched->sched_activity_mutex);
-	
 	/* set the flag which will tell workers to stop */
 	config->running = 0;
-
-	_starpu_operate_on_all_conditions(BROADCAST);
-	PTHREAD_COND_BROADCAST(&sched->sched_activity_cond);
-
-	PTHREAD_MUTEX_UNLOCK(&sched->sched_activity_mutex);
-	_starpu_operate_on_all_conditions(UNLOCK);
+	starpu_wake_all_blocked_workers();
 }
 
 void starpu_shutdown(void)
@@ -544,3 +492,9 @@ void _starpu_worker_set_status(int workerid, starpu_worker_status status)
 {
 	config.workers[workerid].status = status;
 }
+
+void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex)
+{
+	config.workers[workerid].sched_cond = sched_cond;
+	config.workers[workerid].sched_mutex = sched_mutex;
+}

+ 5 - 0
src/core/workers.h

@@ -66,7 +66,10 @@ struct starpu_worker_s {
 	int workerid; /* uniquely identify the worker among all processing units types */
         pthread_cond_t ready_cond; /* indicate when the worker is ready */
 	unsigned memory_node; /* which memory node is associated that worker to ? */
+	/* TODO remove */
 	struct starpu_jobq_s *jobq; /* in which queue will that worker get/put tasks ? */
+	pthread_cond_t *sched_cond; /* condition variable used when the worker waits for tasks. */
+	pthread_mutex_t *sched_mutex; /* mutex protecting sched_cond */
 	struct starpu_job_list_s *local_jobs; /* this queue contains tasks that have been explicitely submitted to that queue */
 	pthread_mutex_t local_jobs_mutex; /* protect the local_jobs list */
 	struct starpu_worker_set_s *set; /* in case this worker belongs to a set */
@@ -157,4 +160,6 @@ void _starpu_worker_set_status(int workerid, starpu_worker_status status);
 /* TODO move */
 unsigned _starpu_execute_registered_progression_hooks(void);
 
+void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex);
+
 #endif // __WORKERS_H__

+ 0 - 19
src/datawizard/copy_driver.c

@@ -28,16 +28,6 @@
 
 void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
 {
-	/* workers may be blocked on the policy's global condition */
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-	pthread_cond_t *sched_cond = &sched->sched_activity_cond;
-	pthread_mutex_t *sched_mutex = &sched->sched_activity_mutex;
-
-
-	PTHREAD_MUTEX_LOCK(sched_mutex);
-	PTHREAD_COND_BROADCAST(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
 	/* wake up all workers on that memory node */
 	unsigned cond_id;
 
@@ -62,15 +52,6 @@ void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
 
 void starpu_wake_all_blocked_workers(void)
 {
-	/* workers may be blocked on the policy's global condition */
-	struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
-	pthread_cond_t *sched_cond = &sched->sched_activity_cond;
-	pthread_mutex_t *sched_mutex = &sched->sched_activity_mutex;
-
-	PTHREAD_MUTEX_LOCK(sched_mutex);
-	PTHREAD_COND_BROADCAST(sched_cond);
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
 	/* workers may be blocked on the various queues' conditions */
 	unsigned cond_id;
 

+ 4 - 10
src/drivers/cpu/driver_cpu.c

@@ -84,7 +84,6 @@ static int execute_job_on_cpu(starpu_job_t j, struct starpu_worker_s *cpu_args)
 void *_starpu_cpu_worker(void *arg)
 {
 	struct starpu_worker_s *cpu_arg = arg;
-	struct starpu_jobq_s *jobq = cpu_arg->jobq;
 	unsigned memnode = cpu_arg->memory_node;
 	int workerid = cpu_arg->workerid;
 	int devid = cpu_arg->devid;
@@ -102,8 +101,6 @@ void *_starpu_cpu_worker(void *arg)
 
 	_starpu_set_local_memory_node_key(&memnode);
 
-	_starpu_set_local_queue(jobq);
-
 	_starpu_set_local_worker_key(cpu_arg);
 
 	snprintf(cpu_arg->name, 32, "CPU %d", devid);
@@ -121,9 +118,6 @@ void *_starpu_cpu_worker(void *arg)
         starpu_job_t j;
 	int res;
 
-	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-	struct starpu_jobq_s *queue = policy->get_local_queue(policy);
-
 	while (_starpu_machine_is_running())
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
@@ -132,7 +126,7 @@ void *_starpu_cpu_worker(void *arg)
 
 		_starpu_execute_registered_progression_hooks();
 
-		_starpu_jobq_lock(queue);
+		PTHREAD_MUTEX_LOCK(cpu_arg->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
 		j = _starpu_pop_local_task(cpu_arg);
@@ -144,14 +138,14 @@ void *_starpu_cpu_worker(void *arg)
                 if (j == NULL) 
 		{
 			if (_starpu_worker_can_block(memnode))
-				_starpu_block_worker(workerid, &queue->activity_cond, &queue->activity_mutex);
+				_starpu_block_worker(workerid, cpu_arg->sched_cond, cpu_arg->sched_mutex);
 
-			_starpu_jobq_unlock(queue);
+			PTHREAD_MUTEX_UNLOCK(cpu_arg->sched_mutex);
 
 			continue;
 		};
 	
-		_starpu_jobq_unlock(queue);
+		PTHREAD_MUTEX_UNLOCK(cpu_arg->sched_mutex);
 		
 		/* can a cpu perform that task ? */
 		if (!STARPU_CPU_MAY_PERFORM(j)) 

+ 4 - 10
src/drivers/cuda/driver_cuda.c

@@ -148,7 +148,6 @@ static int execute_job_on_cuda(starpu_job_t j, struct starpu_worker_s *args)
 void *_starpu_cuda_worker(void *arg)
 {
 	struct starpu_worker_s* args = arg;
-	struct starpu_jobq_s *jobq = args->jobq;
 
 	int devid = args->devid;
 	int workerid = args->workerid;
@@ -163,8 +162,6 @@ void *_starpu_cuda_worker(void *arg)
 
 	_starpu_set_local_memory_node_key(&memnode);
 
-	_starpu_set_local_queue(jobq);
-
 	_starpu_set_local_worker_key(args);
 
 	init_context(devid);
@@ -196,9 +193,6 @@ void *_starpu_cuda_worker(void *arg)
 	struct starpu_job_s * j;
 	int res;
 
-	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-	struct starpu_jobq_s *queue = policy->get_local_queue(policy);
-	
 	while (_starpu_machine_is_running())
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
@@ -207,7 +201,7 @@ void *_starpu_cuda_worker(void *arg)
 
 		_starpu_execute_registered_progression_hooks();
 	
-		_starpu_jobq_lock(queue);
+		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
 		j = _starpu_pop_local_task(args);
@@ -219,14 +213,14 @@ void *_starpu_cuda_worker(void *arg)
                 if (j == NULL) 
 		{
 			if (_starpu_worker_can_block(memnode))
-				_starpu_block_worker(workerid, &queue->activity_cond, &queue->activity_mutex);
+				_starpu_block_worker(workerid, args->sched_cond, args->sched_mutex);
 
-			_starpu_jobq_unlock(queue);
+			PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
 
 			continue;
 		};
 
-		_starpu_jobq_unlock(queue);
+		PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
 
 		/* can CUDA do that task ? */
 		if (!STARPU_CUDA_MAY_PERFORM(j))

+ 4 - 10
src/drivers/opencl/driver_opencl.c

@@ -274,7 +274,6 @@ static int _starpu_opencl_execute_job(starpu_job_t j, struct starpu_worker_s *ar
 void *_starpu_opencl_worker(void *arg)
 {
 	struct starpu_worker_s* args = arg;
-	struct starpu_jobq_s *jobq = args->jobq;
 
 	int devid = args->devid;
 	int workerid = args->workerid;
@@ -290,8 +289,6 @@ void *_starpu_opencl_worker(void *arg)
 
 	_starpu_set_local_memory_node_key(&memnode);
 
-	_starpu_set_local_queue(jobq);
-
 	_starpu_set_local_worker_key(args);
 
 	_starpu_opencl_init_context(devid);
@@ -319,9 +316,6 @@ void *_starpu_opencl_worker(void *arg)
 	struct starpu_job_s * j;
 	int res;
 
-	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-	struct starpu_jobq_s *queue = policy->get_local_queue(policy);
-
 	while (_starpu_machine_is_running())
 	{
 		STARPU_TRACE_START_PROGRESS(memnode);
@@ -330,7 +324,7 @@ void *_starpu_opencl_worker(void *arg)
 
 		_starpu_execute_registered_progression_hooks();
 
-		_starpu_jobq_lock(queue);
+		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
 		j = _starpu_pop_local_task(args);
@@ -342,14 +336,14 @@ void *_starpu_opencl_worker(void *arg)
                 if (j == NULL) 
 		{
 			if (_starpu_worker_can_block(memnode))
-				_starpu_block_worker(workerid, &queue->activity_cond, &queue->activity_mutex);
+				_starpu_block_worker(workerid, args->sched_cond, args->sched_mutex);
 
-			_starpu_jobq_unlock(queue);
+			PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
 
 			continue;
 		};
 
-		_starpu_jobq_unlock(queue);
+		PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
 	       
 		/* can OpenCL do that task ? */
 		if (!STARPU_OPENCL_MAY_PERFORM(j))