|
@@ -38,7 +38,8 @@
|
|
|
struct _starpu_graph_test_policy_data
|
|
|
{
|
|
|
struct _starpu_fifo_taskq *fifo; /* Bag of tasks which are ready before do_schedule is called */
|
|
|
- struct _starpu_prio_deque prio;
|
|
|
+ struct _starpu_prio_deque prio_cpu;
|
|
|
+ struct _starpu_prio_deque prio_gpu;
|
|
|
starpu_pthread_mutex_t policy_mutex;
|
|
|
struct starpu_bitmap *waiters;
|
|
|
unsigned computed;
|
|
@@ -50,7 +51,8 @@ static void initialize_graph_test_policy(unsigned sched_ctx_id)
|
|
|
|
|
|
/* there is only a single queue in that trivial design */
|
|
|
data->fifo = _starpu_create_fifo();
|
|
|
- _starpu_prio_deque_init(&data->prio );
|
|
|
+ _starpu_prio_deque_init(&data->prio_cpu);
|
|
|
+ _starpu_prio_deque_init(&data->prio_gpu);
|
|
|
data->waiters = starpu_bitmap_create();
|
|
|
data->computed = 0;
|
|
|
|
|
@@ -80,6 +82,64 @@ static void deinitialize_graph_test_policy(unsigned sched_ctx_id)
|
|
|
free(data);
|
|
|
}
|
|
|
|
|
|
+/* Push the given task on CPU or GPU prio list, using a dumb heuristic */
|
|
|
+static void push_task_to_prio(unsigned sched_ctx_id, struct _starpu_graph_test_policy_data *data, struct starpu_task *task)
|
|
|
+{
|
|
|
+ int cpu_can = 0, gpu_can = 0;
|
|
|
+ double cpu_speed = 0.;
|
|
|
+ double gpu_speed = 0.;
|
|
|
+
|
|
|
+ /* Compute how fast CPUs can compute it, and how fast GPUs can compute it */
|
|
|
+ unsigned worker;
|
|
|
+ struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
|
|
|
+ struct starpu_sched_ctx_iterator it;
|
|
|
+ workers->init_iterator(workers, &it);
|
|
|
+ while(workers->has_next(workers, &it))
|
|
|
+ {
|
|
|
+ worker = workers->get_next(workers, &it);
|
|
|
+ if (!starpu_worker_can_execute_task(worker, task, 0))
|
|
|
+ /* This worker can not execute this task, don't count it */
|
|
|
+ continue;
|
|
|
+
|
|
|
+ if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
|
|
|
+ /* At least one CPU can run it */
|
|
|
+ cpu_can = 1;
|
|
|
+ else
|
|
|
+ /* At least one GPU can run it */
|
|
|
+ gpu_can = 1;
|
|
|
+
|
|
|
+ /* Get expected task duration for this worker */
|
|
|
+ struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
|
|
|
+ double length = starpu_task_expected_length(task, perf_arch, 0);
|
|
|
+ double power;
|
|
|
+
|
|
|
+ if (isnan(length))
|
|
|
+ /* We don't have an estimation yet */
|
|
|
+ length = 0.;
|
|
|
+ if (length == 0.)
|
|
|
+ {
|
|
|
+ _STARPU_DISP("Warning: graph_test needs performance models for all tasks, including %s\n",
|
|
|
+ _starpu_job_get_task_name(_starpu_get_job_associated_to_task(task)));
|
|
|
+ power = 0.;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ power = 1./length;
|
|
|
+
|
|
|
+ /* Add the computation power to the CPU or GPU pool */
|
|
|
+ if (starpu_worker_get_type(worker) == STARPU_CPU_WORKER)
|
|
|
+ cpu_speed += power;
|
|
|
+ else
|
|
|
+ gpu_speed += power;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Decide to push on CPUs or GPUs depending on the overall computation power */
|
|
|
+ if (!gpu_can || (cpu_can && cpu_speed > gpu_speed))
|
|
|
+ _starpu_prio_deque_push_back_task(&data->prio_cpu, task);
|
|
|
+ else
|
|
|
+ _starpu_prio_deque_push_back_task(&data->prio_gpu, task);
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
static void set_priority(void *_data STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *job)
|
|
|
{
|
|
|
job->task->priority = job->depth;
|
|
@@ -96,8 +156,7 @@ static void do_schedule_graph_test_policy(unsigned sched_ctx_id)
|
|
|
|
|
|
/* Now that we have priorities, move tasks from bag to priority queue */
|
|
|
while(!_starpu_fifo_empty(data->fifo)) {
|
|
|
- struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, -1);
|
|
|
- _starpu_prio_deque_push_back_task(&data->prio, task);
|
|
|
+ push_task_to_prio(sched_ctx_id, data, _starpu_fifo_pop_task(data->fifo, -1));
|
|
|
}
|
|
|
|
|
|
/* And unleash the beast! */
|
|
@@ -144,7 +203,7 @@ static int push_task_graph_test_policy(struct starpu_task *task)
|
|
|
}
|
|
|
|
|
|
/* Priorities are computed, we can push to execution */
|
|
|
- _starpu_prio_deque_push_back_task(&data->prio,task);
|
|
|
+ push_task_to_prio(sched_ctx_id, data, task);
|
|
|
|
|
|
starpu_push_task_end(task);
|
|
|
|
|
@@ -205,12 +264,18 @@ static struct starpu_task *pop_task_graph_test_policy(unsigned sched_ctx_id)
|
|
|
struct starpu_task *chosen_task = NULL;
|
|
|
unsigned workerid = starpu_worker_get_id();
|
|
|
struct _starpu_graph_test_policy_data *data = (struct _starpu_graph_test_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
+ struct _starpu_prio_deque *prio;
|
|
|
+
|
|
|
+ if (starpu_worker_get_type(workerid) == STARPU_CPU_WORKER)
|
|
|
+ prio = &data->prio_cpu;
|
|
|
+ else
|
|
|
+ prio = &data->prio_gpu;
|
|
|
|
|
|
/* block until some event happens */
|
|
|
/* Here helgrind would shout that this is unprotected, this is just an
|
|
|
* integer access, and we hold the sched mutex, so we can not miss any
|
|
|
* wake up. */
|
|
|
- if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(&data->prio))
|
|
|
+ if (!STARPU_RUNNING_ON_VALGRIND && _starpu_prio_deque_is_empty(prio))
|
|
|
return NULL;
|
|
|
|
|
|
#ifdef STARPU_NON_BLOCKING_DRIVERS
|
|
@@ -229,7 +294,7 @@ static struct starpu_task *pop_task_graph_test_policy(unsigned sched_ctx_id)
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
- chosen_task = _starpu_prio_deque_pop_task_for_worker(&data->prio, workerid);
|
|
|
+ chosen_task = _starpu_prio_deque_pop_task_for_worker(prio, workerid);
|
|
|
if (!chosen_task)
|
|
|
/* Tell pushers that we are waiting for tasks for us */
|
|
|
starpu_bitmap_set(data->waiters, workerid);
|