浏览代码

The scheduling strategies now manipulate tasks instead of jobs. Existing
strategies still use jobs internally: we need to adapt the job list data
structures (in src/core/mechanisms/) to store tasks directly as well.

Cédric Augonnet 15 年之前
父节点
当前提交
cd6583e606

+ 5 - 0
include/starpu_task.h

@@ -137,6 +137,11 @@ struct starpu_task {
 	void *starpu_private;
 };
 
+struct starpu_task_list { 
+	struct starpu_task *task;
+	struct starpu_task_list *next;
+};
+
 /* It is possible to initialize statically allocated tasks with this value.
  * This is equivalent to initializing a starpu_task structure with the
  * starpu_task_init function. */

+ 35 - 23
src/core/mechanisms/fifo_queues.c

@@ -18,6 +18,7 @@
 #include <core/mechanisms/fifo_queues.h>
 #include <errno.h>
 #include <common/utils.h>
+#include <core/task.h>
 
 struct starpu_fifo_jobq_s *_starpu_create_fifo(void)
 {
@@ -42,12 +43,14 @@ void _starpu_destroy_fifo(struct starpu_fifo_jobq_s *fifo)
 	free(fifo);
 }
 
-int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
+int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
 {
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
-	starpu_job_list_push_back(fifo_queue->jobq, task);
+	starpu_job_list_push_back(fifo_queue->jobq, j);
 	fifo_queue->njobs++;
 	fifo_queue->nprocessed++;
 
@@ -57,12 +60,14 @@ int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_m
 	return 0;
 }
 
-int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task)
+int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
 {
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	STARPU_TRACE_JOB_PUSH(task, 0);
-	starpu_job_list_push_front(fifo_queue->jobq, task);
+	starpu_job_list_push_front(fifo_queue->jobq, j);
 	fifo_queue->njobs++;
 	fifo_queue->nprocessed++;
 
@@ -94,22 +99,20 @@ struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo_queue)
 }
 
 /* pop every task that can be executed on the calling driver */
-struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
+struct starpu_task_list *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
 {
-	struct starpu_job_list_s *new_list, *old_list;
+	struct starpu_job_list_s *old_list;
 	unsigned size;
+
+	struct starpu_task_list *new_list = NULL;
+	struct starpu_task_list *new_list_tail = NULL;
 	
 	PTHREAD_MUTEX_LOCK(sched_mutex);
 
 	size = fifo_queue->njobs;
 
-	if (size == 0) {
-		new_list = NULL;
-	}
-	else {
+	if (size > 0) {
 		old_list = fifo_queue->jobq;
-		new_list = starpu_job_list_new();
-
 		unsigned new_list_size = 0;
 
 		starpu_job_itor_t i;
@@ -128,20 +131,29 @@ struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s
 				new_list_size++;
 				
 				starpu_job_list_erase(old_list, i);
-				starpu_job_list_push_back(new_list, i);
+
+				if (new_list)
+				{
+					struct starpu_task_list *link;
+
+					link = malloc(sizeof(struct starpu_task_list));
+					link->task = i->task;
+					link->next = NULL;
+
+					new_list_tail->next = link;
+					new_list_tail = link;
+					
+				}
+				else {
+					new_list = malloc(sizeof(struct starpu_task_list));
+					new_list->task = i->task;
+					new_list->next = NULL;
+					new_list_tail = new_list;
+				}
 			}
 		}
 
-		if (new_list_size == 0)
-		{
-			/* the new list is empty ... */
-			starpu_job_list_delete(new_list);
-			new_list = NULL;
-		}
-		else
-		{
-			fifo_queue->njobs -= new_list_size;
-		}
+		fifo_queue->njobs -= new_list_size;
 	}
 
 	PTHREAD_MUTEX_UNLOCK(sched_mutex);

+ 3 - 3
src/core/mechanisms/fifo_queues.h

@@ -40,10 +40,10 @@ struct starpu_fifo_jobq_s {
 struct starpu_fifo_jobq_s*_starpu_create_fifo(void);
 void _starpu_destroy_fifo(struct starpu_fifo_jobq_s *fifo);
 
-int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
-int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
+int _starpu_fifo_push_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
+int _starpu_fifo_push_prio_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
 
 struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_jobq_s *fifo);
