Browse Source

Use tasks instead of jobs in the "priority queues"

Cédric Augonnet 15 years ago
parent
commit
8e3c29dde8

+ 7 - 11
src/core/mechanisms/priority_queues.c

@@ -23,28 +23,24 @@
  * Centralized queue with priorities 
  * Centralized queue with priorities 
  */
  */
 
 
-struct starpu_priority_jobq_s *_starpu_create_priority_jobq(void)
+struct starpu_priority_taskq_s *_starpu_create_priority_taskq(void)
 {
 {
-	struct starpu_priority_jobq_s *central_queue;
+	struct starpu_priority_taskq_s *central_queue;
 	
 	
-	central_queue = malloc(sizeof(struct starpu_priority_jobq_s));
-	central_queue->total_njobs = 0;
+	central_queue = malloc(sizeof(struct starpu_priority_taskq_s));
+	central_queue->total_ntasks = 0;
 
 
 	unsigned prio;
 	unsigned prio;
 	for (prio = 0; prio < NPRIO_LEVELS; prio++)
 	for (prio = 0; prio < NPRIO_LEVELS; prio++)
 	{
 	{
-		central_queue->jobq[prio] = starpu_job_list_new();
-		central_queue->njobs[prio] = 0;
+		starpu_task_list_init(&central_queue->taskq[prio]);
+		central_queue->ntasks[prio] = 0;
 	}
 	}
 
 
 	return central_queue;
 	return central_queue;
 }
 }
 
 
-void _starpu_destroy_priority_jobq(struct starpu_priority_jobq_s *priority_queue)
+void _starpu_destroy_priority_taskq(struct starpu_priority_taskq_s *priority_queue)
 {
 {
-	unsigned prio;
-	for (prio = 0; prio < NPRIO_LEVELS; prio++)
-		starpu_job_list_delete(priority_queue->jobq[prio]);
-
 	free(priority_queue);
 	free(priority_queue);
 }
 }

+ 7 - 8
src/core/mechanisms/priority_queues.h

@@ -19,21 +19,20 @@
 
 
 #include <starpu.h>
 #include <starpu.h>
 #include <common/config.h>
 #include <common/config.h>
-#include <core/jobs.h>
 
 
 #define NPRIO_LEVELS	((STARPU_MAX_PRIO) - (STARPU_MIN_PRIO) + 1)
 #define NPRIO_LEVELS	((STARPU_MAX_PRIO) - (STARPU_MIN_PRIO) + 1)
 
 
-struct starpu_priority_jobq_s {
+struct starpu_priority_taskq_s {
 	/* the actual lists 
 	/* the actual lists 
-	 *	jobq[p] is for priority [p - STARPU_MIN_PRIO] */
-	starpu_job_list_t jobq[NPRIO_LEVELS];
-	unsigned njobs[NPRIO_LEVELS];
+	 *	taskq[p] is for priority [p - STARPU_MIN_PRIO] */
+	struct starpu_task_list taskq[NPRIO_LEVELS];
+	unsigned ntasks[NPRIO_LEVELS];
 
 
-	unsigned total_njobs;
+	unsigned total_ntasks;
 };
 };
 
 
-struct starpu_priority_jobq_s *_starpu_create_priority_jobq(void);
-void _starpu_destroy_priority_jobq(struct starpu_priority_jobq_s *priority_queue);
+struct starpu_priority_taskq_s *_starpu_create_priority_taskq(void);
+void _starpu_destroy_priority_taskq(struct starpu_priority_taskq_s *priority_queue);
 
 
 void _starpu_init_priority_queues_mechanisms(void);
 void _starpu_init_priority_queues_mechanisms(void);
 
 

+ 19 - 21
src/core/policies/eager_central_priority_policy.c

@@ -17,10 +17,10 @@
 #include <core/policies/eager_central_priority_policy.h>
 #include <core/policies/eager_central_priority_policy.h>
 
 
 /* the former is the actual queue, the latter some container */
 /* the former is the actual queue, the latter some container */
-static struct starpu_priority_jobq_s *jobq;
+static struct starpu_priority_taskq_s *taskq;
 
 
-/* keep track of the total number of jobs to be scheduled to avoid infinite 
- * polling when there are really few jobs in the overall queue */
+/* keep track of the total number of tasks to be scheduled to avoid infinite 
+ * polling when there are really few tasks in the overall queue */
 static pthread_cond_t global_sched_cond;
 static pthread_cond_t global_sched_cond;
 static pthread_mutex_t global_sched_mutex;
 static pthread_mutex_t global_sched_mutex;
 
 
