Explorar o código

migrate heteroprio to using prio deques instead of sorted fifos

Samuel Thibault %!s(int64=7) %!d(string=hai) anos
pai
achega
19d8b72046
Modificáronse 1 ficheiros con 27 adicións e 32 borrados
  1. 27 32
      src/sched_policies/heteroprio.c

+ 27 - 32
src/sched_policies/heteroprio.c

@@ -27,7 +27,7 @@
 #include <core/workers.h>
 #include <core/debug.h>
 
-#include <sched_policies/fifo_queues.h>
+#include <sched_policies/prio_deque.h>
 #include <limits.h>
 
 #ifndef DBL_MIN
@@ -49,7 +49,7 @@
 struct _heteroprio_bucket
 {
 	/* The task of the current bucket */
-	struct _starpu_fifo_taskq* tasks_queue;
+	struct _starpu_prio_deque tasks_queue;
 	/* The correct arch for the current bucket */
 	unsigned valid_archs;
 	/* The slow factors for any archs */
@@ -62,14 +62,14 @@ struct _heteroprio_bucket
 static void _heteroprio_bucket_init(struct _heteroprio_bucket* bucket)
 {
 	memset(bucket, 0, sizeof(*bucket));
-	bucket->tasks_queue =  _starpu_create_fifo();
+	_starpu_prio_deque_init(&bucket->tasks_queue);
 }
 
 /* Release a bucket */
 static void _heteroprio_bucket_release(struct _heteroprio_bucket* bucket)
 {
-	STARPU_ASSERT(_starpu_fifo_empty(bucket->tasks_queue) != 0);
-	_starpu_destroy_fifo(bucket->tasks_queue);
+	STARPU_ASSERT(_starpu_prio_deque_is_empty(&bucket->tasks_queue) != 0);
+	_starpu_prio_deque_destroy(&bucket->tasks_queue);
 }
 
 /* A worker is mainly composed of a fifo for the tasks
@@ -83,7 +83,7 @@ struct _heteroprio_worker_wrapper
 {
 	unsigned arch_type;
 	unsigned arch_index;
-	struct _starpu_fifo_taskq *tasks_queue;
+	struct _starpu_prio_deque tasks_queue;
 };
 
 struct _starpu_heteroprio_data
@@ -309,9 +309,7 @@ static void add_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids,
 		memset(&hp->workers_heteroprio[workerid], 0, sizeof(hp->workers_heteroprio[workerid]));
 		/* if the worker has already belonged to this context
 		   the queue and the synchronization variables have been already initialized */
-		if(hp->workers_heteroprio[workerid].tasks_queue == NULL)
-		{
-			hp->workers_heteroprio[workerid].tasks_queue = _starpu_create_fifo();
+			_starpu_prio_deque_init(&hp->workers_heteroprio[workerid].tasks_queue);
 			switch(starpu_worker_get_type(workerid))
 			{
 			case STARPU_CPU_WORKER:
@@ -341,7 +339,6 @@ static void add_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids,
 			default:
 				STARPU_ASSERT(0);
 			}
-		}
 		hp->nb_workers_per_arch_index[hp->workers_heteroprio[workerid].arch_index]++;
 
 	}
@@ -355,11 +352,7 @@ static void remove_workers_heteroprio_policy(unsigned sched_ctx_id, int *workeri
 	for (i = 0; i < nworkers; i++)
 	{
 		int workerid = workerids[i];
-		if(hp->workers_heteroprio[workerid].tasks_queue != NULL)
-		{
-			_starpu_destroy_fifo(hp->workers_heteroprio[workerid].tasks_queue);
-			hp->workers_heteroprio[workerid].tasks_queue = NULL;
-		}
+		_starpu_prio_deque_destroy(&hp->workers_heteroprio[workerid].tasks_queue);
 	}
 }
 
@@ -382,7 +375,7 @@ static int push_task_heteroprio_policy(struct starpu_task *task)
 	STARPU_ASSERT(((bucket->valid_archs ^ task->where) & bucket->valid_archs) == 0);
 
 	/* save the task */
-	_starpu_fifo_push_back_task(bucket->tasks_queue,task);
+	_starpu_prio_deque_push_back_task(&bucket->tasks_queue,task);
 
 	/* Inc counters */
 	unsigned arch_index;
@@ -458,7 +451,7 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 	/* If no tasks available, no tasks in worker queue or some arch worker queue just return NULL */
 	if (!STARPU_RUNNING_ON_VALGRIND
 	    && (hp->total_tasks_in_buckets == 0 || hp->nb_remaining_tasks_per_arch_index[worker->arch_index] == 0)
-            && worker->tasks_queue->ntasks == 0 && hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] == 0)
+            && worker->tasks_queue.ntasks == 0 && hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] == 0)
 	{
 		return NULL;
 	}
