|
@@ -86,6 +86,8 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
|
|
|
fifo_queue->ntasks--;
|
|
|
|
|
|
task = starpu_task_list_back(&fifo_queue->taskq);
|
|
|
+ if (STARPU_UNLIKELY(!task))
|
|
|
+ return NULL;
|
|
|
|
|
|
int first_task_priority = task->priority;
|
|
|
|
|
@@ -659,6 +661,59 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
|
|
|
_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
|
|
|
}
|
|
|
|
|
|
+static void dmda_push_task_notify(struct starpu_task *task, int workerid)
|
|
|
+{
|
|
|
+ struct _starpu_fifo_taskq *fifo = queue_array[workerid];
|
|
|
+
|
|
|
+ /* Compute the expected penality */
|
|
|
+ enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
|
|
|
+ unsigned memory_node = starpu_worker_get_memory_node(workerid);
|
|
|
+
|
|
|
+ double predicted = starpu_task_expected_length(task, perf_arch,
|
|
|
+ _starpu_get_job_associated_to_task(task)->nimpl);
|
|
|
+
|
|
|
+ double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
|
+
|
|
|
+ /* Update the predictions */
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
|
|
|
+
|
|
|
+ /* Sometimes workers didn't take the tasks as early as we expected */
|
|
|
+ fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
|
+ fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
|
+
|
|
|
+ /* If there is no prediction available, we consider the task has a null length */
|
|
|
+ if (!isnan(predicted))
|
|
|
+ {
|
|
|
+ task->predicted = predicted;
|
|
|
+ fifo->exp_end += predicted;
|
|
|
+ fifo->exp_len += predicted;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* If there is no prediction available, we consider the task has a null length */
|
|
|
+ if (!isnan(predicted_transfer))
|
|
|
+ {
|
|
|
+ if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
|
|
|
+ {
|
|
|
+ /* We may hope that the transfer will be finished by
|
|
|
+ * the start of the task. */
|
|
|
+ predicted_transfer = 0;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* The transfer will not be finished by then, take the
|
|
|
+ * remainder into account */
|
|
|
+ predicted_transfer = (starpu_timing_now() + predicted_transfer) - fifo->exp_end;
|
|
|
+ }
|
|
|
+ task->predicted_transfer = predicted_transfer;
|
|
|
+ fifo->exp_end += predicted_transfer;
|
|
|
+ fifo->exp_len += predicted_transfer;
|
|
|
+ }
|
|
|
+
|
|
|
+ fifo->ntasks++;
|
|
|
+
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
|
|
|
+}
|
|
|
+
|
|
|
/* TODO: use post_exec_hook to fix the expected start */
|
|
|
struct starpu_sched_policy _starpu_sched_dm_policy =
|
|
|
{
|
|
@@ -678,6 +733,7 @@ struct starpu_sched_policy _starpu_sched_dmda_policy =
|
|
|
.init_sched = initialize_dmda_policy,
|
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
|
.push_task = dmda_push_task,
|
|
|
+ .push_task_notify = dmda_push_task_notify,
|
|
|
.pop_task = dmda_pop_task,
|
|
|
.pre_exec_hook = dmda_pre_exec_hook,
|
|
|
.post_exec_hook = NULL,
|
|
@@ -691,6 +747,7 @@ struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
|
|
|
.init_sched = initialize_dmda_sorted_policy,
|
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
|
.push_task = dmda_push_sorted_task,
|
|
|
+ .push_task_notify = dmda_push_task_notify,
|
|
|
.pop_task = dmda_pop_ready_task,
|
|
|
.pre_exec_hook = dmda_pre_exec_hook,
|
|
|
.post_exec_hook = NULL,
|
|
@@ -704,6 +761,7 @@ struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
|
|
|
.init_sched = initialize_dmda_policy,
|
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
|
.push_task = dmda_push_task,
|
|
|
+ .push_task_notify = dmda_push_task_notify,
|
|
|
.pop_task = dmda_pop_ready_task,
|
|
|
.pre_exec_hook = dmda_pre_exec_hook,
|
|
|
.post_exec_hook = NULL,
|