Преглед изворни кода

Get rid of the starpu_jobq_s structure.

Cédric Augonnet пре 14 година
родитељ
комит
9a938aeb1a

+ 0 - 1
src/Makefile.am

@@ -69,7 +69,6 @@ noinst_HEADERS = 						\
 	core/mechanisms/priority_queues.h			\
 	core/mechanisms/fifo_queues.h				\
 	core/mechanisms/deque_queues.h				\
-	core/mechanisms/queues.h				\
 	core/mechanisms/stack_queues.h				\
 	core/perfmodel/perfmodel.h				\
 	core/perfmodel/regression.h				\

+ 10 - 33
src/core/mechanisms/deque_queues.c

@@ -14,16 +14,15 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
-#include <pthread.h>
+#include <starpu.h>
+#include <common/config.h>
+#include <core/workers.h>
 #include <core/mechanisms/deque_queues.h>
 #include <errno.h>
 #include <common/utils.h>
 
-struct starpu_jobq_s *_starpu_create_deque(void)
+struct starpu_deque_jobq_s *_starpu_create_deque(void)
 {
-	struct starpu_jobq_s *jobq;
-	jobq = malloc(sizeof(struct starpu_jobq_s));
-
 	struct starpu_deque_jobq_s *deque;
 	deque = malloc(sizeof(struct starpu_deque_jobq_s));
 
@@ -36,48 +35,29 @@ struct starpu_jobq_s *_starpu_create_deque(void)
 	deque->exp_len = 0.0;
 	deque->exp_end = deque->exp_start;
 
-	jobq->queue = deque;
-
-	return jobq;
+	return deque;
 }
 
-void _starpu_destroy_deque(struct starpu_jobq_s *jobq)
+void _starpu_destroy_deque(struct starpu_deque_jobq_s *deque)
 {
-	struct starpu_deque_jobq_s *deque;
-
-	deque = jobq->queue;
-
 	starpu_job_list_delete(deque->jobq);
 	free(deque);
-
-	free(jobq);
 }
 
-unsigned _starpu_get_deque_njobs(struct starpu_jobq_s *q)
+unsigned _starpu_get_deque_njobs(struct starpu_deque_jobq_s *deque_queue)
 {
-	STARPU_ASSERT(q);
-
-	struct starpu_deque_jobq_s *deque_queue = q->queue;
-
 	return deque_queue->njobs;
 }
 
-unsigned _starpu_get_deque_nprocessed(struct starpu_jobq_s *q)
+unsigned _starpu_get_deque_nprocessed(struct starpu_deque_jobq_s *deque_queue)
 {
-	STARPU_ASSERT(q);
-
-	struct starpu_deque_jobq_s *deque_queue = q->queue;
-
 	return deque_queue->nprocessed;
 }
 
-starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q)
+starpu_job_t _starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue)
 {
 	starpu_job_t j = NULL;
 
-	STARPU_ASSERT(q);
-	struct starpu_deque_jobq_s *deque_queue = q->queue;
-
 	if ((deque_queue->njobs == 0) && _starpu_machine_is_running())
 	{
 		return NULL;
@@ -97,13 +77,10 @@ starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q)
 	return j;
 }
 
-struct starpu_job_list_s * _starpu_deque_pop_every_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, uint32_t where)
+struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, uint32_t where)
 {
 	struct starpu_job_list_s *new_list, *old_list;
 
-	STARPU_ASSERT(q);
-	struct starpu_deque_jobq_s *deque_queue = q->queue;
-
 	/* block until some task is available in that queue */
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 

+ 7 - 9
src/core/mechanisms/deque_queues.h

@@ -19,7 +19,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
-#include <core/mechanisms/queues.h>
+#include <core/jobs.h>
 
 struct starpu_deque_jobq_s {
 	/* the actual list */
@@ -37,16 +37,14 @@ struct starpu_deque_jobq_s {
 	double exp_len;
 };
 
-struct starpu_jobq_s *_starpu_create_deque(void);
-void _starpu_destroy_deque(struct starpu_jobq_s *jobq);
+struct starpu_deque_jobq_s *_starpu_create_deque(void);
+void _starpu_destroy_deque(struct starpu_deque_jobq_s *deque);
 
-int _starpu_deque_push_task(struct starpu_jobq_s *q, starpu_job_t task);
-int _starpu_deque_push_prio_task(struct starpu_jobq_s *q, starpu_job_t task);
+starpu_job_t _starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue);
+struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, uint32_t where);
 