@@ -28,7 +28,7 @@ static void initialize_eager_center_priority_policy(struct starpu_machine_topolo
 			__attribute__ ((unused))	struct starpu_sched_policy_s *_policy) 
 			__attribute__ ((unused))	struct starpu_sched_policy_s *_policy) 
 {
 {
 	/* only a single queue (even though there are several internaly) */
 	/* only a single queue (even though there are several internaly) */
-	jobq = _starpu_create_priority_jobq();
+	taskq = _starpu_create_priority_taskq();
 
 
 	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
 	PTHREAD_COND_INIT(&global_sched_cond, NULL);
@@ -43,14 +43,12 @@ static void deinitialize_eager_center_priority_policy(struct starpu_machine_topo
 {
 {
 	/* TODO check that there is no task left in the queue */
 	/* TODO check that there is no task left in the queue */
 
 
-	/* deallocate the job queue */
-	_starpu_destroy_priority_jobq(jobq);
+	/* deallocate the task queue */
+	_starpu_destroy_priority_taskq(taskq);
 }
 }
 
 
 static int _starpu_priority_push_task(struct starpu_task *task)
 static int _starpu_priority_push_task(struct starpu_task *task)
 {
 {
-	starpu_job_t j = _starpu_get_job_associated_to_task(task);
-
 	/* wake people waiting for a task */
 	/* wake people waiting for a task */
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
 
@@ -58,9 +56,9 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	
 	
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
 
 
-	starpu_job_list_push_front(jobq->jobq[priolevel], j);
-	jobq->njobs[priolevel]++;
-	jobq->total_njobs++;
+	starpu_task_list_push_front(&taskq->taskq[priolevel], task);
+	taskq->ntasks[priolevel]++;
+	taskq->total_ntasks++;
 
 
 	PTHREAD_COND_SIGNAL(&global_sched_cond);
 	PTHREAD_COND_SIGNAL(&global_sched_cond);
 	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
 	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
@@ -70,12 +68,12 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 
 
 static struct starpu_task *_starpu_priority_pop_task(void)
 static struct starpu_task *_starpu_priority_pop_task(void)
 {
 {
-	starpu_job_t j = NULL;
+	struct starpu_task *task = NULL;
 
 
 	/* block until some event happens */
 	/* block until some event happens */
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
 
-	if ((jobq->total_njobs == 0) && _starpu_machine_is_running())
+	if ((taskq->total_ntasks == 0) && _starpu_machine_is_running())
 	{
 	{
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 		_starpu_datawizard_progress(q->memory_node, 1);
 		_starpu_datawizard_progress(q->memory_node, 1);
@@ -84,23 +82,23 @@ static struct starpu_task *_starpu_priority_pop_task(void)
 #endif
 #endif
 	}
 	}
 
 
-	if (jobq->total_njobs > 0)
+	if (taskq->total_ntasks > 0)
 	{
 	{
 		unsigned priolevel = NPRIO_LEVELS - 1;
 		unsigned priolevel = NPRIO_LEVELS - 1;
 		do {
 		do {
-			if (jobq->njobs[priolevel] > 0) {
+			if (taskq->ntasks[priolevel] > 0) {
 				/* there is some task that we can grab */
 				/* there is some task that we can grab */
-				j = starpu_job_list_pop_back(jobq->jobq[priolevel]);
-				jobq->njobs[priolevel]--;
-				jobq->total_njobs--;
-				STARPU_TRACE_JOB_POP(j, 0);
+				task = starpu_task_list_pop_back(&taskq->taskq[priolevel]);
+				taskq->ntasks[priolevel]--;
+				taskq->total_ntasks--;
+				STARPU_TRACE_JOB_POP(task, 0);
 			}
 			}
-		} while (!j && priolevel-- > 0);
+		} while (!task && priolevel-- > 0);
 	}
 	}
 
 
 	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
 	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
 
 
-	return j->task;
+	return task;
 }
 }
 
 
 struct starpu_sched_policy_s _starpu_sched_prio_policy = {
 struct starpu_sched_policy_s _starpu_sched_prio_policy = {