
Remove unused level of indirection

Samuel Thibault · 5 years ago · commit d707eb6be6
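
Every file below follows the same pattern: a structure that used to hold a heap-allocated struct _starpu_fifo_taskq * (created with _starpu_create_fifo(), released with _starpu_destroy_fifo()) now embeds the queue by value and initializes it in place with the new _starpu_init_fifo(). Call sites change from data->fifo to &data->fifo, and the matching destroy/free calls disappear. A minimal sketch of the pattern, using illustrative names rather than the StarPU types:

#include <stdlib.h>

struct queue { int ntasks; };
static void queue_init(struct queue *q) { q->ntasks = 0; }

/* Before: one extra allocation, one extra free, and a pointer
 * dereference on every access. */
struct owner_ptr { struct queue *q; };

/* After: the queue lives inside its owner, is initialized in place,
 * and is released together with the owner, so no separate destroy
 * step is needed. */
struct owner_val { struct queue q; };

int main(void)
{
	struct owner_ptr b;
	b.q = malloc(sizeof(*b.q));   /* old scheme */
	queue_init(b.q);
	free(b.q);

	struct owner_val a;
	queue_init(&a.q);             /* new scheme: no allocation at all */
	return 0;
}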

src/sched_policies/component_fifo.c  +6 -7

@@ -23,7 +23,7 @@
 
 struct _starpu_fifo_data
 {
-	struct _starpu_fifo_taskq * fifo;
+	struct _starpu_fifo_taskq fifo;
 	starpu_pthread_mutex_t mutex;
 	unsigned ntasks_threshold;
 	double exp_len_threshold;
@@ -35,7 +35,6 @@ static void fifo_component_deinit_data(struct starpu_sched_component * component
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * f = component->data;
-	_starpu_destroy_fifo(f->fifo);
 	STARPU_PTHREAD_MUTEX_DESTROY(&f->mutex);
 	free(f);
 }
@@ -44,7 +43,7 @@ static double fifo_estimated_end(struct starpu_sched_component * component)
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * queue = data->fifo;
+	struct _starpu_fifo_taskq * queue = &data->fifo;
 	return starpu_sched_component_estimated_end_min_add(component, queue->exp_len);
 }
 
@@ -53,7 +52,7 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 	STARPU_ASSERT(component && component->data);
 	STARPU_ASSERT(starpu_bitmap_cardinal(&component->workers_in_ctx) != 0);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * queue = data->fifo;
+	struct _starpu_fifo_taskq * queue = &data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	double relative_speedup = 0.0;
 	double load = starpu_sched_component_estimated_load(component);
@@ -87,7 +86,7 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 	STARPU_ASSERT(component && component->data && task);
 	STARPU_ASSERT(starpu_sched_component_can_execute_task(component,task));
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * queue = data->fifo;
+	struct _starpu_fifo_taskq * queue = &data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	int ret = 0;
 	const double now = starpu_timing_now();
@@ -169,7 +168,7 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 {
 	STARPU_ASSERT(component && component->data);
 	struct _starpu_fifo_data * data = component->data;
-	struct _starpu_fifo_taskq * queue = data->fifo;
+	struct _starpu_fifo_taskq * queue = &data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	const double now = starpu_timing_now();
 
@@ -269,7 +268,7 @@ struct starpu_sched_component * starpu_sched_component_fifo_create(struct starpu
 	struct starpu_sched_component *component = starpu_sched_component_create(tree, "fifo");
 	struct _starpu_fifo_data *data;
 	_STARPU_MALLOC(data, sizeof(*data));
-	data->fifo = _starpu_create_fifo();
+	_starpu_init_fifo(&data->fifo);
 	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
 	component->data = data;
 	component->estimated_end = fifo_estimated_end;

src/sched_policies/component_work_stealing.c  +17 -22

@@ -37,7 +37,7 @@ struct _starpu_component_work_stealing_data
  */
 	unsigned performed_total, last_pop_child, last_push_child;
 
-	struct _starpu_prio_deque ** fifos;
+	struct _starpu_prio_deque * fifos;
 	starpu_pthread_mutex_t ** mutexes;
 	unsigned size;
 };
@@ -59,7 +59,7 @@ static struct starpu_task *  steal_task_round_robin(struct starpu_sched_componen
 	struct starpu_task * task = NULL;
 	while (1)
 	{
-		struct _starpu_prio_deque * fifo = wsd->fifos[i];
+		struct _starpu_prio_deque * fifo = &wsd->fifos[i];
 
 		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 		task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid, NULL);
@@ -141,17 +141,17 @@ static struct starpu_task * pull_task(struct starpu_sched_component * component,
 	struct _starpu_component_work_stealing_data * wsd = component->data;
 	const double now = starpu_timing_now();
 	STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
-	struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
+	struct starpu_task * task = _starpu_prio_deque_pop_task(&wsd->fifos[i]);
 	if(task)
 	{
 		if(!isnan(task->predicted))
 		{
-			wsd->fifos[i]->exp_len -= task->predicted;
-			wsd->fifos[i]->exp_start = now + task->predicted;
+			wsd->fifos[i].exp_len -= task->predicted;
+			wsd->fifos[i].exp_start = now + task->predicted;
 		}
 	}
 	else
-		wsd->fifos[i]->exp_len = 0.0;
+		wsd->fifos[i].exp_len = 0.0;
 
 	STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 	if(task)
@@ -163,7 +163,7 @@ static struct starpu_task * pull_task(struct starpu_sched_component * component,
 	if(task)
 	{
 		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
-		wsd->fifos[i]->nprocessed++;
+		wsd->fifos[i].nprocessed++;
 		STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 		return task;
@@ -196,9 +196,9 @@ double _ws_estimated_end(struct starpu_sched_component * component)
 	for(i = 0; i < component->nchildren; i++)
 	{
 		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
-		sum_len += wsd->fifos[i]->exp_len;
-		wsd->fifos[i]->exp_start = STARPU_MAX(now, wsd->fifos[i]->exp_start);
-		sum_start += wsd->fifos[i]->exp_start;
+		sum_len += wsd->fifos[i].exp_len;
+		wsd->fifos[i].exp_start = STARPU_MAX(now, wsd->fifos[i].exp_start);
+		sum_start += wsd->fifos[i].exp_start;
 		STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 	}
@@ -216,7 +216,7 @@ double _ws_estimated_load(struct starpu_sched_component * component)
 	for(i = 0; i < component->nchildren; i++)
 	{
 		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
-		ntasks += wsd->fifos[i]->ntasks;
+		ntasks += wsd->fifos[i].ntasks;
 		STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 	}
 	double speedup = 0.0;
@@ -265,7 +265,7 @@ static int push_task(struct starpu_sched_component * component, struct starpu_ta
 
 	STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 	starpu_sched_task_break(task);
-	ret = _starpu_prio_deque_push_front_task(wsd->fifos[i], task);
+	ret = _starpu_prio_deque_push_front_task(&wsd->fifos[i], task);
 	STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 	wsd->last_push_child = i;
@@ -308,9 +308,9 @@ int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
 
 			struct _starpu_component_work_stealing_data * wsd = component->data;
 			STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
-			int ret = _starpu_prio_deque_push_front_task(wsd->fifos[i] , task);
+			int ret = _starpu_prio_deque_push_front_task(&wsd->fifos[i] , task);
 			if(ret == 0 && !isnan(task->predicted))
-				wsd->fifos[i]->exp_len += task->predicted;
+				wsd->fifos[i].exp_len += task->predicted;
 			STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 			component->can_pull(component);
@@ -334,10 +334,7 @@ void _ws_add_child(struct starpu_sched_component * component, struct starpu_sche
 		wsd->size = component->nchildren;
 	}
 
-	struct _starpu_prio_deque *fifo;
-	_STARPU_MALLOC(fifo, sizeof(*fifo));
-	_starpu_prio_deque_init(fifo);
-	wsd->fifos[component->nchildren - 1] = fifo;
+	_starpu_prio_deque_init(&wsd->fifos[component->nchildren - 1]);
 
 	starpu_pthread_mutex_t *mutex;
 	_STARPU_MALLOC(mutex, sizeof(*mutex));
@@ -359,19 +356,17 @@ void _ws_remove_child(struct starpu_sched_component * component, struct starpu_s
 			break;
 	}
 	STARPU_ASSERT(i_component != component->nchildren);
-	struct _starpu_prio_deque * tmp_fifo = wsd->fifos[i_component];
+	struct _starpu_prio_deque tmp_fifo = wsd->fifos[i_component];
 	wsd->fifos[i_component] = wsd->fifos[component->nchildren - 1];
 
 
 	component->children[i_component] = component->children[component->nchildren - 1];
 	component->nchildren--;
 	struct starpu_task * task;
-	while ((task = _starpu_prio_deque_pop_task(tmp_fifo)))
+	while ((task = _starpu_prio_deque_pop_task(&tmp_fifo)))
 	{
 		starpu_sched_component_push_task(NULL, component, task);
 	}
-	_starpu_prio_deque_destroy(tmp_fifo);
-	free(tmp_fifo);
 }
 
 void _work_stealing_component_deinit_data(struct starpu_sched_component * component)
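
Note on _ws_remove_child() above: with the per-child deques stored by value, removal is a plain struct copy rather than pointer shuffling. The departing slot is copied into a local tmp_fifo, the last slot is copied over the hole, and the local copy is drained before it goes out of scope. The shallow copy is safe as long as the deque holds no pointers into its own storage, so the copy simply takes over the queued tasks. A hedged sketch of the swap-with-last idiom on a value array (stand-in types, not the StarPU structures):

#include <assert.h>

struct deque { void *head; unsigned len; };  /* stand-in for _starpu_prio_deque */

/* Remove slot i from a dense array of *n deques by copying the last
 * slot over it; the removed deque is returned by value so the caller
 * can drain it before the local copy dies.  Nothing is freed: the
 * storage belongs to the array itself. */
static struct deque remove_swap_last(struct deque *arr, unsigned *n, unsigned i)
{
	assert(i < *n);
	struct deque removed = arr[i];   /* plain struct copy */
	arr[i] = arr[--*n];              /* keep the array dense */
	return removed;
}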

src/sched_policies/deque_modeling_policy_data_aware.c  +23 -39

@@ -52,7 +52,7 @@ struct _starpu_dmda_data
 	double _gamma;
 	double idle_power;
 
-	struct _starpu_fifo_taskq **queue_array;
+	struct _starpu_fifo_taskq queue_array[STARPU_NMAXWORKERS];
 
 	long int total_task_cnt;
 	long int ready_task_cnt;
@@ -234,7 +234,7 @@ static struct starpu_task *_dmda_pop_task(unsigned sched_ctx_id, int ready)
 	struct starpu_task *task;
 
 	unsigned workerid = starpu_worker_get_id_check();
-	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+	struct _starpu_fifo_taskq *fifo = &dt->queue_array[workerid];
 
 	/* Take the opportunity to update start time */
 	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
@@ -284,7 +284,7 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 	struct starpu_task *new_list, *task;
 
 	unsigned workerid = starpu_worker_get_id_check();
-	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+	struct _starpu_fifo_taskq *fifo = &dt->queue_array[workerid];
 
 	/* Take the opportunity to update start time */
 	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
@@ -323,7 +323,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 			return 0;
 	}
 
-	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
+	struct _starpu_fifo_taskq *fifo = &dt->queue_array[best_workerid];
 
 	double now = starpu_timing_now();
 
@@ -405,13 +405,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	if (prio)
 	{
 		starpu_worker_lock(best_workerid);
-		ret =_starpu_fifo_push_sorted_task(dt->queue_array[best_workerid], task);
+		ret =_starpu_fifo_push_sorted_task(&dt->queue_array[best_workerid], task);
 		if(dt->num_priorities != -1)
 		{
 			int i;
 			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
 			for(i = 0; i <= task_prio; i++)
-				dt->queue_array[best_workerid]->ntasks_per_priority[i]++;
+				dt->queue_array[best_workerid].ntasks_per_priority[i]++;
 		}
 
 
@@ -424,9 +424,9 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	else
 	{
 		starpu_worker_lock(best_workerid);
-		starpu_task_list_push_back (&dt->queue_array[best_workerid]->taskq, task);
-		dt->queue_array[best_workerid]->ntasks++;
-		dt->queue_array[best_workerid]->nprocessed++;
+		starpu_task_list_push_back (&dt->queue_array[best_workerid].taskq, task);
+		dt->queue_array[best_workerid].ntasks++;
+		dt->queue_array[best_workerid].nprocessed++;
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 		starpu_wake_worker_locked(best_workerid);
 #endif
@@ -469,7 +469,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 		unsigned nimpl;
 		unsigned impl_mask;
 		unsigned worker = workers->get_next(workers, &it);
-		struct _starpu_fifo_taskq *fifo  = dt->queue_array[worker];
+		struct _starpu_fifo_taskq *fifo  = &dt->queue_array[worker];
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
 
 		/* Sometimes workers didn't take the tasks as early as we expected */
@@ -624,7 +624,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		unsigned nimpl;
 		unsigned impl_mask;
 		unsigned workerid = workers->get_next(workers, &it);
-		struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+		struct _starpu_fifo_taskq *fifo = &dt->queue_array[workerid];
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, sched_ctx_id);
 		unsigned memory_node = starpu_worker_get_memory_node(workerid);
 
@@ -965,15 +965,12 @@ static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nwo
 		int workerid = workerids[i];
 		/* if the worker has already belonged to this context,
 		   the queue and the synchronization variables have already been initialized */
-		q = dt->queue_array[workerid];
-		if(q == NULL)
-		{
-			q = dt->queue_array[workerid] = _starpu_create_fifo();
-			/* These are only stats, they can be read with races */
-			STARPU_HG_DISABLE_CHECKING(q->exp_start);
-			STARPU_HG_DISABLE_CHECKING(q->exp_len);
-			STARPU_HG_DISABLE_CHECKING(q->exp_end);
-		}
+		q = &dt->queue_array[workerid];
+		_starpu_init_fifo(q);
+		/* These are only stats, they can be read with races */
+		STARPU_HG_DISABLE_CHECKING(q->exp_start);
+		STARPU_HG_DISABLE_CHECKING(q->exp_len);
+		STARPU_HG_DISABLE_CHECKING(q->exp_end);
 
 		if(dt->num_priorities != -1)
 		{
@@ -997,16 +994,10 @@ static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned
 	for (i = 0; i < nworkers; i++)
 	{
 		int workerid = workerids[i];
-		if(dt->queue_array[workerid] != NULL)
+		if(dt->num_priorities != -1)
 		{
-			if(dt->num_priorities != -1)
-			{
-				free(dt->queue_array[workerid]->exp_len_per_priority);
-				free(dt->queue_array[workerid]->ntasks_per_priority);
-			}
-
-			_starpu_destroy_fifo(dt->queue_array[workerid]);
-			dt->queue_array[workerid] = NULL;
+			free(dt->queue_array[workerid].exp_len_per_priority);
+			free(dt->queue_array[workerid].ntasks_per_priority);
 		}
 	}
 }
@@ -1018,12 +1009,6 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)dt);
 
-	_STARPU_MALLOC(dt->queue_array, STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));
-
-	int i;
-	for(i = 0; i < STARPU_NMAXWORKERS; i++)
-		dt->queue_array[i] = NULL;
-
 	dt->alpha = starpu_get_env_float_default("STARPU_SCHED_ALPHA", _STARPU_SCHED_ALPHA_DEFAULT);
 	dt->beta = starpu_get_env_float_default("STARPU_SCHED_BETA", _STARPU_SCHED_BETA_DEFAULT);
 	dt->_gamma = starpu_get_env_float_default("STARPU_SCHED_GAMMA", _STARPU_SCHED_GAMMA_DEFAULT);
@@ -1069,7 +1054,6 @@ static void deinitialize_dmda_policy(unsigned sched_ctx_id)
 	}
 #endif
 
-	free(dt->queue_array);
 	free(dt);
 }
 
@@ -1080,7 +1064,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+	struct _starpu_fifo_taskq *fifo = &dt->queue_array[workerid];
 	const double now = starpu_timing_now();
 
 	/* Once the task is executing, we can update the predicted amount
@@ -1099,7 +1083,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 static void dmda_push_task_notify(struct starpu_task *task, int workerid, int perf_workerid, unsigned sched_ctx_id)
 {
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+	struct _starpu_fifo_taskq *fifo = &dt->queue_array[workerid];
 
 	/* Compute the expected penalty */
 	double predicted = starpu_task_worker_expected_length(task, perf_workerid, STARPU_NMAX_SCHED_CTXS,
@@ -1174,7 +1158,7 @@ static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id
 {
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
-	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
+	struct _starpu_fifo_taskq *fifo = &dt->queue_array[workerid];
 	starpu_worker_lock_self();
 	_starpu_fifo_task_finished(fifo, task, dt->num_priorities);
 	starpu_worker_unlock_self();
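
In the dmda family the per-worker queues move from a lazily allocated pointer array to a fixed queue_array[STARPU_NMAXWORKERS] inside the policy data, so the NULL bookkeeping in dmda_add_workers()/dmda_remove_workers() disappears and only the per-priority side arrays still need explicit frees. A hedged sketch mirroring the teardown above (an illustrative helper, not code from the commit; field names as shown in the diff):

/* The queue storage itself is released with free(dt) in
 * deinitialize_dmda_policy(); per worker we only free what
 * dmda_add_workers() heap-allocated. */
static void dmda_teardown_worker(struct _starpu_dmda_data *dt, int workerid)
{
	if (dt->num_priorities != -1)
	{
		free(dt->queue_array[workerid].exp_len_per_priority);
		free(dt->queue_array[workerid].ntasks_per_priority);
	}
	/* no _starpu_destroy_fifo(): nothing else was heap-allocated */
}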

src/sched_policies/eager_central_policy.c  +9 -17

@@ -29,7 +29,7 @@
 
 struct _starpu_eager_center_policy_data
 {
-	struct _starpu_fifo_taskq *fifo;
+	struct _starpu_fifo_taskq fifo;
 	starpu_pthread_mutex_t policy_mutex;
 	struct starpu_bitmap waiters;
 };
@@ -40,14 +40,9 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 	_STARPU_MALLOC(data, sizeof(struct _starpu_eager_center_policy_data));
 
 	/* there is only a single queue in that trivial design */
-	data->fifo =  _starpu_create_fifo();
+	_starpu_init_fifo(&data->fifo);
 	starpu_bitmap_init(&data->waiters);
 
-	 /* Tell helgrind that it's fine to check for empty fifo in
-	  * pop_task_eager_policy without actual mutex (it's just an integer)
-	  */
-	STARPU_HG_DISABLE_CHECKING(data->fifo->ntasks);
-
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
 }
@@ -55,13 +50,10 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	struct _starpu_fifo_taskq *fifo = data->fifo;
+	struct _starpu_fifo_taskq *fifo = &data->fifo;
 
 	STARPU_ASSERT(starpu_task_list_empty(&fifo->taskq));
 
-	/* deallocate the job queue */
-	_starpu_destroy_fifo(fifo);
-
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
 	free(data);
 }
@@ -74,9 +66,9 @@ static int push_task_eager_policy(struct starpu_task *task)
 	starpu_worker_relax_on();
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	starpu_worker_relax_off();
-	starpu_task_list_push_back(&data->fifo->taskq,task);
-	data->fifo->ntasks++;
-	data->fifo->nprocessed++;
+	starpu_task_list_push_back(&data->fifo.taskq,task);
+	data->fifo.ntasks++;
+	data->fifo.nprocessed++;
 
 	if (_starpu_get_nsched_ctxs() > 1)
 	{
@@ -145,7 +137,7 @@ static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
+	struct starpu_task* task = _starpu_fifo_pop_every_task(&data->fifo, workerid);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 	starpu_sched_ctx_list_task_counters_reset_all(task, sched_ctx_id);
@@ -162,7 +154,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	/* Here helgrind would shout that this is unprotected, but it is just
 	 * an integer access, and we hold the sched mutex, so we cannot miss
 	 * any wake up. */
-	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(data->fifo))
+	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(&data->fifo))
 	{
 		return NULL;
 	}
@@ -179,7 +171,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	starpu_worker_relax_off();
 
-	chosen_task = _starpu_fifo_pop_task(data->fifo, workerid);
+	chosen_task = _starpu_fifo_pop_task(&data->fifo, workerid);
 	if (!chosen_task)
 		/* Tell pushers that we are waiting for tasks for us */
 		starpu_bitmap_set(&data->waiters, workerid);
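
The unlocked emptiness probe in pop_task_eager_policy() is why the STARPU_HG_DISABLE_CHECKING(fifo->ntasks) annotation moves into _starpu_init_fifo() (next file): every fifo gets it once at initialization instead of each policy repeating it. A simplified sketch of the probe-then-lock pattern used above (illustrative; condensed from the surrounding diff):

/* Racy fast path: ntasks is a plain integer, so a stale read can at
 * worst make us return NULL early; since the caller holds the
 * scheduling mutex, a concurrent push cannot slip by without waking
 * the worker up again. */
if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(&data->fifo))
	return NULL;

STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
chosen_task = _starpu_fifo_pop_task(&data->fifo, workerid);  /* authoritative check */
STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);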

src/sched_policies/fifo_queues.c  +12 -4

@@ -44,14 +44,14 @@ static int is_sorted_task_list(struct starpu_task * task)
 }
 */
 
-struct _starpu_fifo_taskq *_starpu_create_fifo(void)
+void _starpu_init_fifo(struct _starpu_fifo_taskq *fifo)
 {
-	struct _starpu_fifo_taskq *fifo;
-	_STARPU_MALLOC(fifo, sizeof(struct _starpu_fifo_taskq));
-
 	/* note that not all mechanisms (e.g. the semaphore) have to be used */
 	starpu_task_list_init(&fifo->taskq);
 	fifo->ntasks = 0;
+	/* Tell helgrind that it's fine to check for empty fifo in
+	 * pop_task_graph_test_policy without actual mutex (it's just an integer)
+	 */
 	STARPU_HG_DISABLE_CHECKING(fifo->ntasks);
 	fifo->nprocessed = 0;
 
@@ -60,6 +60,14 @@ struct _starpu_fifo_taskq *_starpu_create_fifo(void)
 	fifo->exp_end = fifo->exp_start;
 	fifo->exp_len_per_priority = NULL;
 	fifo->pipeline_len = 0.0;
+}
+
+struct _starpu_fifo_taskq *_starpu_create_fifo(void)
+{
+	struct _starpu_fifo_taskq *fifo;
+	_STARPU_MALLOC(fifo, sizeof(struct _starpu_fifo_taskq));
+
+	_starpu_init_fifo(fifo);
 
 	return fifo;
 }
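
After this split, _starpu_init_fifo() is the primitive and _starpu_create_fifo() is just malloc plus init, so embedded and heap-allocated queues share a single initialization path, including the helgrind suppression on ntasks. A hedged usage sketch for the embedded case (illustrative policy struct; the two functions are declared in fifo_queues.h below):

struct my_policy_data
{
	struct _starpu_fifo_taskq fifo;   /* embedded: freed together with the owner */
};

static void my_policy_init(struct my_policy_data *data)
{
	/* sets up the task list, the ntasks/nprocessed counters and the
	 * exp_* timing estimates */
	_starpu_init_fifo(&data->fifo);
}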

src/sched_policies/fifo_queues.h  +1 -0

@@ -50,6 +50,7 @@ struct _starpu_fifo_taskq
 };
 
 struct _starpu_fifo_taskq*_starpu_create_fifo(void) STARPU_ATTRIBUTE_MALLOC;
+void _starpu_init_fifo(struct _starpu_fifo_taskq *fifo);
 void _starpu_destroy_fifo(struct _starpu_fifo_taskq *fifo);
 
 int _starpu_fifo_empty(struct _starpu_fifo_taskq *fifo);

src/sched_policies/graph_test_policy.c  +8 -14

@@ -36,7 +36,7 @@
 
 struct _starpu_graph_test_policy_data
 {
-	struct _starpu_fifo_taskq *fifo;	/* Bag of tasks which are ready before do_schedule is called */
+	struct _starpu_fifo_taskq fifo;	/* Bag of tasks which are ready before do_schedule is called */
 	struct _starpu_prio_deque prio_cpu;
 	struct _starpu_prio_deque prio_gpu;
 	starpu_pthread_mutex_t policy_mutex;
@@ -51,7 +51,7 @@ static void initialize_graph_test_policy(unsigned sched_ctx_id)
 	_STARPU_MALLOC(data, sizeof(struct _starpu_graph_test_policy_data));
 
 	/* there is only a single queue in that trivial design */
-	data->fifo =  _starpu_create_fifo();
+	_starpu_init_fifo(&data->fifo);
 	 _starpu_prio_deque_init(&data->prio_cpu);
 	 _starpu_prio_deque_init(&data->prio_gpu);
 	starpu_bitmap_init(&data->waiters);
@@ -60,11 +60,6 @@ static void initialize_graph_test_policy(unsigned sched_ctx_id)
 
 	_starpu_graph_record = 1;
 
-	 /* Tell helgrind that it's fine to check for empty fifo in
-	  * pop_task_graph_test_policy without actual mutex (it's just an integer)
-	  */
-	STARPU_HG_DISABLE_CHECKING(data->fifo->ntasks);
-
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
 }
@@ -72,12 +67,11 @@ static void initialize_graph_test_policy(unsigned sched_ctx_id)
 static void deinitialize_graph_test_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	struct _starpu_fifo_taskq *fifo = data->fifo;
+	struct _starpu_fifo_taskq *fifo = &data->fifo;
 
 	STARPU_ASSERT(starpu_task_list_empty(&fifo->taskq));
 
 	/* deallocate the job queue */
-	_starpu_destroy_fifo(fifo);
 	 _starpu_prio_deque_destroy(&data->prio_cpu);
 	 _starpu_prio_deque_destroy(&data->prio_gpu);
 
@@ -193,9 +187,9 @@ static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
 	}
 
 	/* Now that we have priorities, move tasks from bag to priority queue */
-	while(!_starpu_fifo_empty(data->fifo))
+	while(!_starpu_fifo_empty(&data->fifo))
 	{
-		struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, -1);
+		struct starpu_task *task = _starpu_fifo_pop_task(&data->fifo, -1);
 		struct _starpu_prio_deque *prio = select_prio(sched_ctx_id, data, task);
 		_starpu_prio_deque_push_back_task(prio, task);
 	}
@@ -236,9 +230,9 @@ static int push_task_graph_test_policy(struct starpu_task *task)
 	if (!data->computed)
 	{
 		/* Priorities are not computed, leave the task in the bag for now */
-		starpu_task_list_push_back(&data->fifo->taskq,task);
-		data->fifo->ntasks++;
-		data->fifo->nprocessed++;
+		starpu_task_list_push_back(&data->fifo.taskq,task);
+		data->fifo.ntasks++;
+		data->fifo.nprocessed++;
 		starpu_push_task_end(task);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 		return 0;

src/sched_policies/parallel_eager.c  +11 -24

@@ -35,8 +35,8 @@ struct _starpu_peager_common_data *_peager_common_data = NULL;
 struct _starpu_peager_data
 {
 	starpu_pthread_mutex_t policy_mutex;
-	struct _starpu_fifo_taskq *fifo;
-	struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
+	struct _starpu_fifo_taskq fifo;
+	struct _starpu_fifo_taskq local_fifo[STARPU_NMAXWORKERS];
 };
 
 static void initialize_peager_common(void)
@@ -136,22 +136,12 @@ static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned n
 		/* slaves pick up tasks from their local queue; their master
 		 * will put tasks directly in that local list when a parallel
 		 * task comes. */
-		data->local_fifo[workerid] = _starpu_create_fifo();
+		_starpu_init_fifo(&data->local_fifo[workerid]);
 	}
 }
 
-static void peager_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+static void peager_remove_workers(unsigned sched_ctx_id, int *workerids STARPU_ATTRIBUTE_UNUSED, unsigned nworkers STARPU_ATTRIBUTE_UNUSED)
 {
-	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	unsigned i;
-	for(i = 0; i < nworkers; i++)
-        {
-		int workerid = workerids[i];
-		if(!starpu_worker_is_combined_worker(workerid))
-		{
-			_starpu_destroy_fifo(data->local_fifo[workerid]);
-		}
-	}
 	if (sched_ctx_id == 0)
 	{
 		deinitialize_peager_common();
@@ -166,7 +156,7 @@ static void initialize_peager_policy(unsigned sched_ctx_id)
 	_STARPU_DISP("Warning: the peager scheduler is mostly a proof of concept and not really very optimized\n");
 
 	/* masters pick tasks from that queue */
-	data->fifo = _starpu_create_fifo();
+	_starpu_init_fifo(&data->fifo);
 
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
         STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
@@ -177,9 +167,6 @@ static void deinitialize_peager_policy(unsigned sched_ctx_id)
 	/* TODO check that there is no task left in the queue */
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	/* deallocate the job queue */
-	_starpu_destroy_fifo(data->fifo);
-
         STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
 
 	free(data);
@@ -193,7 +180,7 @@ static int push_task_peager_policy(struct starpu_task *task)
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	ret_val = _starpu_fifo_push_task(data->fifo, task);
+	ret_val = _starpu_fifo_push_task(&data->fifo, task);
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	int is_parallel_task = task->cl && task->cl->max_parallelism > 1;
 #endif
@@ -249,7 +236,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		starpu_worker_relax_on();
 		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 		starpu_worker_relax_off();
-		task = _starpu_fifo_pop_task(data->fifo, workerid);
+		task = _starpu_fifo_pop_task(&data->fifo, workerid);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 		return task;
@@ -261,11 +248,11 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	starpu_worker_relax_off();
 	/* check if a slave task is available in the local queue */
-	task = _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
+	task = _starpu_fifo_pop_task(&data->local_fifo[workerid], workerid);
 	if (!task)
 	{
 		/* no slave task, try to pop a task as master */
-		task = _starpu_fifo_pop_task(data->fifo, workerid);
+		task = _starpu_fifo_pop_task(&data->fifo, workerid);
 		if (task)
 		{
 			_STARPU_DEBUG("poping master task %p\n", task);
@@ -277,7 +264,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		{
 			/* task is potentially parallel, leave it for a combined worker master */
 			_STARPU_DEBUG("pushing back master task %p\n", task);
-			_starpu_fifo_push_back_task(data->fifo, task);
+			_starpu_fifo_push_back_task(&data->fifo, task);
 			task = NULL;
 		}
 #endif
@@ -339,7 +326,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		int local_worker = combined_workerid[i];
 		alias->destroy = 1;
 		_STARPU_TRACE_JOB_PUSH(alias, alias->priority > 0);
-		_starpu_fifo_push_task(data->local_fifo[local_worker], alias);
+		_starpu_fifo_push_task(&data->local_fifo[local_worker], alias);
 	}
 
 	/* The master also manipulated an alias */