Browse Source

sched_policies: use _starpu_fifo_taskq instead of _starpu_deque_jobq for ws

No performance regressions with tasks_size_overhead.
Samuel Pitoiset 10 years ago
parent
commit
899702825f
1 changed file with 18 additions and 34 deletions
  1. 18 34
      src/sched_policies/work_stealing_policy.c

+ 18 - 34
src/sched_policies/work_stealing_policy.c

@@ -21,7 +21,7 @@
 #include <float.h>
 
 #include <core/workers.h>
-#include <sched_policies/deque_queues.h>
+#include <sched_policies/fifo_queues.h>
 #include <core/debug.h>
 #include <starpu_scheduler.h>
 
@@ -31,7 +31,7 @@
 
 struct _starpu_work_stealing_data
 {
-	struct _starpu_deque_jobq **queue_array;
+	struct _starpu_fifo_taskq **queue_array;
 	/* keep track of the work performed from the beginning of the algorithm to make
 	 * better decisions about which queue to select when stealing or deferring work
 	 */
@@ -69,15 +69,15 @@ static unsigned select_victim_round_robin(unsigned sched_ctx_id)
 	 * the next ones */
 	while (1)
 	{
-		unsigned njobs;
+		unsigned ntasks;
 
 		starpu_worker_get_sched_condition(worker, &victim_sched_mutex, &victim_sched_cond);
 		/* Here helgrind would shout that this is unprotected, but we
 		 * are fine with getting outdated values, this is just an
 		 * estimation */
-		njobs = ws->queue_array[worker]->njobs;
+		ntasks = ws->queue_array[worker]->ntasks;
 
-		if (njobs)
+		if (ntasks)
 			break;
 
 		worker = (worker + 1) % nworkers;
@@ -261,20 +261,14 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
-	struct _starpu_deque_jobq *q;
-
 	int workerid = starpu_worker_get_id();
 
 	STARPU_ASSERT(workerid != -1);
 
-	q = ws->queue_array[workerid];
-
-	task = _starpu_deque_pop_task(q, workerid);
+	task = _starpu_fifo_pop_task(ws->queue_array[workerid], workerid);
 	if (task)
 	{
 		/* there was a local task */
-		q->nprocessed++;
-		q->njobs--;
 		return task;
 	}
 	starpu_pthread_mutex_t *worker_sched_mutex;
@@ -293,17 +287,11 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 
 	starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);
 	STARPU_PTHREAD_MUTEX_LOCK(victim_sched_mutex);
-	struct _starpu_deque_jobq *victimq = ws->queue_array[victim];
 
-	task = _starpu_deque_pop_task(victimq, workerid);
+	task = _starpu_fifo_pop_task(ws->queue_array[victim], workerid);
 	if (task)
 	{
-		_STARPU_TRACE_WORK_STEALING(q, workerid);
-
-		/* Beware : we have to increase the number of processed tasks of
-		 * the stealer, not the victim ! */
-		q->nprocessed++;
-		victimq->njobs--;
+		_STARPU_TRACE_WORK_STEALING(workerid, victim);
 	}
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(victim_sched_mutex);
@@ -311,12 +299,10 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 	STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
 	if(!task)
 	{
-		task = _starpu_deque_pop_task(q, workerid);
+		task = _starpu_fifo_pop_task(ws->queue_array[workerid], workerid);
 		if (task)
 		{
 			/* there was a local task */
-			q->nprocessed++;
-			q->njobs--;
 			return task;
 		}
 	}
@@ -330,8 +316,6 @@ int ws_push_task(struct starpu_task *task)
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	struct _starpu_deque_jobq *deque_queue;
-	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	int workerid = starpu_worker_get_id();
 
 	unsigned worker = 0;
@@ -356,17 +340,17 @@ int ws_push_task(struct starpu_task *task)
 	if (workerid == -1)
 		workerid = select_worker(sched_ctx_id);
 
-	deque_queue = ws->queue_array[workerid];
-
 #ifdef HAVE_AYUDAME_H
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	if (AYU_event)
 	{
 		intptr_t id = workerid;
 		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
 	}
 #endif
-	_starpu_job_list_push_back(&deque_queue->jobq, j);
-	deque_queue->njobs++;
+
+	_starpu_fifo_push_task(ws->queue_array[workerid], task);
+
 	starpu_push_task_end(task);
 
 	workers->init_iterator(workers, &it);
@@ -396,18 +380,18 @@ static void ws_add_workers(unsigned sched_ctx_id, int *workerids,unsigned nworke
 	{
 		workerid = workerids[i];
 		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
-		ws->queue_array[workerid] = _starpu_create_deque();
+		ws->queue_array[workerid] = _starpu_create_fifo();
 
 		/* Tell helgrid that we are fine with getting outdated values,
 		 * this is just an estimation */
-		STARPU_HG_DISABLE_CHECKING(ws->queue_array[workerid]->njobs);
+		STARPU_HG_DISABLE_CHECKING(ws->queue_array[workerid]->ntasks);
 
 		/**
 		 * The first WS_POP_TASK will increase NPROCESSED though no task was actually performed yet,
 		 * we need to initialize it at -1.
 		 */
 		ws->queue_array[workerid]->nprocessed = -1;
-		ws->queue_array[workerid]->njobs = 0;
+		ws->queue_array[workerid]->ntasks = 0;
 	}
 }
 
@@ -421,7 +405,7 @@ static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
 	for (i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
-		_starpu_destroy_deque(ws->queue_array[workerid]);
+		_starpu_destroy_fifo(ws->queue_array[workerid]);
 	}
 }
 
@@ -434,7 +418,7 @@ static void initialize_ws_policy(unsigned sched_ctx_id)
 	ws->last_push_worker = 0;
 
 	unsigned nw = starpu_worker_get_count();
-	ws->queue_array = (struct _starpu_deque_jobq**)malloc(nw*sizeof(struct _starpu_deque_jobq*));
+	ws->queue_array = (struct _starpu_fifo_taskq**)malloc(nw*sizeof(struct _starpu_fifo_taskq*));
 }
 
 static void deinit_ws_policy(unsigned sched_ctx_id)