@@ -480,7 +473,7 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 	if( hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 )
 	{
 		/* Ideally we would like to fill the prefetch array */
-		unsigned nb_tasks_to_prefetch = (STARPU_HETEROPRIO_MAX_PREFETCH-worker->tasks_queue->ntasks);
+		unsigned nb_tasks_to_prefetch = (STARPU_HETEROPRIO_MAX_PREFETCH-worker->tasks_queue.ntasks);
 		/* But there are maybe less tasks than that! */
 		if(nb_tasks_to_prefetch > hp->nb_remaining_tasks_per_arch_index[worker->arch_index])
 		{
@@ -489,7 +482,7 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 		/* But in case there are less tasks than worker we take the minimum */
 		if(hp->nb_remaining_tasks_per_arch_index[worker->arch_index] < starpu_sched_ctx_get_nworkers(sched_ctx_id))
 		{
-			if(worker->tasks_queue->ntasks == 0)
+			if(worker->tasks_queue.ntasks == 0)
 				nb_tasks_to_prefetch = 1;
 			else
 				nb_tasks_to_prefetch = 0;
@@ -504,16 +497,16 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 			/* Ensure we can compute task from this bucket */
 			STARPU_ASSERT(bucket->valid_archs & worker->arch_type);
 			/* Take nb_tasks_to_prefetch tasks if possible */
-			while(!_starpu_fifo_empty(bucket->tasks_queue) && nb_tasks_to_prefetch &&
+			while(!_starpu_prio_deque_is_empty(&bucket->tasks_queue) && nb_tasks_to_prefetch &&
 			      (bucket->factor_base_arch_index == 0 ||
 			       worker->arch_index == bucket->factor_base_arch_index ||
-			       (((float)bucket->tasks_queue->ntasks)/((float)hp->nb_workers_per_arch_index[bucket->factor_base_arch_index])) >= bucket->slow_factors_per_index[worker->arch_index]))
+			       (((float)bucket->tasks_queue.ntasks)/((float)hp->nb_workers_per_arch_index[bucket->factor_base_arch_index])) >= bucket->slow_factors_per_index[worker->arch_index]))
 			{
-				struct starpu_task* task = _starpu_fifo_pop_local_task(bucket->tasks_queue);
+				struct starpu_task* task = _starpu_prio_deque_pop_task(&bucket->tasks_queue);
 				STARPU_ASSERT(starpu_worker_can_execute_task(workerid, task, 0));
 				/* Save the task */
 				STARPU_AYU_ADDTOTASKQUEUE(starpu_task_get_job_id(task), workerid);
-				_starpu_fifo_push_task(worker->tasks_queue, task);
+				_starpu_prio_deque_push_task(&worker->tasks_queue, task);
 
 				/* Update general counter */
 				hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] += 1;
@@ -538,9 +531,10 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 	struct starpu_task* task = NULL;
 
 	/* The worker has some tasks in its queue */
-	if(worker->tasks_queue->ntasks)
+	if(worker->tasks_queue.ntasks)
 	{
-		task = _starpu_fifo_pop_task(worker->tasks_queue, workerid);
+		int skipped;
+		task = _starpu_prio_deque_pop_task_for_worker(&worker->tasks_queue, workerid, &skipped);
 		hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] -= 1;
 	}
 	/* Otherwise look if we can steal some work */
@@ -582,16 +576,17 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 
 				/* If it is the same arch and there is a task to steal */
 				if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
-				   && hp->workers_heteroprio[victim].tasks_queue->ntasks)
+				   && hp->workers_heteroprio[victim].tasks_queue.ntasks)
 				{
 					/* ensure the worker is not currently prefetching its data */
 					_starpu_worker_lock(victim);
 
 					if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
-					   && hp->workers_heteroprio[victim].tasks_queue->ntasks)
+					   && hp->workers_heteroprio[victim].tasks_queue.ntasks)
 					{
+						int skipped;
 						/* steal the last added task */
-						task = _starpu_fifo_pop_task(hp->workers_heteroprio[victim].tasks_queue, workerid);
+						task = _starpu_prio_deque_pop_task_for_worker(&hp->workers_heteroprio[victim].tasks_queue, workerid, &skipped);
 						/* we steal a task update global counter */
 						hp->nb_prefetched_tasks_per_arch_index[hp->workers_heteroprio[victim].arch_index] -= 1;
 
@@ -629,16 +624,16 @@ done:		;
 	}
 
 	/* if we have task (task) me way have some in the queue (worker->tasks_queue_size) that was freshly addeed (nb_added_tasks) */
-	if(task && worker->tasks_queue->ntasks && nb_added_tasks && starpu_get_prefetch_flag())
+	if(task && worker->tasks_queue.ntasks && nb_added_tasks && starpu_get_prefetch_flag())
 	{
 		const unsigned memory_node = starpu_worker_get_memory_node(workerid);
 
 /* TOTO berenger: iterate in the other sense */
 		struct starpu_task *task_to_prefetch = NULL;
-		for (task_to_prefetch  = starpu_task_list_begin(&worker->tasks_queue->taskq);
-		     (task_to_prefetch != starpu_task_list_end(&worker->tasks_queue->taskq) &&
+		for (task_to_prefetch  = starpu_task_prio_list_begin(&worker->tasks_queue.list);
+			(task_to_prefetch != starpu_task_prio_list_end(&worker->tasks_queue.list) &&
 		      nb_added_tasks && hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0);
-		     task_to_prefetch  = starpu_task_list_next(task_to_prefetch))
+		     task_to_prefetch  = starpu_task_prio_list_next(&worker->tasks_queue.list, task_to_prefetch))
 		{
 			/* prefetch from closest to end task */
 			starpu_prefetch_task_input_on_node(task_to_prefetch, memory_node);