-starpu_job_t _starpu_deque_pop_task(struct starpu_jobq_s *q);
-
-unsigned _starpu_get_deque_njobs(struct starpu_jobq_s *q);
-unsigned _starpu_get_deque_nprocessed(struct starpu_jobq_s *q);
+unsigned _starpu_get_deque_njobs(struct starpu_deque_jobq_s *deque_queue);
+unsigned _starpu_get_deque_nprocessed(struct starpu_deque_jobq_s *deque_queue);
 
 
 #endif // __DEQUE_QUEUES_H__

+ 7 - 31
src/core/mechanisms/fifo_queues.c

@@ -19,11 +19,8 @@
 #include <errno.h>
 #include <common/utils.h>
 
-struct starpu_jobq_s *_starpu_create_fifo(void)
+struct starpu_fifo_jobq_s *_starpu_create_fifo(void)
 {
-	struct starpu_jobq_s *jobq;
-	jobq = malloc(sizeof(struct starpu_jobq_s));
-
 	struct starpu_fifo_jobq_s *fifo;
 	fifo = malloc(sizeof(struct starpu_fifo_jobq_s));
 
@@ -36,29 +33,17 @@ struct starpu_jobq_s *_starpu_create_fifo(void)
 	fifo->exp_len = 0.0;
 	fifo->exp_end = fifo->exp_start;
 
-	jobq->queue = fifo;
-
-	return jobq;
+	return fifo;
 }
 
-void _starpu_destroy_fifo(struct starpu_jobq_s *jobq)
+void _starpu_destroy_fifo(struct starpu_fifo_jobq_s *fifo)
 {
-	STARPU_ASSERT(jobq);
-
-	/* We first free the FIFO-specific data structure */
-	struct starpu_fifo_jobq_s *fifo = jobq->queue;
-
 	starpu_job_list_delete(fifo->jobq);
 	free(fifo);
-
-	free(jobq);
 }
 
-int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
+int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
-	STARPU_ASSERT(q);
-	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
-
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
@@ -72,11 +57,8 @@ int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_
 	return 0;
 }
 
-int _starpu_fifo_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
+int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
-	STARPU_ASSERT(q);
-	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
-
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
@@ -90,13 +72,10 @@ int _starpu_fifo_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex
 	return 0;
 }
 
-starpu_job_t _starpu_fifo_pop_task(struct starpu_jobq_s *q)
+starpu_job_t _starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo_queue)
 {
 	starpu_job_t j = NULL;
 
-	STARPU_ASSERT(q);
-	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
-
 	if (fifo_queue->njobs == 0)
 		return NULL;
 
@@ -115,14 +94,11 @@ starpu_job_t _starpu_fifo_pop_task(struct starpu_jobq_s *q)
 }
 
 /* pop every task that can be executed on the calling driver */
-struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, uint32_t where)
+struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
 {
 	struct starpu_job_list_s *new_list, *old_list;
 	unsigned size;
 	
-	STARPU_ASSERT(q);
-	struct starpu_fifo_jobq_s *fifo_queue = q->queue;
-
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	size = fifo_queue->njobs;

+ 7 - 7
src/core/mechanisms/fifo_queues.h

@@ -19,7 +19,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
-#include <core/mechanisms/queues.h>
+#include <core/jobs.h>
 
 struct starpu_fifo_jobq_s {
 	/* the actual list */
@@ -37,13 +37,13 @@ struct starpu_fifo_jobq_s {
 	double exp_len;
 };
 
-struct starpu_jobq_s *_starpu_create_fifo(void);
-void _starpu_destroy_fifo(struct starpu_jobq_s *jobq);
+struct starpu_fifo_jobq_s*_starpu_create_fifo(void);
+void _starpu_destroy_fifo(struct starpu_fifo_jobq_s *fifo);
 
-int _starpu_fifo_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
-int _starpu_fifo_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
+int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
+int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 
-starpu_job_t _starpu_fifo_pop_task(struct starpu_jobq_s *q);
-struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, uint32_t where);
+starpu_job_t _starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo);
+struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
 
 #endif // __FIFO_QUEUES_H__

