Browse Source

Replace the starpu_fifo_jobq_s structure by starpu_fifo_taskq_s which works
with tasks instead of jobs.

Cédric Augonnet 15 years ago
parent
commit
1a47e25f4f

+ 33 - 39
src/core/mechanisms/fifo_queues.c

@@ -20,14 +20,14 @@
 #include <common/utils.h>
 #include <core/task.h>
 
-struct starpu_fifo_jobq_s *_starpu_create_fifo(void)
+struct starpu_fifo_taskq_s *_starpu_create_fifo(void)
 {
-	struct starpu_fifo_jobq_s *fifo;
-	fifo = malloc(sizeof(struct starpu_fifo_jobq_s));
+	struct starpu_fifo_taskq_s *fifo;
+	fifo = malloc(sizeof(struct starpu_fifo_taskq_s));
 
 	/* note that not all mechanisms (eg. the semaphore) have to be used */
-	fifo->jobq = starpu_job_list_new();
-	fifo->njobs = 0;
+	starpu_task_list_init(&fifo->taskq);
+	fifo->ntasks = 0;
 	fifo->nprocessed = 0;
 
 	fifo->exp_start = _starpu_timing_now();
@@ -37,21 +37,18 @@ struct starpu_fifo_jobq_s *_starpu_create_fifo(void)
 	return fifo;
 }
 
-void _starpu_destroy_fifo(struct starpu_fifo_jobq_s *fifo)
+void _starpu_destroy_fifo(struct starpu_fifo_taskq_s *fifo)
 {
-	starpu_job_list_delete(fifo->jobq);
 	free(fifo);
 }
 
-int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
+int _starpu_fifo_push_prio_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
 {
-	starpu_job_t j = _starpu_get_job_associated_to_task(task);
-
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
-	starpu_job_list_push_back(fifo_queue->jobq, j);
-	fifo_queue->njobs++;
+	starpu_task_list_push_back(&fifo_queue->taskq, task);
+	fifo_queue->ntasks++;
 	fifo_queue->nprocessed++;
 
 	pthread_cond_signal(sched_cond);
@@ -60,15 +57,13 @@ int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_m
 	return 0;
 }
 
-int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
+int _starpu_fifo_push_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
 {
-	starpu_job_t j = _starpu_get_job_associated_to_task(task);
-
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
-	starpu_job_list_push_front(fifo_queue->jobq, j);
-	fifo_queue->njobs++;
+	starpu_task_list_push_front(&fifo_queue->taskq, task);
+	fifo_queue->ntasks++;
 	fifo_queue->nprocessed++;
 
 	pthread_cond_signal(sched_cond);
@@ -77,31 +72,31 @@ int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_
 	return 0;
 }
 
-struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo_queue)
+struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo_queue)
 {
-	starpu_job_t j = NULL;
+	struct starpu_task *task = NULL;
 
-	if (fifo_queue->njobs == 0)
+	if (fifo_queue->ntasks == 0)
 		return NULL;
 
-	if (fifo_queue->njobs > 0) 
+	if (fifo_queue->ntasks > 0) 
 	{
 		/* there is a task */
-		j = starpu_job_list_pop_back(fifo_queue->jobq);
+		task = starpu_task_list_pop_back(&fifo_queue->taskq);
 	
-		STARPU_ASSERT(j);
-		fifo_queue->njobs--;
+		STARPU_ASSERT(task);
+		fifo_queue->ntasks--;
 		
-		STARPU_TRACE_JOB_POP(j, 0);
+		STARPU_TRACE_JOB_POP(task, 0);
 	}
 	
-	return j->task;
+	return task;
 }
 
 /* pop every task that can be executed on the calling driver */
-struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
+struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
 {
-	struct starpu_job_list_s *old_list;
+	struct starpu_task_list *old_list;
 	unsigned size;
 
 	struct starpu_task *new_list = NULL;
@@ -109,29 +104,26 @@ struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_
 	
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
-	size = fifo_queue->njobs;
+	size = fifo_queue->ntasks;
 
 	if (size > 0) {
-		old_list = fifo_queue->jobq;
+		old_list = &fifo_queue->taskq;
 		unsigned new_list_size = 0;
 
-		starpu_job_itor_t i;
-		starpu_job_t next_job;
+		struct starpu_task *task, *next_task;
 		/* note that this starts at the _head_ of the list, so we put
  		 * elements at the back of the new list */
-		for(i = starpu_job_list_begin(old_list);
-			i != starpu_job_list_end(old_list);
-			i  = next_job)
+		task = starpu_task_list_front(old_list);
+		while (task)
 		{
-			next_job = starpu_job_list_next(i);
-			struct starpu_task *task = i->task;
+			next_task = task->next;
 
 			if (task->cl->where & where)
 			{
 				/* this elements can be moved into the new list */
 				new_list_size++;
 				
-				starpu_job_list_erase(old_list, i);
+				starpu_task_list_erase(old_list, task);
 
 				if (new_list_tail)
 				{
@@ -147,9 +139,11 @@ struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_
 					task->next = NULL;
 				}
 			}
+		
+			task = next_task;
 		}
 
-		fifo_queue->njobs -= new_list_size;
+		fifo_queue->ntasks -= new_list_size;
 	}
 
 	PTHREAD_MUTEX_UNLOCK(sched_mutex);

+ 9 - 10
src/core/mechanisms/fifo_queues.h

@@ -19,14 +19,13 @@
 
 #include <starpu.h>
 #include <common/config.h>
-#include <core/jobs.h>
 
-struct starpu_fifo_jobq_s {
+struct starpu_fifo_taskq_s {
 	/* the actual list */
-	starpu_job_list_t jobq;
+	struct starpu_task_list taskq;
 
 	/* the number of tasks currently in the queue */
-	unsigned njobs;
+	unsigned ntasks;
 
 	/* the number of tasks that were processed */
 	unsigned nprocessed;
@@ -37,13 +36,13 @@ struct starpu_fifo_jobq_s {
 	double exp_len;
 };
 
-struct starpu_fifo_jobq_s*_starpu_create_fifo(void);
-void _starpu_destroy_fifo(struct starpu_fifo_jobq_s *fifo);
+struct starpu_fifo_taskq_s*_starpu_create_fifo(void);
+void _starpu_destroy_fifo(struct starpu_fifo_taskq_s *fifo);
 
-int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
-int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
+int _starpu_fifo_push_task(struct starpu_fifo_taskq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
+int _starpu_fifo_push_prio_task(struct starpu_fifo_taskq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
 
-struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo);
-struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
+struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo);
+struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_taskq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
 
 #endif // __FIFO_QUEUES_H__

+ 4 - 4
src/core/policies/deque_modeling_policy.c

@@ -19,7 +19,7 @@
 #include <core/perfmodel/perfmodel.h>
 
 static unsigned nworkers;
-static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
+static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
@@ -30,7 +30,7 @@ static struct starpu_task *dm_pop_task(void)
 
 	int workerid = starpu_worker_get_id();
 
-	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
+	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
 
 	task = _starpu_fifo_pop_task(fifo);
 	if (task) {
@@ -50,7 +50,7 @@ static struct starpu_task *dm_pop_every_task(uint32_t where)
 
 	int workerid = starpu_worker_get_id();
 
-	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
+	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
 
 	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], where);
 
@@ -71,7 +71,7 @@ static struct starpu_task *dm_pop_every_task(uint32_t where)
 static int _dm_push_task(struct starpu_task *task, unsigned prio)
 {
 	/* find the queue */
-	struct starpu_fifo_jobq_s *fifo;
+	struct starpu_fifo_taskq_s *fifo;
 	unsigned worker;
 	int best = -1;
 

+ 3 - 3
src/core/policies/deque_modeling_policy_data_aware.c

@@ -18,7 +18,7 @@
 #include <core/perfmodel/perfmodel.h>
 
 static unsigned nworkers;
-static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
+static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
@@ -31,7 +31,7 @@ static struct starpu_task *dmda_pop_task(void)
 	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
-	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
+	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
 
 	task = _starpu_fifo_pop_task(fifo);
 	if (task) {
@@ -61,7 +61,7 @@ static void update_data_requests(uint32_t memory_node, struct starpu_task *task)
 static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 {
 	/* find the queue */
-	struct starpu_fifo_jobq_s *fifo;
+	struct starpu_fifo_taskq_s *fifo;
 	unsigned worker;
 	int best = -1;
 	

+ 1 - 1
src/core/policies/eager_central_policy.c

@@ -22,7 +22,7 @@
  */
 
 /* the former is the actual queue, the latter some container */
-static struct starpu_fifo_jobq_s *fifo;
+static struct starpu_fifo_taskq_s *fifo;
 
 static pthread_cond_t sched_cond;
 static pthread_mutex_t sched_mutex;

+ 1 - 1
src/core/policies/no_prio_policy.c

@@ -22,7 +22,7 @@
  */
 
 /* the former is the actual queue, the latter some container */
-static struct starpu_fifo_jobq_s *fifo;
+static struct starpu_fifo_taskq_s *fifo;
 
 static pthread_cond_t sched_cond;
 static pthread_mutex_t sched_mutex;

+ 1 - 1
src/core/policies/random_policy.c

@@ -17,7 +17,7 @@
 #include <core/policies/random_policy.h>
 
 static unsigned nworkers;
-static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
+static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];
 
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];