浏览代码

push decision of "where" parameter of task pops down to scheduling policies, turn it into a worker id, and propagate to pop_task functions too

Samuel Thibault 14 年之前
父节点
当前提交
d023ed950e

+ 1 - 1
include/starpu_scheduler.h

@@ -79,7 +79,7 @@ struct starpu_sched_policy_s {
 	  * the means of the prev and next fields of the starpu_task
 	  * structure). The mutex associated to the worker is already taken
 	  * when this method is called. */
-	struct starpu_task *(*pop_every_task)(uint32_t where);
+	struct starpu_task *(*pop_every_task)(void);
 
 	/* This method is called every time a task has been executed. (optionnal) */
 	void (*post_exec_hook)(struct starpu_task *);

+ 2 - 3
src/core/sched_policy.c

@@ -245,12 +245,11 @@ struct starpu_task *_starpu_pop_task(void)
 	return policy.pop_task();
 }
 
-/* pop every task that can be executed on "where" (eg. GORDON) */
-struct starpu_task *_starpu_pop_every_task(uint32_t where)
+struct starpu_task *_starpu_pop_every_task(void)
 {
 	STARPU_ASSERT(policy.pop_every_task);
 
-	return policy.pop_every_task(where);
+	return policy.pop_every_task();
 }
 
 void _starpu_sched_post_exec_hook(struct starpu_task *task)

+ 3 - 1
src/core/sched_policy.h

@@ -31,8 +31,10 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config);
 int _starpu_get_prefetch_flag(void);
 
 int _starpu_push_task(starpu_job_t task, unsigned job_is_already_locked);
+/* pop a task that can be executed on the worker */
 struct starpu_task *_starpu_pop_task(void);
-struct starpu_task *_starpu_pop_every_task(uint32_t where);
+/* pop every task that can be executed on the worker */
+struct starpu_task *_starpu_pop_every_task(void);
 void _starpu_sched_post_exec_hook(struct starpu_task *task);
 
 void _starpu_wait_on_sched_event(void);

+ 1 - 1
src/drivers/gordon/driver_gordon.c

@@ -338,7 +338,7 @@ void *gordon_worker_inject(struct starpu_worker_set_s *arg)
 			int ret = 0;
 #warning we should look into the local job list here !
 
-			struct starpu_job_list_s *list = _starpu_pop_every_task(STARPU_GORDON);
+			struct starpu_job_list_s *list = _starpu_pop_every_task();
 			/* XXX 0 is hardcoded */
 			if (list)
 			{

+ 3 - 3
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -148,7 +148,7 @@ static struct starpu_task *dmda_pop_task(void)
 	int workerid = starpu_worker_get_id();
 	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
 
-	task = _starpu_fifo_pop_task(fifo);
+	task = _starpu_fifo_pop_task(fifo, -1);
 	if (task) {
 		double model = task->predicted;
 	
@@ -173,7 +173,7 @@ static struct starpu_task *dmda_pop_task(void)
 
 
 
-static struct starpu_task *dmda_pop_every_task(uint32_t where)
+static struct starpu_task *dmda_pop_every_task(void)
 {
 	struct starpu_task *new_list;
 
@@ -181,7 +181,7 @@ static struct starpu_task *dmda_pop_every_task(uint32_t where)
 
 	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];
 
-	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], where);
+	new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], workerid);
 
 	while (new_list)
 	{

+ 4 - 2
src/sched_policies/deque_queues.c

@@ -56,7 +56,7 @@ unsigned _starpu_get_deque_nprocessed(struct starpu_deque_jobq_s *deque_queue)
 	return deque_queue->nprocessed;
 }
 
-struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue)
+struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue, int workerid)
 {
 	starpu_job_t j = NULL;
 
@@ -65,6 +65,7 @@ struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_que
 		return NULL;
 	}
 
+	/* TODO find a task that suits workerid */
 	if (deque_queue->njobs > 0) 
 	{
 		/* there is a task */
@@ -79,9 +80,10 @@ struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_que
 	return j->task;
 }
 
-struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, uint32_t where)
+struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, int workerid)
 {
 	struct starpu_job_list_s *new_list, *old_list;
+	uint32_t where = starpu_worker_get_type(workerid);
 
 	/* block until some task is available in that queue */
 	PTHREAD_MUTEX_LOCK(sched_mutex);

+ 2 - 2
src/sched_policies/deque_queues.h

@@ -42,8 +42,8 @@ struct starpu_deque_jobq_s {
 struct starpu_deque_jobq_s *_starpu_create_deque(void);
 void _starpu_destroy_deque(struct starpu_deque_jobq_s *deque);
 
-struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue);
-struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, uint32_t where);
+struct starpu_task *_starpu_deque_pop_task(struct starpu_deque_jobq_s *deque_queue, int workerid);
+struct starpu_job_list_s *_starpu_deque_pop_every_task(struct starpu_deque_jobq_s *deque_queue, pthread_mutex_t *sched_mutex, int workerid);
 
 unsigned _starpu_get_deque_njobs(struct starpu_deque_jobq_s *deque_queue);
 unsigned _starpu_get_deque_nprocessed(struct starpu_deque_jobq_s *deque_queue);

+ 3 - 3
src/sched_policies/eager_central_policy.c