+ 5 - 17
src/core/mechanisms/priority_queues.c

@@ -23,17 +23,11 @@
  * Centralized queue with priorities 
  */
 
-struct starpu_jobq_s *_starpu_create_priority_jobq(void)
+struct starpu_priority_jobq_s *_starpu_create_priority_jobq(void)
 {
-	struct starpu_jobq_s *q;
-
-	q = malloc(sizeof(struct starpu_jobq_s));
-
 	struct starpu_priority_jobq_s *central_queue;
 	
 	central_queue = malloc(sizeof(struct starpu_priority_jobq_s));
-	q->queue = central_queue;
-
 	central_queue->total_njobs = 0;
 
 	unsigned prio;
@@ -43,20 +37,14 @@ struct starpu_jobq_s *_starpu_create_priority_jobq(void)
 		central_queue->njobs[prio] = 0;
 	}
 
-	return q;
+	return central_queue;
 }
 
-void _starpu_destroy_priority_jobq(struct starpu_jobq_s *jobq)
+void _starpu_destroy_priority_jobq(struct starpu_priority_jobq_s *priority_queue)
 {
-	struct starpu_priority_jobq_s *central_queue;
-
-	central_queue = jobq->queue;
-
 	unsigned prio;
 	for (prio = 0; prio < NPRIO_LEVELS; prio++)
-		starpu_job_list_delete(central_queue->jobq[prio]);
-
-	free(central_queue);
+		starpu_job_list_delete(priority_queue->jobq[prio]);
 
-	free(jobq);
+	free(priority_queue);
 }

+ 3 - 3
src/core/mechanisms/priority_queues.h

@@ -19,7 +19,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
-#include <core/mechanisms/queues.h>
+#include <core/jobs.h>
 
 #define NPRIO_LEVELS	((STARPU_MAX_PRIO) - (STARPU_MIN_PRIO) + 1)
 
@@ -32,8 +32,8 @@ struct starpu_priority_jobq_s {
 	unsigned total_njobs;
 };
 
-struct starpu_jobq_s *_starpu_create_priority_jobq(void);
-void _starpu_destroy_priority_jobq(struct starpu_jobq_s *jobq);
+struct starpu_priority_jobq_s *_starpu_create_priority_jobq(void);
+void _starpu_destroy_priority_jobq(struct starpu_priority_jobq_s *priority_queue);
 
 void _starpu_init_priority_queues_mechanisms(void);
 

+ 0 - 30
src/core/mechanisms/queues.h

@@ -1,30 +0,0 @@
-/*
- * StarPU
- * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#ifndef __QUEUES_H__
-#define __QUEUES_H__
-
-#include <pthread.h>
-
-#include <core/jobs.h>
-#include <core/policies/sched_policy.h>
-
-struct starpu_jobq_s {
-	/* a pointer to some queue structure */
-	void *queue; 
-};
-
-#endif // __QUEUES_H__

+ 7 - 30
src/core/mechanisms/stack_queues.c

@@ -28,15 +28,11 @@ void _starpu_init_stack_queues_mechanisms(void)
 	total_number_of_jobs = 0;
 }
 
-struct starpu_jobq_s *_starpu_create_stack(void)
+struct starpu_stack_jobq_s *_starpu_create_stack(void)
 {
-	struct starpu_jobq_s *jobq;
-	jobq = malloc(sizeof(struct starpu_jobq_s));
-
 	struct starpu_stack_jobq_s *stack;
 	stack = malloc(sizeof(struct starpu_stack_jobq_s));
 
-	/* note that not all mechanisms (eg. the semaphore) have to be used */
 	stack->jobq = starpu_job_list_new();
 	stack->njobs = 0;
 	stack->nprocessed = 0;
@@ -45,9 +41,7 @@ struct starpu_jobq_s *_starpu_create_stack(void)
 	stack->exp_len = 0.0;
 	stack->exp_end = stack->exp_start;
 
-	jobq->queue = stack;
-
-	return jobq;
+	return stack;
 }
 
 unsigned get_total_njobs_stacks(void)
