Kaynağa Gözat

The push/pop methods are now directly in the starpu_sched_policy_s structure
instead of being defined per queue.

Cédric Augonnet 14 yıl önce
ebeveyn
işleme
bfb112613e

+ 0 - 15
src/core/mechanisms/queues.h

@@ -22,25 +22,10 @@
 #include <core/jobs.h>
 #include <core/policies/sched_policy.h>
 
-enum starpu_perf_archtype;
-
 struct starpu_jobq_s {
 	/* a pointer to some queue structure */
 	void *queue; 
 
-	/* some methods to manipulate the previous queue */
-	int (*push_task)(struct starpu_jobq_s *, starpu_job_t);
-	int (*push_prio_task)(struct starpu_jobq_s *, starpu_job_t);
-	struct starpu_job_s* (*pop_task)(struct starpu_jobq_s *);
-
-	/* returns the number of tasks that were retrieved 
- 	 * the function is reponsible for allocating the output but the driver
- 	 * has to free it 
- 	 *
- 	 * NB : this function is non blocking
- 	 * */
-	struct starpu_job_list_s *(*pop_every_task)(struct starpu_jobq_s *, uint32_t);
-
 	/* for performance analysis purpose */
 	double total_computation_time;
 	double total_communication_time;

+ 5 - 6
src/core/policies/deque_modeling_policy.c

@@ -160,11 +160,6 @@ static struct starpu_jobq_s *init_dm_fifo(void)
 
 	q = _starpu_create_fifo();
 
-	q->push_task = dm_push_task; 
-	q->push_prio_task = dm_push_prio_task; 
-	q->pop_task = dm_pop_task;
-	q->pop_every_task = dm_pop_every_task;
-
 	queue_array[nworkers++] = q;
 
 	return q;
@@ -201,7 +196,11 @@ static struct starpu_jobq_s *get_local_queue_dm(struct starpu_sched_policy_s *po
 struct starpu_sched_policy_s _starpu_sched_dm_policy = {
 	.init_sched = initialize_dm_policy,
 	.deinit_sched = deinitialize_dm_policy,
-	.starpu_get_local_queue = get_local_queue_dm,
+	.get_local_queue = get_local_queue_dm,
+	.push_task = dm_push_task, 
+	.push_prio_task = dm_push_prio_task,
+	.pop_task = dm_pop_task,
+	.pop_every_task = dm_pop_every_task,
 	.policy_name = "dm",
 	.policy_description = "performance model"
 };

+ 4 - 5
src/core/policies/deque_modeling_policy_data_aware.c

@@ -197,10 +197,6 @@ static struct starpu_jobq_s *init_dmda_fifo(void)
 
 	q = _starpu_create_fifo();
 
-	q->push_task = dmda_push_task; 
-	q->push_prio_task = dmda_push_prio_task; 
-	q->pop_task = dmda_pop_task;
-
 	queue_array[nworkers++] = q;
 
 	return q;
@@ -247,7 +243,10 @@ static struct starpu_jobq_s *get_local_queue_dmda(struct starpu_sched_policy_s *
 struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
 	.init_sched = initialize_dmda_policy,
 	.deinit_sched = deinitialize_dmda_policy,
-	.starpu_get_local_queue = get_local_queue_dmda,
+	.get_local_queue = get_local_queue_dmda,
+	.push_task = dmda_push_task, 
+	.push_prio_task = dmda_push_prio_task, 
+	.pop_task = dmda_pop_task,
 	.policy_name = "dmda",
 	.policy_description = "data-aware performance model"
 };

+ 5 - 6
src/core/policies/eager_central_policy.c

@@ -31,11 +31,6 @@ static void init_central_queue_design(void)
 
 	_starpu_init_fifo_queues_mechanisms();
 
-	jobq->push_task = _starpu_fifo_push_task;
-	jobq->push_prio_task = _starpu_fifo_push_prio_task;
-	jobq->pop_task = _starpu_fifo_pop_task;
-
-	jobq->pop_every_task = _starpu_fifo_pop_every_task;
 }
 
 static void deinit_central_queue_design(void)
@@ -75,7 +70,11 @@ static struct starpu_jobq_s *get_local_queue_eager(struct starpu_sched_policy_s
 struct starpu_sched_policy_s _starpu_sched_eager_policy = {
 	.init_sched = initialize_eager_center_policy,
 	.deinit_sched = deinitialize_eager_center_policy,
-	.starpu_get_local_queue = get_local_queue_eager,
+	.get_local_queue = get_local_queue_eager,
+	.push_task = _starpu_fifo_push_task,
+	.push_prio_task = _starpu_fifo_push_prio_task,
+	.pop_task = _starpu_fifo_pop_task,
+	.pop_every_task = _starpu_fifo_pop_every_task,
 	.policy_name = "eager",
 	.policy_description = "greedy policy"
 };

+ 5 - 6
src/core/policies/eager_central_priority_policy.c

@@ -25,11 +25,6 @@ static void init_priority_queue_design(void)
 	jobq = _starpu_create_priority_jobq();
 
 	_starpu_init_priority_queues_mechanisms();
-
-	/* we always use priorities in that policy */
-	jobq->push_task = _starpu_priority_push_task;
-	jobq->push_prio_task = _starpu_priority_push_task;
-	jobq->pop_task = _starpu_priority_pop_task;
 }
 
 static void deinit_priority_queue_design(void)
@@ -67,7 +62,11 @@ static struct starpu_jobq_s *get_local_queue_eager_priority(struct starpu_sched_
 struct starpu_sched_policy_s _starpu_sched_prio_policy = {
 	.init_sched = initialize_eager_center_priority_policy,
 	.deinit_sched = deinitialize_eager_center_priority_policy,
-	.starpu_get_local_queue = get_local_queue_eager_priority,
+	.get_local_queue = get_local_queue_eager_priority,
+	/* we always use priorities in that policy */
+	.push_task = _starpu_priority_push_task,
+	.push_prio_task = _starpu_priority_push_task,
+	.pop_task = _starpu_priority_pop_task,
 	.policy_name = "prio",
 	.policy_description = "eager (with priorities)"
 };

+ 4 - 6
src/core/policies/no_prio_policy.c

@@ -30,11 +30,6 @@ static void init_no_prio_design(void)
 	jobq = _starpu_create_fifo();
 
 	_starpu_init_fifo_queues_mechanisms();
-
-	jobq->push_task = _starpu_fifo_push_task;
-	/* no priority in that policy, let's be stupid here */
-	jobq->push_prio_task = _starpu_fifo_push_task;
-	jobq->pop_task = _starpu_fifo_pop_task;
 }
 
 static struct starpu_jobq_s *func_init_central_queue(void)
@@ -59,7 +54,10 @@ static struct starpu_jobq_s *get_local_queue_no_prio(struct starpu_sched_policy_
 struct starpu_sched_policy_s _starpu_sched_no_prio_policy = {
 	.init_sched = initialize_no_prio_policy,
 	.deinit_sched = NULL,
-	.starpu_get_local_queue = get_local_queue_no_prio,
+	.get_local_queue = get_local_queue_no_prio,
+	.push_task = _starpu_fifo_push_task,
+	.push_prio_task = _starpu_fifo_push_task,
+	.pop_task = _starpu_fifo_pop_task,
 	.policy_name = "no-prio",
 	.policy_description = "eager without priority"
 };

+ 4 - 5
src/core/policies/random_policy.c

@@ -84,10 +84,6 @@ static struct starpu_jobq_s *init_random_fifo(void)
 
 	q = _starpu_create_fifo();
 
-	q->push_task = random_push_task; 
-	q->push_prio_task = random_push_prio_task; 
-	q->pop_task = random_pop_task;
-
 	queue_array[nworkers++] = q;
 
 	return q;
@@ -120,7 +116,10 @@ static struct starpu_jobq_s *get_local_queue_random(struct starpu_sched_policy_s
 struct starpu_sched_policy_s _starpu_sched_random_policy = {
 	.init_sched = initialize_random_policy,
 	.deinit_sched = NULL,
-	.starpu_get_local_queue = get_local_queue_random,
+	.get_local_queue = get_local_queue_random,
+	.push_task = random_push_task,
+	.push_prio_task = random_push_prio_task,
+	.pop_task = random_pop_task,
 	.policy_name = "random",
 	.policy_description = "weighted random"
 };

+ 15 - 11
src/core/policies/sched_policy.c

@@ -83,7 +83,11 @@ static void load_sched_policy(struct starpu_sched_policy_s *sched_policy)
 
 	policy.init_sched = sched_policy->init_sched;
 	policy.deinit_sched = sched_policy->deinit_sched;
-	policy.starpu_get_local_queue = sched_policy->starpu_get_local_queue;
+	policy.get_local_queue = sched_policy->get_local_queue;
+	policy.push_task = sched_policy->push_task;
+	policy.push_prio_task = sched_policy->push_prio_task;
+	policy.pop_task = sched_policy->pop_task;
+	policy.pop_every_task = sched_policy->pop_every_task;
 
 	PTHREAD_COND_INIT(&policy.sched_activity_cond, NULL);
 	PTHREAD_MUTEX_INIT(&policy.sched_activity_mutex, NULL);
@@ -204,7 +208,7 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config)
 /* the generic interface that call the proper underlying implementation */
 int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 {
-	struct starpu_jobq_s *queue = policy.starpu_get_local_queue(&policy);
+	struct starpu_jobq_s *queue = policy.get_local_queue(&policy);
 
 	struct starpu_task *task = j->task;
 
@@ -233,33 +237,33 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 		return _starpu_push_local_task(worker, j);
 	}
 	else {
-		STARPU_ASSERT(queue->push_task);
+		STARPU_ASSERT(policy.push_task);
 
-		return queue->push_task(queue, j);
+		return policy.push_task(queue, j);
 	}
 }
 
 struct starpu_job_s * _starpu_pop_task_from_queue(struct starpu_jobq_s *queue)
 {
-	STARPU_ASSERT(queue->pop_task);
+	STARPU_ASSERT(policy.pop_task);
 
-	struct starpu_job_s *j = queue->pop_task(queue);
+	struct starpu_job_s *j = policy.pop_task(queue);
 
 	return j;
 }
 
 struct starpu_job_s * _starpu_pop_task(void)
 {
-	struct starpu_jobq_s *queue = policy.starpu_get_local_queue(&policy);
+	struct starpu_jobq_s *queue = policy.get_local_queue(&policy);
 
 	return _starpu_pop_task_from_queue(queue);
 }
 
 struct starpu_job_list_s * _starpu_pop_every_task_from_queue(struct starpu_jobq_s *queue, uint32_t where)
 {
-	STARPU_ASSERT(queue->pop_every_task);
+	STARPU_ASSERT(policy.pop_every_task);
 
-	struct starpu_job_list_s *list = queue->pop_every_task(queue, where);
+	struct starpu_job_list_s *list = policy.pop_every_task(queue, where);
 
 	return list;
 }
@@ -267,14 +271,14 @@ struct starpu_job_list_s * _starpu_pop_every_task_from_queue(struct starpu_jobq_
 /* pop every task that can be executed on "where" (eg. GORDON) */
 struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where)
 {
-	struct starpu_jobq_s *queue = policy.starpu_get_local_queue(&policy);
+	struct starpu_jobq_s *queue = policy.get_local_queue(&policy);
 
 	return _starpu_pop_every_task_from_queue(queue, where);
 }
 
 void _starpu_wait_on_sched_event(void)
 {
-	struct starpu_jobq_s *q = policy.starpu_get_local_queue(&policy);
+	struct starpu_jobq_s *q = policy.get_local_queue(&policy);
 
 	PTHREAD_MUTEX_LOCK(&q->activity_mutex);
 

+ 14 - 1
src/core/policies/sched_policy.h

@@ -35,7 +35,20 @@ struct starpu_sched_policy_s {
 	void (*deinit_sched)(struct starpu_machine_config_s *, struct starpu_sched_policy_s *);
 
 	/* anyone can request which queue it is associated to */
-	struct starpu_jobq_s *(*starpu_get_local_queue)(struct starpu_sched_policy_s *);
+	struct starpu_jobq_s *(*get_local_queue)(struct starpu_sched_policy_s *);
+
+	/* some methods to manipulate the previous queue */
+	int (*push_task)(struct starpu_jobq_s *, starpu_job_t);
+	int (*push_prio_task)(struct starpu_jobq_s *, starpu_job_t);
+	struct starpu_job_s* (*pop_task)(struct starpu_jobq_s *);
+
+	/* returns the number of tasks that were retrieved 
+ 	 * the function is reponsible for allocating the output but the driver
+ 	 * has to free it 
+ 	 *
+ 	 * NB : this function is non blocking
+ 	 * */
+	struct starpu_job_list_s *(*pop_every_task)(struct starpu_jobq_s *, uint32_t);
 
 	/* name of the policy (optionnal) */
 	const char *policy_name;

+ 4 - 5
src/core/policies/work_stealing_policy.c

@@ -164,10 +164,6 @@ static struct starpu_jobq_s *init_ws_deque(void)
 
 	q = _starpu_create_deque();
 
-	q->push_task = _starpu_deque_push_task; 
-	q->push_prio_task = _starpu_deque_push_prio_task; 
-	q->pop_task = ws_pop_task;
-
 	queue_array[nworkers++] = q;
 
 	return q;
@@ -201,7 +197,10 @@ static struct starpu_jobq_s *get_local_queue_ws(struct starpu_sched_policy_s *po
 struct starpu_sched_policy_s _starpu_sched_ws_policy = {
 	.init_sched = initialize_ws_policy,
 	.deinit_sched = NULL,
-	.starpu_get_local_queue = get_local_queue_ws,
+	.get_local_queue = get_local_queue_ws,
+	.push_task = _starpu_deque_push_task,
+	.push_prio_task = _starpu_deque_push_prio_task,
+	.pop_task = ws_pop_task,
 	.policy_name = "ws",
 	.policy_description = "work stealing"
 };

+ 1 - 1
src/drivers/cpu/driver_cpu.c

@@ -131,7 +131,7 @@ void *_starpu_cpu_worker(void *arg)
 	int res;
 
 	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-	struct starpu_jobq_s *queue = policy->starpu_get_local_queue(policy);
+	struct starpu_jobq_s *queue = policy->get_local_queue(policy);
 
 	while (_starpu_machine_is_running())
 	{

+ 1 - 1
src/drivers/cuda/driver_cuda.c

@@ -218,7 +218,7 @@ void *_starpu_cuda_worker(void *arg)
 	int res;
 
 	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-	struct starpu_jobq_s *queue = policy->starpu_get_local_queue(policy);
+	struct starpu_jobq_s *queue = policy->get_local_queue(policy);
 	
 	while (_starpu_machine_is_running())
 	{

+ 1 - 1
src/drivers/opencl/driver_opencl.c

@@ -320,7 +320,7 @@ void *_starpu_opencl_worker(void *arg)
 	int res;
 
 	struct starpu_sched_policy_s *policy = _starpu_get_sched_policy();
-	struct starpu_jobq_s *queue = policy->starpu_get_local_queue(policy);
+	struct starpu_jobq_s *queue = policy->get_local_queue(policy);
 
 	while (_starpu_machine_is_running())
 	{