|
@@ -30,11 +30,13 @@ static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
|
|
|
* P--N--N
|
|
|
* | | |
|
|
|
* W W W
|
|
|
-
|
|
|
-
|
|
|
+ *
|
|
|
+ *
|
|
|
* this API is a little asymmetric : struct _starpu_task_grid are allocated by the caller and freed by the data structure
|
|
|
+ *
|
|
|
*/
|
|
|
|
|
|
+
|
|
|
struct _starpu_task_grid
|
|
|
{
|
|
|
/* this member may be NULL if a worker have poped it but its a
|
|
@@ -57,6 +59,11 @@ struct _starpu_task_grid
|
|
|
};
|
|
|
};
|
|
|
|
|
|
+
|
|
|
+/* list->exp_start, list->exp_len, list-exp_end and list->ntasks
|
|
|
+ * are updated by _starpu_sched_node_worker_push_task(node, task) and pre_exec_hook
|
|
|
+ */
|
|
|
+
|
|
|
struct _starpu_worker_task_list
|
|
|
{
|
|
|
double exp_start, exp_len, exp_end;
|
|
@@ -112,15 +119,44 @@ static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list
|
|
|
t->up = NULL;
|
|
|
l->last = t;
|
|
|
l->ntasks++;
|
|
|
- l->exp_start = starpu_timing_now();
|
|
|
- if(!isnan(t->task->predicted))
|
|
|
+
|
|
|
+ double predicted = t->task->predicted;
|
|
|
+ double predicted_transfer = t->task->predicted_transfer;
|
|
|
+
|
|
|
+ /* Sometimes workers didn't take the tasks as early as we expected */
|
|
|
+ l->exp_start = STARPU_MAX(l->exp_start, starpu_timing_now());
|
|
|
+ l->exp_end = l->exp_start + l->exp_len;
|
|
|
+
|
|
|
+ if ((starpu_timing_now() + predicted_transfer) < l->exp_end)
|
|
|
+ {
|
|
|
+ /* We may hope that the transfer will be finished by
|
|
|
+ * the start of the task. */
|
|
|
+ predicted_transfer = 0.0;
|
|
|
+ }
|
|
|
+ else
|
|
|
{
|
|
|
- l->exp_len += t->task->predicted;
|
|
|
- l->exp_end = l->exp_start + l->exp_end;
|
|
|
+ /* The transfer will not be finished by then, take the
|
|
|
+ * remainder into account */
|
|
|
+ predicted_transfer = (starpu_timing_now() + predicted_transfer) - l->exp_end;
|
|
|
}
|
|
|
+
|
|
|
+ if(!isnan(predicted_transfer))
|
|
|
+ {
|
|
|
+ l->exp_end += predicted_transfer;
|
|
|
+ l->exp_len += predicted_transfer;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(!isnan(predicted))
|
|
|
+ {
|
|
|
+ l->exp_end += predicted;
|
|
|
+ l->exp_len += predicted;
|
|
|
+ }
|
|
|
+
|
|
|
+ t->task->predicted = predicted;
|
|
|
+ t->task->predicted_transfer = predicted_transfer;
|
|
|
}
|
|
|
|
|
|
-//recursively set left and right pointers to NULL
|
|
|
+/* recursively set left and right pointers to NULL */
|
|
|
static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
|
|
|
{
|
|
|
STARPU_ASSERT(t->task == NULL);
|
|
@@ -182,6 +218,7 @@ static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_w
|
|
|
l->exp_len -= task->predicted;
|
|
|
l->exp_end = l->exp_start + l->exp_len;
|
|
|
}
|
|
|
+
|
|
|
return task;
|
|
|
}
|
|
|
t = t->up;
|
|
@@ -203,10 +240,15 @@ struct _starpu_sched_node * _starpu_sched_node_worker_get(int workerid)
|
|
|
if(_worker_nodes[workerid])
|
|
|
return _worker_nodes[workerid];
|
|
|
else
|
|
|
- return _worker_nodes[workerid] =
|
|
|
- (workerid < (int) starpu_worker_get_count() ?
|
|
|
- _starpu_sched_node_worker_create:
|
|
|
- _starpu_sched_node_combined_worker_create)(workerid);
|
|
|
+ {
|
|
|
+ struct _starpu_sched_node * node;
|
|
|
+ if(workerid < (int) starpu_worker_get_count())
|
|
|
+ node = _starpu_sched_node_worker_create(workerid);
|
|
|
+ else
|
|
|
+ node = _starpu_sched_node_combined_worker_create(workerid);
|
|
|
+ _worker_nodes[workerid] = node;
|
|
|
+ return node;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
struct _starpu_worker * _starpu_sched_node_worker_get_worker(struct _starpu_sched_node * worker_node)
|
|
@@ -239,6 +281,13 @@ int _starpu_sched_node_worker_push_task(struct _starpu_sched_node * node, struct
|
|
|
struct _starpu_task_grid * t = _starpu_task_grid_create();
|
|
|
t->task = task;
|
|
|
t->ntasks = 1;
|
|
|
+
|
|
|
+ task->workerid = _starpu_bitmap_first(node->workers);
|
|
|
+ if (starpu_get_prefetch_flag())
|
|
|
+ {
|
|
|
+ unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
|
|
|
+ starpu_prefetch_task_input_on_node(task, memory_node);
|
|
|
+ }
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
|
|
|
_starpu_worker_task_list_push(data->list, t);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
|
|
@@ -453,7 +502,7 @@ static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_
|
|
|
if(bundle)
|
|
|
preds.expected_power = starpu_task_bundle_expected_power(bundle, preds.archtype, preds.impl);
|
|
|
else
|
|
|
- preds.expected_power = starpu_task_expected_power(task, preds.archtype,preds.impl);
|
|
|
+ preds.expected_power = starpu_task_expected_power(task, preds.archtype, preds.impl);
|
|
|
}
|
|
|
|
|
|
return preds;
|
|
@@ -530,6 +579,7 @@ static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_nod
|
|
|
starpu_parallel_task_barrier_init(task, _starpu_bitmap_first(node->workers));
|
|
|
task_alias[0] = _starpu_task_grid_create();
|
|
|
task_alias[0]->task = starpu_task_dup(task);
|
|
|
+ task_alias[0]->task->workerid = combined_worker->combined_workerid[0];
|
|
|
task_alias[0]->left = NULL;
|
|
|
task_alias[0]->ntasks = combined_worker->worker_size;
|
|
|
int i;
|
|
@@ -537,6 +587,7 @@ static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_nod
|
|
|
{
|
|
|
task_alias[i] = _starpu_task_grid_create();
|
|
|
task_alias[i]->task = starpu_task_dup(task);
|
|
|
+ task_alias[i]->task->workerid = combined_worker->combined_workerid[i];
|
|
|
task_alias[i]->left = task_alias[i-1];
|
|
|
task_alias[i - 1]->right = task_alias[i];
|
|
|
task_alias[i]->pntasks = &task_alias[0]->ntasks;
|
|
@@ -558,6 +609,7 @@ static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_nod
|
|
|
i++;
|
|
|
}
|
|
|
while(i < combined_worker->worker_size);
|
|
|
+
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
|
|
|
|
|
|
int workerid = starpu_worker_get_id();
|
|
@@ -719,16 +771,91 @@ static struct _starpu_worker_task_list * _worker_get_list(void)
|
|
|
|
|
|
void _starpu_sched_node_worker_pre_exec_hook(struct starpu_task * task)
|
|
|
{
|
|
|
-/* if(!isnan(task->predicted))
|
|
|
+ if(!isnan(task->predicted))
|
|
|
{
|
|
|
struct _starpu_worker_task_list * list = _worker_get_list();
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
|
|
|
+ STARPU_ASSERT(list->ntasks != 0);
|
|
|
+ list->ntasks--;
|
|
|
+ if(!task->execute_on_a_specific_worker)
|
|
|
+ list->exp_len = STARPU_MIN(list->exp_len - task->predicted, 0.0);
|
|
|
+
|
|
|
+ list->exp_start = starpu_timing_now() + task->predicted;
|
|
|
+ if(list->ntasks == 0)
|
|
|
+ {
|
|
|
+ list->exp_end = list->exp_start;
|
|
|
+ list->exp_end = 0.0;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ list->exp_end = list->exp_start + list->exp_len;
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
|
|
|
-
|
|
|
}
|
|
|
-*/
|
|
|
}
|
|
|
void _starpu_sched_node_worker_post_exec_hook(struct starpu_task * task)
|
|
|
{
|
|
|
+ if(task->execute_on_a_specific_worker)
|
|
|
+ return;
|
|
|
+ struct _starpu_worker_task_list * list = _worker_get_list();
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
|
|
|
+ list->exp_start = starpu_timing_now();
|
|
|
+ list->exp_end = list->exp_start + list->exp_len;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
|
|
|
+}
|
|
|
|
|
|
+static void _starpu_sched_node_worker_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
|
|
|
+{
|
|
|
+
|
|
|
+ struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(workerid);
|
|
|
+ /* dont work with parallel tasks */
|
|
|
+ if(_starpu_sched_node_is_combined_worker(worker_node))
|
|
|
+ return;
|
|
|
+
|
|
|
+ struct _starpu_worker_node_data * d = worker_node->data;
|
|
|
+ struct _starpu_worker_task_list * list = d->list;
|
|
|
+ /* Compute the expected penality */
|
|
|
+ enum starpu_perfmodel_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
|
|
|
+ unsigned memory_node = starpu_worker_get_memory_node(workerid);
|
|
|
+
|
|
|
+ double predicted = starpu_task_expected_length(task, perf_arch,
|
|
|
+ starpu_task_get_implementation(task));
|
|
|
+
|
|
|
+ double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
|
+
|
|
|
+ /* Update the predictions */
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
|
|
|
+ /* Sometimes workers didn't take the tasks as early as we expected */
|
|
|
+ list->exp_start = STARPU_MAX(list->exp_start, starpu_timing_now());
|
|
|
+ list->exp_end = list->exp_start + list->exp_len;
|
|
|
+
|
|
|
+ /* If there is no prediction available, we consider the task has a null length */
|
|
|
+ if (!isnan(predicted_transfer))
|
|
|
+ {
|
|
|
+ if (starpu_timing_now() + predicted_transfer < list->exp_end)
|
|
|
+ {
|
|
|
+ /* We may hope that the transfer will be finished by
|
|
|
+ * the start of the task. */
|
|
|
+ predicted_transfer = 0;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* The transfer will not be finished by then, take the
|
|
|
+ * remainder into account */
|
|
|
+ predicted_transfer = (starpu_timing_now() + predicted_transfer) - list->exp_end;
|
|
|
+ }
|
|
|
+ task->predicted_transfer = predicted_transfer;
|
|
|
+ list->exp_end += predicted_transfer;
|
|
|
+ list->exp_len += predicted_transfer;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* If there is no prediction available, we consider the task has a null length */
|
|
|
+ if (!isnan(predicted))
|
|
|
+ {
|
|
|
+ task->predicted = predicted;
|
|
|
+ list->exp_end += predicted;
|
|
|
+ list->exp_len += predicted;
|
|
|
+ }
|
|
|
+
|
|
|
+ list->ntasks++;
|
|
|
+
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
|
|
|
}
|