@@ -55,29 +49,18 @@ unsigned get_total_njobs_stacks(void)
 	return total_number_of_jobs;
 }
 
-unsigned _starpu_get_stack_njobs(struct starpu_jobq_s *q)
+unsigned _starpu_get_stack_njobs(struct starpu_stack_jobq_s *stack_queue)
 {
-	STARPU_ASSERT(q);
-
-	struct starpu_stack_jobq_s *stack_queue = q->queue;
-
 	return stack_queue->njobs;
 }
 
-unsigned _starpu_get_stack_nprocessed(struct starpu_jobq_s *q)
+unsigned _starpu_get_stack_nprocessed(struct starpu_stack_jobq_s *stack_queue)
 {
-	STARPU_ASSERT(q);
-
-	struct starpu_stack_jobq_s *stack_queue = q->queue;
-
 	return stack_queue->nprocessed;
 }
 
-void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
+void _starpu_stack_push_prio_task(struct starpu_stack_jobq_s *stack_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
-	STARPU_ASSERT(q);
-	struct starpu_stack_jobq_s *stack_queue = q->queue;
-
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 	total_number_of_jobs++;
 
@@ -90,11 +73,8 @@ void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sche
 	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-void _starpu_stack_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
+void _starpu_stack_push_task(struct starpu_stack_jobq_s *stack_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
 {
-	STARPU_ASSERT(q);
-	struct starpu_stack_jobq_s *stack_queue = q->queue;
-
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 	total_number_of_jobs++;
 
@@ -107,13 +87,10 @@ void _starpu_stack_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mut
 	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex)
+starpu_job_t _starpu_stack_pop_task(struct starpu_stack_jobq_s *stack_queue, pthread_mutex_t *sched_mutex)
 {
 	starpu_job_t j = NULL;
 
-	STARPU_ASSERT(q);
-	struct starpu_stack_jobq_s *stack_queue = q->queue;
-
 	if (stack_queue->njobs == 0)
 		return NULL;
 

+ 7 - 7
src/core/mechanisms/stack_queues.h

@@ -19,7 +19,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
-#include <core/mechanisms/queues.h>
+#include <core/jobs.h>
 
 struct starpu_stack_jobq_s {
 	/* the actual list */
@@ -37,18 +37,18 @@ struct starpu_stack_jobq_s {
 	double exp_len;
 };
 
-struct starpu_jobq_s *_starpu_create_stack(void);
+struct starpu_stack_jobq_s *_starpu_create_stack(void);
 
-void _starpu_stack_push_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
-void _starpu_stack_push_prio_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
+void _starpu_stack_push_task(struct starpu_stack_jobq_s *stack, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
+void _starpu_stack_push_prio_task(struct starpu_stack_jobq_s *stack, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 
-starpu_job_t _starpu_stack_pop_task(struct starpu_jobq_s *q, pthread_mutex_t *sched_mutex);
+starpu_job_t _starpu_stack_pop_task(struct starpu_stack_jobq_s *stack, pthread_mutex_t *sched_mutex);
 
 void _starpu_init_stack_queues_mechanisms(void);
 
 
-unsigned _starpu_get_stack_njobs(struct starpu_jobq_s *q);
-unsigned _starpu_get_stack_nprocessed(struct starpu_jobq_s *q);
+unsigned _starpu_get_stack_njobs(struct starpu_stack_jobq_s *stack);
+unsigned _starpu_get_stack_nprocessed(struct starpu_stack_jobq_s *stack);
 
 
 #endif // __STACK_QUEUES_H__

+ 11 - 15
src/core/policies/deque_modeling_policy.c

@@ -19,7 +19,7 @@
 #include <core/perfmodel/perfmodel.h>
 
 static unsigned nworkers;
-static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
+static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
@@ -30,11 +30,10 @@ static starpu_job_t dm_pop_task(void)
 
 	int workerid = starpu_worker_get_id();
 
-	struct starpu_jobq_s *q = queue_array[workerid];
+	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
 
-	j = _starpu_fifo_pop_task(q);
+	j = _starpu_fifo_pop_task(fifo);
 	if (j) {
-		struct starpu_fifo_jobq_s *fifo = q->queue;
 		double model = j->predicted;
 	
 		fifo->exp_len -= model;
@@ -51,16 +50,15 @@ static struct starpu_job_list_s *dm_pop_every_task(uint32_t where)
 
 	int workerid = starpu_worker_get_id();
 
-	struct starpu_jobq_s *q = queue_array[workerid];
+	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
 
-	new_list = _starpu_fifo_pop_every_task(queue_array[workerid], &sched_mutex[workerid], where);
+	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], where);
 	if (new_list) {
 		starpu_job_itor_t i;
 		for(i = starpu_job_list_begin(new_list);
 			i != starpu_job_list_end(new_list);
 			i = starpu_job_list_next(i))
 		{
-			struct starpu_fifo_jobq_s *fifo = q->queue;
 			double model = i->predicted;
 	
 			fifo->exp_len -= model;
@@ -90,7 +88,7 @@ static int _dm_push_task(starpu_job_t j, unsigned prio)
 	{
 		double exp_end;
 		
-		fifo = queue_array[worker]->queue;
+		fifo = queue_array[worker];
 
 		fifo->exp_start = STARPU_MAX(fifo->exp_start, _starpu_timing_now());
 		fifo->exp_end = STARPU_MAX(fifo->exp_end, _starpu_timing_now());
@@ -133,7 +131,7 @@ static int _dm_push_task(starpu_job_t j, unsigned prio)
 	STARPU_ASSERT(best != -1);
 
 	/* we should now have the best worker in variable "best" */
-	fifo = queue_array[best]->queue;
+	fifo = queue_array[best];
 
 	fifo->exp_end += model_best;
 	fifo->exp_len += model_best;
@@ -165,14 +163,12 @@ static int dm_push_task(starpu_job_t j)
 	return _dm_push_task(j, 0);
 }
 
-static void init_dm_fifo(void)
-{
-}
-
 static void initialize_dm_policy(struct starpu_machine_config_s *config, 
 	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	int workerid;
+	nworkers = config->nworkers;
+
+	unsigned workerid;
 	for (workerid = 0; workerid < config->nworkers; workerid++)
 	{
 		queue_array[workerid] = _starpu_create_fifo();
@@ -187,7 +183,7 @@ static void initialize_dm_policy(struct starpu_machine_config_s *config,
 static void deinitialize_dm_policy(struct starpu_machine_config_s *config, 
 	 __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
-	int worker;
+	unsigned worker;
 	for (worker = 0; worker < config->nworkers; worker++)
 		_starpu_destroy_fifo(queue_array[worker]);
 }

+ 0 - 1
src/core/policies/deque_modeling_policy.h

@@ -18,7 +18,6 @@
 #define __DEQUE_MODELING_POLICY_H__
 
 #include <core/workers.h>
-#include <core/mechanisms/queues.h>
 #include <core/mechanisms/fifo_queues.h>
 #include <core/mechanisms/deque_queues.h>
 

+ 6 - 7
src/core/policies/deque_modeling_policy_data_aware.c

@@ -18,7 +18,7 @@
 #include <core/perfmodel/perfmodel.h>
 
 static unsigned nworkers;
-static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
+static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
@@ -31,11 +31,10 @@ static starpu_job_t dmda_pop_task(void)
 	struct starpu_job_s *j;
 
 	int workerid = starpu_worker_get_id();
-	struct starpu_jobq_s *q = queue_array[workerid];
+	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
 
-	j = _starpu_fifo_pop_task(q);
+	j = _starpu_fifo_pop_task(fifo);
 	if (j) {
-		struct starpu_fifo_jobq_s *fifo = q->queue;
 		double model = j->predicted;
 	
 		fifo->exp_len -= model;
@@ -84,7 +83,7 @@ static int _dmda_push_task(starpu_job_t j, unsigned prio)
 
 	for (worker = 0; worker < nworkers; worker++)
 	{
-		fifo = queue_array[worker]->queue;
+		fifo = queue_array[worker];
 
 		fifo->exp_start = STARPU_MAX(fifo->exp_start, _starpu_timing_now());
 		fifo->exp_end = STARPU_MAX(fifo->exp_end, _starpu_timing_now());
@@ -122,7 +121,7 @@ static int _dmda_push_task(starpu_job_t j, unsigned prio)
 	{
 		for (worker = 0; worker < nworkers; worker++)
 		{
-			fifo = queue_array[worker]->queue;
+			fifo = queue_array[worker];
 	
 			if (!_starpu_worker_may_execute_task(worker, task->cl->where))
 			{
@@ -162,7 +161,7 @@ static int _dmda_push_task(starpu_job_t j, unsigned prio)
 	}
 
 	/* we should now have the best worker in variable "best" */
-	fifo = queue_array[best]->queue;
+	fifo = queue_array[best];
 
 	fifo->exp_end += model_best;
 	fifo->exp_len += model_best;

+ 0 - 1
src/core/policies/deque_modeling_policy_data_aware.h

@@ -18,7 +18,6 @@
 #define __DEQUE_MODELING_POLICY_DATA_AWARE_H__
 
 #include <core/workers.h>
-#include <core/mechanisms/queues.h>
 #include <core/mechanisms/fifo_queues.h>
 
 extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;

+ 7 - 7
src/core/policies/eager_central_policy.c

@@ -22,7 +22,7 @@
  */
 
 /* the former is the actual queue, the latter some container */
-static struct starpu_jobq_s *jobq;
+static struct starpu_fifo_jobq_s *fifo;
 
 static pthread_cond_t sched_cond;
 static pthread_mutex_t sched_mutex;
@@ -31,7 +31,7 @@ static void initialize_eager_center_policy(struct starpu_machine_config_s *confi
 		   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
 	/* there is only a single queue in that trivial design */
-	jobq = _starpu_create_fifo();
+	fifo = _starpu_create_fifo();
 
 	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
 	PTHREAD_COND_INIT(&sched_cond, NULL);
@@ -47,27 +47,27 @@ static void deinitialize_eager_center_policy(__attribute__ ((unused)) struct sta
 	/* TODO check that there is no task left in the queue */
 
 	/* deallocate the job queue */
-	_starpu_destroy_fifo(jobq);
+	_starpu_destroy_fifo(fifo);
 }
 
 static int push_task_eager_policy(starpu_job_t task)
 {
-	return _starpu_fifo_push_task(jobq, &sched_mutex, &sched_cond, task);
+	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
 static int push_prio_task_eager_policy(starpu_job_t task)
 {
-	return _starpu_fifo_push_prio_task(jobq, &sched_mutex, &sched_cond, task);
+	return _starpu_fifo_push_prio_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
 static struct starpu_job_list_s *pop_every_task_eager_policy(uint32_t where)
 {
-	return _starpu_fifo_pop_every_task(jobq, &sched_mutex, where);
+	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, where);
 }
 
 static starpu_job_t pop_task_eager_policy(void)
 {
-	return _starpu_fifo_pop_task(jobq);
+	return _starpu_fifo_pop_task(fifo);
 }
 
 struct starpu_sched_policy_s _starpu_sched_eager_policy = {

+ 10 - 14
src/core/policies/eager_central_priority_policy.c

@@ -17,7 +17,7 @@
 #include <core/policies/eager_central_priority_policy.h>
 
 /* the former is the actual queue, the latter some container */
-static struct starpu_jobq_s *jobq;
+static struct starpu_priority_jobq_s *jobq;
 
 /* keep track of the total number of jobs to be scheduled to avoid infinite 
  * polling when there are really few jobs in the overall queue */
@@ -49,8 +49,6 @@ static void deinitialize_eager_center_priority_policy(struct starpu_machine_conf
 
 static int _starpu_priority_push_task(starpu_job_t j)
 {
-	struct starpu_priority_jobq_s *queue = jobq->queue;
-
 	/* wake people waiting for a task */
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
@@ -58,9 +56,9 @@ static int _starpu_priority_push_task(starpu_job_t j)
 	
 	unsigned priolevel = j->task->priority - STARPU_MIN_PRIO;
 
-	starpu_job_list_push_front(queue->jobq[priolevel], j);
-	queue->njobs[priolevel]++;
-	queue->total_njobs++;
+	starpu_job_list_push_front(jobq->jobq[priolevel], j);
+	jobq->njobs[priolevel]++;
+	jobq->total_njobs++;
 
 	PTHREAD_COND_SIGNAL(&global_sched_cond);
 	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
@@ -72,12 +70,10 @@ static starpu_job_t _starpu_priority_pop_task(void)
 {
 	starpu_job_t j = NULL;
 
-	struct starpu_priority_jobq_s *queue = jobq->queue;
-
 	/* block until some event happens */
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
-	if ((queue->total_njobs == 0) && _starpu_machine_is_running())
+	if ((jobq->total_njobs == 0) && _starpu_machine_is_running())
 	{
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 		_starpu_datawizard_progress(q->memory_node, 1);
@@ -86,15 +82,15 @@ static starpu_job_t _starpu_priority_pop_task(void)
 #endif
 	}
 
-	if (queue->total_njobs > 0)
+	if (jobq->total_njobs > 0)
 	{
 		unsigned priolevel = NPRIO_LEVELS - 1;
 		do {
-			if (queue->njobs[priolevel] > 0) {
+			if (jobq->njobs[priolevel] > 0) {
 				/* there is some task that we can grab */
-				j = starpu_job_list_pop_back(queue->jobq[priolevel]);
-				queue->njobs[priolevel]--;
-				queue->total_njobs--;
+				j = starpu_job_list_pop_back(jobq->jobq[priolevel]);
+				jobq->njobs[priolevel]--;
+				jobq->total_njobs--;
 				STARPU_TRACE_JOB_POP(j, 0);
 			}
 		} while (!j && priolevel-- > 0);

+ 0 - 1
src/core/policies/eager_central_priority_policy.h

@@ -18,7 +18,6 @@
 #define __EAGER_CENTRAL_PRIORITY_POLICY_H__
 
 #include <core/workers.h>
-#include <core/mechanisms/queues.h>
 #include <core/mechanisms/priority_queues.h>
 
 extern struct starpu_sched_policy_s _starpu_sched_prio_policy;

+ 5 - 5
src/core/policies/no_prio_policy.c

@@ -22,7 +22,7 @@
  */
 
 /* the former is the actual queue, the latter some container */
-static struct starpu_jobq_s *jobq;
+static struct starpu_fifo_jobq_s *fifo;
 
 static pthread_cond_t sched_cond;
 static pthread_mutex_t sched_mutex;
@@ -31,24 +31,24 @@ static void initialize_no_prio_policy(struct starpu_machine_config_s *config,
 	   __attribute__ ((unused)) struct starpu_sched_policy_s *_policy) 
 {
 	/* there is only a single queue in that trivial design */
-	jobq = _starpu_create_fifo();
+	fifo = _starpu_create_fifo();
 
 	PTHREAD_MUTEX_INIT(&sched_mutex, NULL);
 	PTHREAD_COND_INIT(&sched_cond, NULL);
 
-	int workerid;
+	unsigned workerid;
 	for (workerid = 0; workerid < config->nworkers; workerid++)
 		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
 }
 
 static int push_task_no_prio_policy(starpu_job_t task)
 {
-        return _starpu_fifo_push_task(jobq, &sched_mutex, &sched_cond, task);
+        return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
 static starpu_job_t pop_task_no_prio_policy(void)
 {
-	return _starpu_fifo_pop_task(jobq);
+	return _starpu_fifo_pop_task(fifo);
 }
 
 struct starpu_sched_policy_s _starpu_sched_no_prio_policy = {

+ 3 - 4
src/core/policies/random_policy.c

@@ -17,7 +17,7 @@
 #include <core/policies/random_policy.h>
 
 static unsigned nworkers;
-static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
+static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
@@ -27,9 +27,8 @@ static starpu_job_t random_pop_task(void)
 	struct starpu_job_s *j;
 
 	int workerid = starpu_worker_get_id();
-	struct starpu_jobq_s *jobq = queue_array[workerid];
 
-	j = _starpu_fifo_pop_task(jobq);
+	j = _starpu_fifo_pop_task(queue_array[workerid]);
 
 	return j;
 }
@@ -90,7 +89,7 @@ static void initialize_random_policy(struct starpu_machine_config_s *config,
 
 	nworkers = config->nworkers;
 
-	int workerid;
+	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
 		queue_array[workerid] = _starpu_create_fifo();

+ 0 - 1
src/core/policies/sched_policy.h

@@ -19,7 +19,6 @@
 
 #include <starpu.h>
 #include <core/workers.h>
-#include <core/mechanisms/queues.h>
 
 struct starpu_machine_config_s;
 

+ 15 - 14
src/core/policies/work_stealing_policy.c

@@ -21,7 +21,7 @@
 
 static unsigned nworkers;
 static unsigned rr_worker;
-static struct starpu_jobq_s *queue_array[STARPU_NMAXWORKERS];
+static struct starpu_deque_jobq_s *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_mutex_t global_sched_mutex;
 static pthread_cond_t global_sched_cond;
@@ -51,9 +51,9 @@ static float overload_metric(unsigned id)
 }
 
 /* who to steal work to ? */
-static struct starpu_jobq_s *select_victimq(void)
+static struct starpu_deque_jobq_s *select_victimq(void)
 {
-	struct starpu_jobq_s *q;
+	struct starpu_deque_jobq_s *q;
 
 	unsigned attempts = nworkers;
 
@@ -76,9 +76,9 @@ static struct starpu_jobq_s *select_victimq(void)
 	return q;
 }
 
-static struct starpu_jobq_s *select_workerq(void)
+static struct starpu_deque_jobq_s *select_workerq(void)
 {
-	struct starpu_jobq_s *q;
+	struct starpu_deque_jobq_s *q;
 
 	unsigned attempts = nworkers;
 
@@ -104,10 +104,9 @@ static struct starpu_jobq_s *select_workerq(void)
 #else
 
 /* who to steal work to ? */
-static struct starpu_jobq_s *select_victimq(void)
+static struct starpu_deque_jobq_s *select_victimq(void)
 {
-
-	struct starpu_jobq_s *q;
+	struct starpu_deque_jobq_s *q;
 
 	q = queue_array[rr_worker];
 
@@ -119,9 +118,9 @@ static struct starpu_jobq_s *select_victimq(void)
 
 /* when anonymous threads submit tasks, 
  * we need to select a queue where to dispose them */
-static struct starpu_jobq_s *select_workerq(void)
+static struct starpu_deque_jobq_s *select_workerq(void)
 {
-	struct starpu_jobq_s *q;
+	struct starpu_deque_jobq_s *q;
 
 	q = queue_array[rr_worker];
 
@@ -139,7 +138,9 @@ static starpu_job_t ws_pop_task(void)
 
 	int workerid = starpu_worker_get_id();
 
-	struct starpu_jobq_s *q = queue_array[workerid];
+	struct starpu_deque_jobq_s *q;
+
+	q = queue_array[workerid];
 
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
@@ -152,7 +153,7 @@ static starpu_job_t ws_pop_task(void)
 	}
 	
 	/* we need to steal someone's job */
-	struct starpu_jobq_s *victimq;
+	struct starpu_deque_jobq_s *victimq;
 	victimq = select_victimq();
 
 	j = _starpu_deque_pop_task(victimq);
@@ -171,7 +172,7 @@ int ws_push_task(starpu_job_t task)
 	int workerid = starpu_worker_get_id();
 
         struct starpu_deque_jobq_s *deque_queue;
-	deque_queue = queue_array[workerid]->queue;
+	deque_queue = queue_array[workerid];
 
         PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 	// XXX reuse ?
@@ -199,7 +200,7 @@ static void initialize_ws_policy(struct starpu_machine_config_s *config,
 	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 
-	int workerid;
+	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
 		queue_array[workerid] = _starpu_create_deque();