-struct starpu_job_list_s * _starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
+struct starpu_task_list *_starpu_fifo_pop_every_task(struct starpu_fifo_jobq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
 
 #endif // __FIFO_QUEUES_H__

+ 23 - 25
src/core/policies/deque_modeling_policy.c

@@ -44,34 +44,34 @@ static struct starpu_task *dm_pop_task(void)
 	return task;
 }
 
-static struct starpu_job_list_s *dm_pop_every_task(uint32_t where)
+static struct starpu_task_list *dm_pop_every_task(uint32_t where)
 {
-	struct starpu_job_list_s *new_list;
+	struct starpu_task_list *new_list;
 
 	int workerid = starpu_worker_get_id();
 
 	struct starpu_fifo_jobq_s *fifo = queue_array[workerid];
 
 	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], where);
-	if (new_list) {
-		starpu_job_itor_t i;
-		for(i = starpu_job_list_begin(new_list);
-			i != starpu_job_list_end(new_list);
-			i = starpu_job_list_next(i))
-		{
-			double model = i->task->predicted;
+
+	while (new_list)
+	{
+		double model = new_list->task->predicted;
+
+		fifo->exp_len -= model;
+		fifo->exp_start = _starpu_timing_now() + model;
+		fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	
-			fifo->exp_len -= model;
-			fifo->exp_start = _starpu_timing_now() + model;
-			fifo->exp_end = fifo->exp_start + fifo->exp_len;
-		}
+		new_list = new_list->next;
 	}
 
 	return new_list;
 }
 
-static int _dm_push_task(starpu_job_t j, unsigned prio)
+static int _dm_push_task(struct starpu_task *task, unsigned prio)
 {
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+
 	/* find the queue */
 	struct starpu_fifo_jobq_s *fifo;
 	unsigned worker;
@@ -80,8 +80,6 @@ static int _dm_push_task(starpu_job_t j, unsigned prio)
 	double best_exp_end = 0.0;
 	double model_best = 0.0;
 
-	struct starpu_task *task = j->task;
-
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		double exp_end;
@@ -134,7 +132,7 @@ static int _dm_push_task(starpu_job_t j, unsigned prio)
 	fifo->exp_end += model_best;
 	fifo->exp_len += model_best;
 
-	j->task->predicted = model_best;
+	task->predicted = model_best;
 
 	unsigned memory_node = starpu_worker_get_memory_node(best);
 
@@ -142,23 +140,23 @@ static int _dm_push_task(starpu_job_t j, unsigned prio)
 		_starpu_prefetch_task_input_on_node(task, memory_node);
 
 	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
+		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
 	} else {
-		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
+		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
 	}
 }
 
-static int dm_push_prio_task(starpu_job_t j)
+static int dm_push_prio_task(struct starpu_task *task)
 {
-	return _dm_push_task(j, 1);
+	return _dm_push_task(task, 1);
 }
 
-static int dm_push_task(starpu_job_t j)
+static int dm_push_task(struct starpu_task *task)
 {
-	if (j->task->priority == STARPU_MAX_PRIO)
-		return _dm_push_task(j, 1);
+	if (task->priority == STARPU_MAX_PRIO)
+		return _dm_push_task(task, 1);
 
-	return _dm_push_task(j, 0);
+	return _dm_push_task(task, 0);
 }
 
 static void initialize_dm_policy(struct starpu_machine_topology_s *topology, 

+ 12 - 12
src/core/policies/deque_modeling_policy_data_aware.c

@@ -58,8 +58,10 @@ static void update_data_requests(uint32_t memory_node, struct starpu_task *task)
 	}
 }
 
-static int _dmda_push_task(starpu_job_t j, unsigned prio)
+static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 {
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+
 	/* find the queue */
 	struct starpu_fifo_jobq_s *fifo;
 	unsigned worker;
@@ -79,8 +81,6 @@ static int _dmda_push_task(starpu_job_t j, unsigned prio)
 	double model_best = 0.0;
 	double penality_best = 0.0;
 
-	struct starpu_task *task = j->task;
-
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		fifo = queue_array[worker];
@@ -166,7 +166,7 @@ static int _dmda_push_task(starpu_job_t j, unsigned prio)
 	fifo->exp_end += model_best;
 	fifo->exp_len += model_best;
 
-	j->task->predicted = model_best;
+	task->predicted = model_best;
 
 	unsigned memory_node = starpu_worker_get_memory_node(best);
 
@@ -176,23 +176,23 @@ static int _dmda_push_task(starpu_job_t j, unsigned prio)
 		_starpu_prefetch_task_input_on_node(task, memory_node);
 
 	if (prio) {
-		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
+		return _starpu_fifo_push_prio_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
 	} else {
-		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], j);
+		return _starpu_fifo_push_task(queue_array[best], &sched_mutex[best], &sched_cond[best], task);
 	}
 }
 
