|
@@ -88,7 +88,7 @@ static void parallel_heft_pre_exec_hook(struct starpu_task *task, unsigned sched
|
|
|
if (isnan(transfer_model))
|
|
|
transfer_model = 0.0;
|
|
|
|
|
|
- /* Once we have executed the task, we can update the predicted amount
|
|
|
+ /* Once we have started the task, we can update the predicted amount
|
|
|
* of work. */
|
|
|
starpu_worker_lock_self();
|
|
|
worker_exp_len[workerid] -= model + transfer_model;
|
|
@@ -98,7 +98,7 @@ static void parallel_heft_pre_exec_hook(struct starpu_task *task, unsigned sched
|
|
|
starpu_worker_unlock_self();
|
|
|
}
|
|
|
|
|
|
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio, unsigned sched_ctx_id)
|
|
|
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_start_predicted, double exp_end_predicted, int prio, unsigned sched_ctx_id)
|
|
|
{
|
|
|
/* make sure someone coule execute that task ! */
|
|
|
STARPU_ASSERT(best_workerid != -1);
|
|
@@ -113,7 +113,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
|
if (!starpu_worker_is_combined_worker(best_workerid))
|
|
|
{
|
|
|
starpu_worker_lock(best_workerid);
|
|
|
- task->predicted = exp_end_predicted - worker_exp_end[best_workerid];
|
|
|
+ task->predicted = exp_end_predicted - exp_start_predicted;
|
|
|
/* TODO */
|
|
|
task->predicted_transfer = 0;
|
|
|
worker_exp_len[best_workerid] += task->predicted;
|
|
@@ -181,17 +181,14 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
-static double compute_expected_end(int workerid, double length)
|
|
|
+static double compute_expected_end(double *_worker_exp_end, int workerid, double length)
|
|
|
{
|
|
|
if (!starpu_worker_is_combined_worker(workerid))
|
|
|
{
|
|
|
double res;
|
|
|
/* This is a basic worker */
|
|
|
|
|
|
- /* Here helgrind would shout that this is unprotected, but we
|
|
|
- * are fine with getting outdated values, this is just an
|
|
|
- * estimation */
|
|
|
- res = worker_exp_start[workerid] + worker_exp_len[workerid] + length;
|
|
|
+ res = _worker_exp_end[workerid] + length;
|
|
|
|
|
|
return res;
|
|
|
}
|
|
@@ -204,15 +201,10 @@ static double compute_expected_end(int workerid, double length)
|
|
|
|
|
|
double exp_end = DBL_MIN;
|
|
|
|
|
|
- /* Here helgrind would shout that this is unprotected, but we
|
|
|
- * are fine with getting outdated values, this is just an
|
|
|
- * estimation */
|
|
|
int i;
|
|
|
for (i = 0; i < worker_size; i++)
|
|
|
{
|
|
|
- double local_exp_start = worker_exp_start[combined_workerid[i]];
|
|
|
- double local_exp_len = worker_exp_len[combined_workerid[i]];
|
|
|
- double local_exp_end = local_exp_start + local_exp_len + length;
|
|
|
+ double local_exp_end = _worker_exp_end[combined_workerid[i]] + length;
|
|
|
exp_end = STARPU_MAX(exp_end, local_exp_end);
|
|
|
}
|
|
|
|
|
@@ -283,6 +275,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
|
|
|
|
|
|
int skip_worker[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
|
|
+ double best_exp_start;
|
|
|
double best_exp_end = DBL_MAX;
|
|
|
//double penality_best = 0.0;
|
|
|
|
|
@@ -295,6 +288,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
|
|
|
struct starpu_sched_ctx_iterator it;
|
|
|
|
|
|
double now = starpu_timing_now();
|
|
|
+ double _worker_exp_end[nworkers_ctx];
|
|
|
|
|
|
memset(skip_worker, 0, nworkers_ctx*STARPU_MAXIMPLEMENTATIONS*sizeof(int));
|
|
|
|
|
@@ -305,13 +299,14 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
|
|
|
|
|
|
if(!starpu_worker_is_combined_worker(workerid))
|
|
|
{
|
|
|
+ /* Here helgrind would shout that these reads are unprotected, but
|
|
|
+ * we are fine with getting outdated values: this is only an
|
|
|
+ * estimation. */
|
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
|
- starpu_worker_lock(workerid);
|
|
|
- worker_exp_start[workerid] = STARPU_MAX(worker_exp_start[workerid], now);
|
|
|
- worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
|
|
|
- if (worker_exp_end[workerid] > max_exp_end)
|
|
|
- max_exp_end = worker_exp_end[workerid];
|
|
|
- starpu_worker_unlock(workerid);
|
|
|
+ double exp_start = STARPU_MAX(worker_exp_start[workerid], now);
|
|
|
+ _worker_exp_end[workerid] = exp_start + worker_exp_len[workerid];
|
|
|
+ if (_worker_exp_end[workerid] > max_exp_end)
|
|
|
+ max_exp_end = _worker_exp_end[workerid];
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -378,7 +373,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
|
|
|
if (unknown)
|
|
|
continue;
|
|
|
|
|
|
- local_exp_end[worker_ctx][nimpl] = compute_expected_end(workerid, local_task_length[worker_ctx][nimpl]);
|
|
|
+ local_exp_end[worker_ctx][nimpl] = compute_expected_end(_worker_exp_end, workerid, local_task_length[worker_ctx][nimpl]);
|
|
|
|
|
|
//fprintf(stderr, "WORKER %d -> length %e end %e\n", workerid, local_task_length[worker_ctx][nimpl], local_exp_end[workerid][nimpl]);
|
|
|
|
|
@@ -460,19 +455,20 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
|
|
|
best_id_ctx = forced_best_ctx;
|
|
|
nimpl_best = forced_nimpl;
|
|
|
//penality_best = 0.0;
|
|
|
- best_exp_end = compute_expected_end(best, 0);
|
|
|
+ best_exp_end = compute_expected_end(_worker_exp_end, best, 0);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
//penality_best = local_data_penalty[best_id_ctx][nimpl_best];
|
|
|
best_exp_end = local_exp_end[best_id_ctx][nimpl_best];
|
|
|
}
|
|
|
+ best_exp_start = _worker_exp_end[best];
|
|
|
|
|
|
//_STARPU_DEBUG("Scheduler parallel heft: kernel (%u)\n", nimpl_best);
|
|
|
starpu_task_set_implementation(task, nimpl_best);
|
|
|
/* we should now have the best workerid in variable "best" */
|
|
|
starpu_sched_task_break(task);
|
|
|
- return push_task_on_best_worker(task, best, best_exp_end, prio, sched_ctx_id);
|
|
|
+ return push_task_on_best_worker(task, best, best_exp_start, best_exp_end, prio, sched_ctx_id);
|
|
|
}
|
|
|
|
|
|
static int parallel_heft_push_task(struct starpu_task *task)
|