Przeglądaj źródła

The pop_task method of the scheduler now returns a task instead of a job. This
is an intermediate step before having the scheduling policies to only
manipulate tasks.

Cédric Augonnet 15 lat temu
rodzic
commit
d185fc2875

+ 2 - 2
src/core/mechanisms/deque_queues.c

@@ -54,7 +54,7 @@ unsigned _starpu_get_deque_nprocessed(struct starpu_deque_jobq_s *deque_queue)
 	return deque_queue->nprocessed;
 }
 
-starpu_job_t _starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue)
+struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue)
 {
 	starpu_job_t j = NULL;
 
@@ -74,7 +74,7 @@ starpu_job_t _starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue)
 		STARPU_TRACE_JOB_POP(j, 0);
 	}
 	
-	return j;
+	return j->task;
 }
 
 struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, uint32_t where)

+ 1 - 1
src/core/mechanisms/deque_queues.h

@@ -40,7 +40,7 @@ struct starpu_deque_jobq_s {
 struct starpu_deque_jobq_s *_starpu_create_deque(void);
 void _starpu_destroy_deque(struct starpu_deque_jobq_s *deque);
 
-starpu_job_t _starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue);
+struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue);
 struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, uint32_t where);
 
 unsigned _starpu_get_deque_njobs(struct starpu_deque_jobq_s *deque_queue);

+ 2 - 2
src/core/mechanisms/fifo_queues.c

@@ -72,7 +72,7 @@ int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_
 	return 0;
 }
 
-starpu_job_t _starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo_queue)
+struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo_queue)
 {
 	starpu_job_t j = NULL;
 
@@ -90,7 +90,7 @@ starpu_job_t _starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo_queue)
 		STARPU_TRACE_JOB_POP(j, 0);
 	}
 	
-	return j;
+	return j->task;
 }
 
 /* pop every task that can be executed on the calling driver */

+ 1 - 1
src/core/mechanisms/fifo_queues.h

@@ -43,7 +43,7 @@ void _starpu_destroy_fifo(struct starpu_fifo_jobq_s *fifo);
 int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 
-starpu_job_t _starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo);
+struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo);
 struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
 
 #endif // __FIFO_QUEUES_H__

+ 6 - 6
src/core/policies/deque_modeling_policy.c

@@ -24,24 +24,24 @@ static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 
-static starpu_job_t dm_pop_task(void)
+static struct starpu_task *dm_pop_task(void)
 {
-	struct starpu_job_s *j;
+	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
 
 	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
 
-	j = _starpu_fifo_pop_task(fifo);
-	if (j) {
-		double model = j->task->predicted;
+	task = _starpu_fifo_pop_task(fifo);
+	if (task) {
+		double model = task->predicted;
 	
 		fifo->exp_len -= model;
 		fifo->exp_start = _starpu_timing_now() + model;
 		fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	}	
 
-	return j;
+	return task;
 }
 
 static struct starpu_job_list_s *dm_pop_every_task(uint32_t where)

+ 6 - 6
src/core/policies/deque_modeling_policy_data_aware.c

@@ -26,23 +26,23 @@ static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 static double alpha = 1.0;
 static double beta = 1.0;
 
-static starpu_job_t dmda_pop_task(void)
+static struct starpu_task *dmda_pop_task(void)
 {
-	struct starpu_job_s *j;
+	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
 	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
 
-	j = _starpu_fifo_pop_task(fifo);
-	if (j) {
-		double model = j->task->predicted;
+	task = _starpu_fifo_pop_task(fifo);
+	if (task) {
+		double model = task->predicted;
 	
 		fifo->exp_len -= model;
 		fifo->exp_start = _starpu_timing_now() + model;
 		fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	}	
 
-	return j;
+	return task;
 }
 
 static void update_data_requests(uint32_t memory_node, struct starpu_task *task)

+ 1 - 1
src/core/policies/eager_central_policy.c

@@ -65,7 +65,7 @@ static struct starpu_job_list_s *pop_every_task_eager_policy(uint32_t where)
 	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, where);
 }
 
-static starpu_job_t pop_task_eager_policy(void)
+static struct starpu_task *pop_task_eager_policy(void)
 {
 	return _starpu_fifo_pop_task(fifo);
 }

+ 2 - 2
src/core/policies/eager_central_priority_policy.c

@@ -66,7 +66,7 @@ static int _starpu_priority_push_task(starpu_job_t j)
 	return 0;
 }
 