-static int dmda_push_prio_task(starpu_job_t j)
+static int dmda_push_prio_task(struct starpu_task *task)
 {
-	return _dmda_push_task(j, 1);
+	return _dmda_push_task(task, 1);
 }
 
-static int dmda_push_task(starpu_job_t j)
+static int dmda_push_task(struct starpu_task *task)
 {
-	if (j->task->priority == STARPU_MAX_PRIO)
-		return _dmda_push_task(j, 1);
+	if (task->priority == STARPU_MAX_PRIO)
+		return _dmda_push_task(task, 1);
 
-	return _dmda_push_task(j, 0);
+	return _dmda_push_task(task, 0);
 }
 
 static void initialize_dmda_policy(struct starpu_machine_topology_s *topology, 

+ 3 - 3
src/core/policies/eager_central_policy.c

@@ -50,17 +50,17 @@ static void deinitialize_eager_center_policy(__attribute__ ((unused)) struct sta
 	_starpu_destroy_fifo(fifo);
 }
 
-static int push_task_eager_policy(starpu_job_t task)
+static int push_task_eager_policy(struct starpu_task *task)
 {
 	return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
-static int push_prio_task_eager_policy(starpu_job_t task)
+static int push_prio_task_eager_policy(struct starpu_task *task)
 {
 	return _starpu_fifo_push_prio_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
-static struct starpu_job_list_s *pop_every_task_eager_policy(uint32_t where)
+static struct starpu_task_list *pop_every_task_eager_policy(uint32_t where)
 {
 	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, where);
 }

+ 5 - 3
src/core/policies/eager_central_priority_policy.c

@@ -47,14 +47,16 @@ static void deinitialize_eager_center_priority_policy(struct starpu_machine_topo
 	_starpu_destroy_priority_jobq(jobq);
 }
 
-static int _starpu_priority_push_task(starpu_job_t j)
+static int _starpu_priority_push_task(struct starpu_task *task)
 {
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+
 	/* wake people waiting for a task */
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
-	STARPU_TRACE_JOB_PUSH(j, 1);
+	STARPU_TRACE_JOB_PUSH(task, 1);
 	
-	unsigned priolevel = j->task->priority - STARPU_MIN_PRIO;
+	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
 
 	starpu_job_list_push_front(jobq->jobq[priolevel], j);
 	jobq->njobs[priolevel]++;

+ 1 - 1
src/core/policies/no_prio_policy.c

@@ -41,7 +41,7 @@ static void initialize_no_prio_policy(struct starpu_machine_topology_s *topology
 		starpu_worker_set_sched_condition(workerid, &sched_cond, &sched_mutex);
 }
 
-static int push_task_no_prio_policy(starpu_job_t task)
+static int push_task_no_prio_policy(struct starpu_task *task)
 {
         return _starpu_fifo_push_task(fifo, &sched_mutex, &sched_cond, task);
 }

+ 3 - 3
src/core/policies/random_policy.c

@@ -33,7 +33,7 @@ static struct starpu_task *random_pop_task(void)
 	return task;
 }
 
-static int _random_push_task(starpu_job_t task, unsigned prio)
+static int _random_push_task(struct starpu_task *task, unsigned prio)
 {
 	/* find the queue */
 	unsigned worker;
@@ -72,12 +72,12 @@ static int _random_push_task(starpu_job_t task, unsigned prio)
 	}
 }
 
-static int random_push_prio_task(starpu_job_t task)
+static int random_push_prio_task(struct starpu_task *task)
 {
 	return _random_push_task(task, 1);
 }
 
-static int random_push_task(starpu_job_t task)
+static int random_push_task(struct starpu_task *task)
 {
 	return _random_push_task(task, 0);
 }

+ 3 - 3
src/core/policies/sched_policy.c

@@ -228,7 +228,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 	else {
 		STARPU_ASSERT(policy.push_task);
 
-		return policy.push_task(j);
+		return policy.push_task(task);
 	}
 }
 