@@ -61,14 +61,14 @@ static int push_prio_task_eager_policy(struct starpu_task *task)
 	return _starpu_fifo_push_prio_task(fifo, &sched_mutex, &sched_cond, task);
 }
 
-static struct starpu_task *pop_every_task_eager_policy(uint32_t where)
+static struct starpu_task *pop_every_task_eager_policy(void)
 {
-	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, where);
+	return _starpu_fifo_pop_every_task(fifo, &sched_mutex, starpu_worker_get_id());
 }
 
 static struct starpu_task *pop_task_eager_policy(void)
 {
-	return _starpu_fifo_pop_task(fifo);
+	return _starpu_fifo_pop_task(fifo, starpu_worker_get_id());
 }
 
 struct starpu_sched_policy_s _starpu_sched_eager_policy = {

+ 4 - 2
src/sched_policies/fifo_queues.c

@@ -74,13 +74,14 @@ int _starpu_fifo_push_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex
 	return 0;
 }
 
-struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo_queue)
+struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo_queue, int workerid)
 {
 	struct starpu_task *task = NULL;
 
 	if (fifo_queue->ntasks == 0)
 		return NULL;
 
+	/* TODO: find a task that suits workerid */
 	if (fifo_queue->ntasks > 0) 
 	{
 		/* there is a task */
@@ -96,10 +97,11 @@ struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo_queue
 }
 
 /* pop every task that can be executed on the calling driver */
-struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, uint32_t where)
+struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, int workerid)
 {
 	struct starpu_task_list *old_list;
 	unsigned size;
+	uint32_t where = starpu_worker_get_type(workerid);
 
 	struct starpu_task *new_list = NULL;
 	struct starpu_task *new_list_tail = NULL;

+ 2 - 2
src/sched_policies/fifo_queues.h

@@ -44,7 +44,7 @@ void _starpu_destroy_fifo(struct starpu_fifo_taskq_s *fifo);
 int _starpu_fifo_push_task(struct starpu_fifo_taskq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
 int _starpu_fifo_push_prio_task(struct starpu_fifo_taskq_s *fifo, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task);
 
-struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo);
-struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_taskq_s *fifo, pthread_mutex_t *sched_mutex, uint32_t where);
+struct starpu_task *_starpu_fifo_pop_task(struct starpu_fifo_taskq_s *fifo, int workerid);
+struct starpu_task *_starpu_fifo_pop_every_task(struct starpu_fifo_taskq_s *fifo, pthread_mutex_t *sched_mutex, int workerid);
 
 #endif // __FIFO_QUEUES_H__

+ 1 - 1
src/sched_policies/random_policy.c

@@ -31,7 +31,7 @@ static struct starpu_task *random_pop_task(void)
 
 	int workerid = starpu_worker_get_id();
 
-	task = _starpu_fifo_pop_task(queue_array[workerid]);
+	task = _starpu_fifo_pop_task(queue_array[workerid], -1);
 
 	return task;
 }

+ 2 - 1
src/sched_policies/stack_queues.c

@@ -89,13 +89,14 @@ void _starpu_stack_push_task(struct starpu_stack_jobq_s *stack_queue, pthread_mu
 	PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-starpu_job_t _starpu_stack_pop_task(struct starpu_stack_jobq_s *stack_queue, pthread_mutex_t *sched_mutex)
+starpu_job_t _starpu_stack_pop_task(struct starpu_stack_jobq_s *stack_queue, pthread_mutex_t *sched_mutex, int workerid)
 {
 	starpu_job_t j = NULL;
 
 	if (stack_queue->njobs == 0)
 		return NULL;
 
+	/* TODO find a task that suits workerid */
 	if (stack_queue->njobs > 0) 
 	{
 		/* there is a task */

+ 1 - 1
src/sched_policies/stack_queues.h

@@ -44,7 +44,7 @@ struct starpu_stack_jobq_s *_starpu_create_stack(void);
 void _starpu_stack_push_task(struct starpu_stack_jobq_s *stack, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 void _starpu_stack_push_prio_task(struct starpu_stack_jobq_s *stack, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, starpu_job_t task);
 
-starpu_job_t _starpu_stack_pop_task(struct starpu_stack_jobq_s *stack, pthread_mutex_t *sched_mutex);
+starpu_job_t _starpu_stack_pop_task(struct starpu_stack_jobq_s *stack, pthread_mutex_t *sched_mutex, int workerid);
 
 void _starpu_init_stack_queues_mechanisms(void);
 

+ 2 - 2
src/sched_policies/work_stealing_policy.c

@@ -144,7 +144,7 @@ static struct starpu_task *ws_pop_task(void)
 
 	PTHREAD_MUTEX_LOCK(&global_sched_mutex);
 
-	task = _starpu_deque_pop_task(q);
+	task = _starpu_deque_pop_task(q, -1);
 	if (task) {
 		/* there was a local task */
 		performed_total++;
@@ -156,7 +156,7 @@ static struct starpu_task *ws_pop_task(void)
 	struct starpu_deque_jobq_s *victimq;
 	victimq = select_victimq();
 
-	task = _starpu_deque_pop_task(victimq);
+	task = _starpu_deque_pop_task(victimq, workerid);
 	if (task) {
 		STARPU_TRACE_WORK_STEALING(q, victimq);
 		performed_total++;