-static starpu_job_t _starpu_priority_pop_task(void)
+static struct starpu_task *_starpu_priority_pop_task(void)
 {
 	starpu_job_t j = NULL;
 
@@ -98,7 +98,7 @@ static starpu_job_t _starpu_priority_pop_task(void)
 
 	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
 
-	return j;
+	return j->task;
 }
 
 struct starpu_sched_policy_s _starpu_sched_prio_policy = {

+ 1 - 1
src/core/policies/no_prio_policy.c

@@ -46,7 +46,7 @@ static int push_task_no_prio_policy(starpu_job_t task)
         return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
-static starpu_job_t pop_task_no_prio_policy(void)
+static struct starpu_task *pop_task_no_prio_policy(void)
 {
 	return _starpu_fifo_pop_task(fifo);
 }

+ 4 - 4
src/core/policies/random_policy.c

@@ -22,15 +22,15 @@ static struct starpu_fifo_jobq_s *queue_array[STARPU_NMAXWORKERS];
 static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
 static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
 
-static starpu_job_t random_pop_task(void)
+static struct starpu_task *random_pop_task(void)
 {
-	struct starpu_job_s *j;
+	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
 
-	j = _starpu_fifo_pop_task(queue_array[workerid]);
+	task = _starpu_fifo_pop_task(queue_array[workerid]);
 
-	return j;
+	return task;
 }
 
 static int _random_push_task(starpu_job_t task, unsigned prio)

+ 1 - 1
src/core/policies/sched_policy.c

@@ -232,7 +232,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 	}
 }
 
-struct starpu_job_s * _starpu_pop_task(void)
+struct starpu_task *_starpu_pop_task(void)
 {
 	return policy.pop_task();
 }

+ 2 - 2
src/core/policies/sched_policy.h

@@ -33,7 +33,7 @@ struct starpu_sched_policy_s {
 	/* some methods to manipulate the previous queue */
 	int (*push_task)(starpu_job_t);
 	int (*push_prio_task)(starpu_job_t);
-	struct starpu_job_s* (*pop_task)(void);
+	struct starpu_task *(*pop_task)(void);
 
 	/* returns the number of tasks that were retrieved 
  	 * the function is reponsible for allocating the output but the driver
@@ -59,7 +59,7 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config);
 int _starpu_get_prefetch_flag(void);
 
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
-struct starpu_job_s *_starpu_pop_task(void);
+struct starpu_task *_starpu_pop_task(void);
 struct starpu_job_s *_starpu_pop_task_from_queue(struct starpu_jobq_s *queue);
 struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where);
 struct starpu_job_list_s * _starpu_pop_every_task_from_queue(struct starpu_jobq_s *queue, uint32_t where);

+ 9 - 9
src/core/policies/work_stealing_policy.c

@@ -132,9 +132,9 @@ static struct starpu_deque_jobq_s *select_workerq(void)
 #endif
 
 #warning TODO rewrite ... this will not scale at all now
-static starpu_job_t ws_pop_task(void)
+static struct starpu_task *ws_pop_task(void)
 {
-	starpu_job_t j;
+	struct starpu_task *task;
 
 	int workerid = starpu_worker_get_id();
 
@@ -144,27 +144,27 @@ static starpu_job_t ws_pop_task(void)
 
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
-	j = _starpu_deque_pop_task(q);
-	if (j) {
+	task = _starpu_deque_pop_task(q);
+	if (task) {
 		/* there was a local task */
 		performed_total++;
 		PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
-		return j;
+		return task;
 	}
 	
 	/* we need to steal someone's job */
 	struct starpu_deque_jobq_s *victimq;
 	victimq = select_victimq();
 
-	j = _starpu_deque_pop_task(victimq);
-	if (j) {
-		STARPU_TRACE_WORK_STEALING(q, j);
+	task = _starpu_deque_pop_task(victimq);
+	if (task) {
+		STARPU_TRACE_WORK_STEALING(q, victimq);
 		performed_total++;
 	}
 
 	PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
 
-	return j;
+	return task;
 }
 
 int ws_push_task(starpu_job_t task)

+ 4 - 1
src/drivers/cpu/driver_cpu.c

@@ -133,7 +133,10 @@ void *_starpu_cpu_worker(void *arg)
 
 		/* otherwise ask a task to the scheduler */
 		if (!j)
-			j = _starpu_pop_task();
+		{
+			struct starpu_task *task = _starpu_pop_task();
+			j = _starpu_get_job_associated_to_task(task);
+		}
 		
                 if (j == NULL) 
 		{

+ 5 - 2
src/drivers/cuda/driver_cuda.c

@@ -208,8 +208,11 @@ void *_starpu_cuda_worker(void *arg)
 
 		/* otherwise ask a task to the scheduler */
 		if (!j)
-			j = _starpu_pop_task();
-	       
+		{
+			struct starpu_task *task = _starpu_pop_task();
+			j = _starpu_get_job_associated_to_task(task);
+		}
+	
                 if (j == NULL) 
 		{
 			if (_starpu_worker_can_block(memnode))

+ 4 - 1
src/drivers/opencl/driver_opencl.c

@@ -331,7 +331,10 @@ void *_starpu_opencl_worker(void *arg)
 
 		/* otherwise ask a task to the scheduler */
 		if (!j)
-			j = _starpu_pop_task();
+		{
+			struct starpu_task *task = _starpu_pop_task();
+			j = _starpu_get_job_associated_to_task(task);
+		}
 		
                 if (j == NULL) 
 		{