@@ -238,11 +238,11 @@ struct starpu_task *_starpu_pop_task(void)
 }
 
 /* pop every task that can be executed on "where" (eg. GORDON) */
-struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where)
+struct starpu_task_list *_starpu_pop_every_task(uint32_t where)
 {
 	STARPU_ASSERT(policy.pop_every_task);
 
-	struct starpu_job_list_s *list = policy.pop_every_task(where);
+	struct starpu_task_list *list = policy.pop_every_task(where);
 
 	return list;
 }

+ 4 - 4
src/core/policies/sched_policy.h

@@ -31,8 +31,8 @@ struct starpu_sched_policy_s {
 	void (*deinit_sched)(struct starpu_machine_topology_s *, struct starpu_sched_policy_s *);
 
 	/* some methods to manipulate the previous queue */
-	int (*push_task)(starpu_job_t);
-	int (*push_prio_task)(starpu_job_t);
+	int (*push_task)(struct starpu_task *);
+	int (*push_prio_task)(struct starpu_task *);
 	struct starpu_task *(*pop_task)(void);
 
 	/* returns the number of tasks that were retrieved 
@@ -41,7 +41,7 @@ struct starpu_sched_policy_s {
  	 *
  	 * NB : this function is non blocking
  	 * */
-	struct starpu_job_list_s *(*pop_every_task)(uint32_t where);
+	struct starpu_task_list *(*pop_every_task)(uint32_t where);
 
 	/* name of the policy (optionnal) */
 	const char *policy_name;
@@ -59,7 +59,7 @@ int _starpu_get_prefetch_flag(void);
 
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
 struct starpu_task *_starpu_pop_task(void);
-struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where);
+struct starpu_task_list *_starpu_pop_every_task(uint32_t where);
 
 void _starpu_wait_on_sched_event(void);
 

+ 4 - 2
src/core/policies/work_stealing_policy.c

@@ -167,8 +167,10 @@ static struct starpu_task *ws_pop_task(void)
 	return task;
 }
 
-int ws_push_task(starpu_job_t task)
+int ws_push_task(struct starpu_task *task)
 {
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+
 	int workerid = starpu_worker_get_id();
 
         struct starpu_deque_jobq_s *deque_queue;
@@ -179,7 +181,7 @@ int ws_push_task(starpu_job_t task)
         //total_number_of_jobs++;
 
         STARPU_TRACE_JOB_PUSH(task, 0);
-        starpu_job_list_push_front(deque_queue->jobq, task);
+        starpu_job_list_push_front(deque_queue->jobq, j);
         deque_queue->njobs++;
         deque_queue->nprocessed++;
 

+ 0 - 5
src/datawizard/coherency.h

@@ -64,11 +64,6 @@ typedef struct starpu_local_data_state_t {
 
 struct starpu_data_requester_list_s;
 
-struct starpu_task_list {
-	struct starpu_task *task;
-	struct starpu_task_list *next;
-};
-
 struct starpu_jobid_list {
 	unsigned long id;
 	struct starpu_jobid_list *next;

+ 2 - 1
src/drivers/cpu/driver_cpu.c

@@ -135,7 +135,8 @@ void *_starpu_cpu_worker(void *arg)
 		if (!j)
 		{
 			struct starpu_task *task = _starpu_pop_task();
-			j = _starpu_get_job_associated_to_task(task);
+			if (task)
+				j = _starpu_get_job_associated_to_task(task);
 		}
 		
                 if (j == NULL) 

+ 2 - 1
src/drivers/cuda/driver_cuda.c

@@ -210,7 +210,8 @@ void *_starpu_cuda_worker(void *arg)
 		if (!j)
 		{
 			struct starpu_task *task = _starpu_pop_task();
-			j = _starpu_get_job_associated_to_task(task);
+			if (task)
+				j = _starpu_get_job_associated_to_task(task);
 		}
 	
                 if (j == NULL) 

+ 2 - 1
src/drivers/opencl/driver_opencl.c

@@ -333,7 +333,8 @@ void *_starpu_opencl_worker(void *arg)
 		if (!j)
 		{
 			struct starpu_task *task = _starpu_pop_task();
-			j = _starpu_get_job_associated_to_task(task);
+			if (task)
+				j = _starpu_get_job_associated_to_task(task);
 		}
 		
                 if (j == NULL)