@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2011 Université de Bordeaux 1
- * Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
+ * Copyright (C) 2010-2012 Université de Bordeaux 1
+ * Copyright (C) 2010, 2011, 2012 Centre National de la Recherche Scientifique
  * Copyright (C) 2011 Télécom-SudParis
  * Copyright (C) 2011 INRIA
  *
@@ -23,10 +23,25 @@
 
 #include <core/workers.h>
 #include <core/perfmodel/perfmodel.h>
+#include <core/task_bundle.h>
 #include <starpu_parameters.h>
 #include <starpu_task_bundle.h>
 #include <starpu_top.h>
 
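+/* DBL_MIN/DBL_MAX normally come from float.h; if they are missing we fall
+ * back to the GCC-style __DBL_MIN__/__DBL_MAX__ builtins. */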
+#ifndef DBL_MIN
+#define DBL_MIN __DBL_MIN__
+#endif
+
+#ifndef DBL_MAX
+#define DBL_MAX __DBL_MAX__
+#endif
+
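+/* Per-worker queue state, indexed by worker id; these are only read and
+ * written while holding the corresponding worker's sched_mutex. */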
+static double exp_start[STARPU_NMAXWORKERS]; /* of the first queued task */
+static double exp_end[STARPU_NMAXWORKERS]; /* of the last queued task */
+static double exp_len[STARPU_NMAXWORKERS]; /* of the set of queued tasks */
+static double ntasks[STARPU_NMAXWORKERS];
+
 typedef struct {
 	double alpha;
 	double beta;
@@ -34,12 +49,6 @@ typedef struct {
 	double idle_power;
 } heft_data;
 
-static double exp_start[STARPU_NMAXWORKERS]; /* of the first queued task */
-static double exp_end[STARPU_NMAXWORKERS]; /* of the set of queued tasks */
-static double exp_len[STARPU_NMAXWORKERS]; /* of the last queued task */
-static double ntasks[STARPU_NMAXWORKERS];
-
-
 const float alpha_minimum=0;
 const float alpha_maximum=10.0;
 const float beta_minimum=0;
@@ -49,7 +58,8 @@ const float gamma_maximum=10000.0;
 const float idle_power_minimum=0;
 const float idle_power_maximum=10000.0;
 
-void param_modified(struct starputop_param_t* d){
+static void param_modified(struct starpu_top_param* d)
+{
 	//just to show parameter modification
 	fprintf(stderr,"%s has been modified : %f !\n", d->name, d->value);
 }
@@ -125,13 +135,16 @@ static void heft_init(unsigned sched_ctx_id)
 	starputop_register_parameter_float("HEFT_IDLE_POWER", &hd->idle_power, idle_power_minimum,idle_power_maximum,param_modified);
 }
 
-static void heft_post_exec_hook(struct starpu_task *task)
+
+/* heft_pre_exec_hook is called right after the data transfer is done and
+ * right before the computation begins; this is the place to refine the
+ * expected start, end, length, etc. of the worker's queue. */
+static void heft_pre_exec_hook(struct starpu_task *task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	int workerid = starpu_worker_get_id();
-	STARPU_ASSERT(workerid >= 0);
-
 	double model = task->predicted;
+	double transfer_model = task->predicted_transfer;
 
 	pthread_mutex_t *sched_mutex;
 	pthread_cond_t *sched_cond;
@@ -144,18 +157,21 @@ static void heft_post_exec_hook(struct starpu_task *task)
 		sched_cond = &workerarg->sched_cond;
 		starpu_worker_set_sched_condition(sched_ctx_id, workerid, sched_mutex, sched_cond);
 	}
-#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
-	starpu_call_poped_task_cb(workerid, sched_ctx_id, task->flops);
-#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
-
-	/* Once we have executed the task, we can update the predicted amount
+	/* Once the task is executing, we can update the predicted amount
 	 * of work. */
-	PTHREAD_MUTEX_LOCK(sched_mutex);
-	exp_len[workerid] -= model;
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
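+	/* The task is now running: its length and transfer no longer count in
+	 * the queue length, and the next queued task can start once this
+	 * computation (expected to last "model") is over. */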
+	exp_len[workerid] -= model + transfer_model;
 	exp_start[workerid] = starpu_timing_now() + model;
 	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
 	ntasks[workerid]--;
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+}
+
+static void heft_post_exec_hook(struct starpu_task *task)
+{
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
+	unsigned sched_ctx_id = task->sched_ctx;
+	int workerid = starpu_worker_get_id();
+
+	starpu_call_poped_task_cb(workerid, sched_ctx_id, task->flops);
+#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 }
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid)
@@ -163,10 +179,12 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 	unsigned sched_ctx_id = task->sched_ctx;
 	/* Compute the expected penalty */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
+	unsigned memory_node = starpu_worker_get_memory_node(workerid);
 
 	double predicted = starpu_task_expected_length(task, perf_arch,
 			_starpu_get_job_associated_to_task(task)->nimpl);
 
+	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
 	pthread_mutex_t *sched_mutex;
 	pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
@@ -184,25 +202,45 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 
 	/* Update the predictions */
-	PTHREAD_MUTEX_LOCK(sched_mutex);
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
-	exp_end[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
+	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
 
 	/* If there is no prediction available, we consider the task has a null length */
-	if (predicted != -1.0)
+	if (!isnan(predicted))
 	{
 		task->predicted = predicted;
 		exp_end[workerid] += predicted;
 		exp_len[workerid] += predicted;
 	}
 
+	/* If there is no prediction available, we consider the transfer has a null length */
+	if (!isnan(predicted_transfer))
+	{
+		if (starpu_timing_now() + predicted_transfer < exp_end[workerid])
+		{
+			/* We may hope that the transfer will be finished by
+			 * the start of the task. */
+			predicted_transfer = 0;
+		}
+		else
+		{
+			/* The transfer will not be finished by then, take the
+			 * remainder into account */
+			predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[workerid];
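+			/* e.g. with now = 1000us, predicted_transfer = 50us and
+			 * exp_end = 1030us, only the last 20us of the transfer
+			 * spill past the already-queued work. */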
+		}
+		task->predicted_transfer = predicted_transfer;
+		exp_end[workerid] += predicted_transfer;
+		exp_len[workerid] += predicted_transfer;
+	}
+
 	ntasks[workerid]++;
 
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, unsigned sched_ctx_id)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, double predicted_transfer, int prio, unsigned sched_ctx_id)
 {
 	/* make sure someone could execute that task! */
 	STARPU_ASSERT(best_workerid != -1);
@@ -225,16 +263,38 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 
-	PTHREAD_MUTEX_LOCK(sched_mutex);
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+
+	/* Sometimes workers didn't take the tasks as early as we expected */
+	exp_start[best_workerid] = STARPU_MAX(exp_start[best_workerid], starpu_timing_now());
+	exp_end[best_workerid] = exp_start[best_workerid] + exp_len[best_workerid];
+
 	exp_end[best_workerid] += predicted;
 	exp_len[best_workerid] += predicted;
+
+	if (starpu_timing_now() + predicted_transfer < exp_end[best_workerid])
+	{
+		/* We may hope that the transfer will be finished by
+		 * the start of the task. */
+		predicted_transfer = 0;
+	}
+	else
+	{
+		/* The transfer will not be finished by then, take the
+		 * remainder into account */
+		predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[best_workerid];
+	}
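+	/* Note: this duplicates the transfer-remainder computation of
+	 * heft_push_task_notify() above; the two are kept in sync by hand. */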
+	exp_end[best_workerid] += predicted_transfer;
+	exp_len[best_workerid] += predicted_transfer;
+
 	ntasks[best_workerid]++;
-	PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 
 	task->predicted = predicted;
+	task->predicted_transfer = predicted_transfer;
 
-	if (starpu_top_status_get())
-		starputop_task_prevision(task, best_workerid,
+	if (_starpu_top_status_get())
+		_starpu_top_task_prevision(task, best_workerid,
 			(unsigned long long)(exp_end[best_workerid]-predicted)/1000,
 			(unsigned long long)exp_end[best_workerid]/1000);
 
@@ -244,29 +304,32 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_prefetch_task_input_on_node(task, memory_node);
 	}
 
+
+	//_STARPU_DEBUG("Heft : pushing local task\n");
 	return starpu_push_local_task(best_workerid, task, prio);
 }
 
+/* TODO: factorize with dmda!! */
 static void compute_all_performance_predictions(struct starpu_task *task,
-					double *local_task_length, double *exp_end,
-					double *max_exp_endp, double *best_exp_endp,
-					double *local_data_penalty,
-					double *local_power, int *forced_best,
-					struct starpu_task_bundle *bundle,
+					double local_task_length[][STARPU_MAXIMPLEMENTATIONS],
+					double exp_end[][STARPU_MAXIMPLEMENTATIONS],
+					double *max_exp_endp, double *best_exp_endp,
+					double local_data_penalty[][STARPU_MAXIMPLEMENTATIONS],
+					double local_power[][STARPU_MAXIMPLEMENTATIONS],
+					int *forced_worker, int *forced_impl,
+					starpu_task_bundle_t bundle,
 					unsigned sched_ctx_id)
 {
 	int calibrating = 0;
 	double max_exp_end = DBL_MIN;
 	double best_exp_end = DBL_MAX;
 	int ntasks_best = -1;
+	int nimpl_best = 0;
 	double ntasks_best_end = 0.0;
 
 	/* A priori, we know all estimations */
 	int unknown = 0;
-
-	unsigned nimpl;
-	unsigned best_impl = 0;
 	unsigned worker, worker_ctx = 0;
+	unsigned nimpl;
 
 	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
 
@@ -277,101 +340,142 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		{
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
-			exp_end[worker_ctx] = exp_start[worker] + exp_len[worker];
-			if (exp_end[worker_ctx] > max_exp_end)
-				max_exp_end = exp_end[worker_ctx];
+			exp_end[worker_ctx][nimpl] = exp_start[worker] + exp_len[worker];
+			if (exp_end[worker_ctx][nimpl] > max_exp_end)
+				max_exp_end = exp_end[worker_ctx][nimpl];
 
-			if (!starpu_worker_may_execute_task(worker, task, nimpl))
+			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
 				continue;
 			}
 
 			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 			unsigned memory_node = starpu_worker_get_memory_node(worker);
 
 			if (bundle)
 			{
-				local_task_length[worker_ctx] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
-				local_data_penalty[worker_ctx] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
-				local_power[worker_ctx] = starpu_task_bundle_expected_power(bundle, perf_arch, nimpl);
+				/* TODO: conversion time */
+				local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
+				local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
+				local_power[worker_ctx][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch, nimpl);
 				//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
 			}
 			else
 			{
-				local_task_length[worker_ctx] = starpu_task_expected_length(task, perf_arch, nimpl);
-				local_data_penalty[worker_ctx] = starpu_task_expected_data_transfer_time(memory_node, task);
-				local_power[worker_ctx] = starpu_task_expected_power(task, perf_arch, nimpl);
+				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
+				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+				local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);
+				double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
+				if (conversion_time > 0.0)
+					local_task_length[worker_ctx][nimpl] += conversion_time;
 				//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx],local_power[worker_ctx],worker,nimpl);
 			}
 
 			double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
 
 			if (ntasks_best == -1
-			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
-			    || (!calibrating && local_task_length[worker_ctx] == -1.0) /* Not calibrating but this worker is being calibrated */
-			    || (calibrating && local_task_length[worker_ctx] == -1.0 && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
-			   )
+			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
+			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
+			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
+			   )
 			{
 				ntasks_best_end = ntasks_end;
 				ntasks_best = worker;
+				nimpl_best = nimpl;
 			}
 
-			if (local_task_length[worker_ctx] == -1.0)
+			if (isnan(local_task_length[worker_ctx][nimpl]))
 				/* we are calibrating, we want to speed-up calibration time
 				 * so we privilege non-calibrated tasks (but still
 				 * greedily distribute them to avoid dumb schedules) */
 				calibrating = 1;
 
-			if (local_task_length[worker_ctx] <= 0.0)
+			if (isnan(local_task_length[worker_ctx][nimpl])
+					|| _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
 				/* there is no prediction available for that task
-				 * with that arch yet, so switch to a greedy strategy */
+				 * with that arch (yet or at all), so switch to a greedy strategy */
 				unknown = 1;
 
 			if (unknown)
 				continue;
 
-			exp_end[worker_ctx] = exp_start[worker] + exp_len[worker] + local_task_length[worker_ctx];
+			exp_end[worker_ctx][nimpl] = exp_start[worker] + exp_len[worker] + local_task_length[worker_ctx][nimpl];
 
-			if (exp_end[worker_ctx] < best_exp_end)
+			if (exp_end[worker_ctx][nimpl] < best_exp_end)
 			{
 				/* a better solution was found */
-				best_exp_end = exp_end[worker_ctx];
-				best_impl = nimpl;
+				best_exp_end = exp_end[worker_ctx][nimpl];
+				nimpl_best = nimpl;
 			}
 
-			if (local_power[worker_ctx] == -1.0)
-				local_power[worker_ctx] = 0.;
+			if (isnan(local_power[worker_ctx][nimpl]))
+				local_power[worker_ctx][nimpl] = 0.;
 		}
 		worker_ctx++;
 	}
 
-	*forced_best = unknown?ntasks_best:-1;
+	*forced_worker = unknown?ntasks_best:-1;
+	*forced_impl = unknown?nimpl_best:-1;
 
 	*best_exp_endp = best_exp_end;
 	*max_exp_endp = max_exp_end;
-
-	/* save the best implementation */
-	//_STARPU_DEBUG("Scheduler heft: kernel (%u)\n", best_impl);
-	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 }
 
+static int push_conversion_tasks(struct starpu_task *task, unsigned int workerid)
+{
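+	/* The task's multiformat data may currently be laid out for another
+	 * kind of worker (e.g. a CPU layout vs. a CUDA/OpenCL one): queue the
+	 * needed conversion tasks on the target worker before the task itself. */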
+	unsigned i;
+	int ret;
+	unsigned int node = starpu_worker_get_memory_node(workerid);
+	pthread_mutex_t *sched_mutex;
+	pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(task->sched_ctx, workerid, &sched_mutex, &sched_cond);
+
+	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	for (i = 0; i < task->cl->nbuffers; i++)
+	{
+		struct starpu_task *conversion_task;
+		starpu_data_handle_t handle;
+
+		handle = task->handles[i];
+		if (!_starpu_handle_needs_conversion_task(handle, node))
+			continue;
+
+		conversion_task = _starpu_create_conversion_task(handle, node);
+		conversion_task->execute_on_a_specific_worker = 1;
+		conversion_task->workerid = workerid;
+		conversion_task->mf_skip = 1;
+		ret = _starpu_task_submit_conversion_task(conversion_task, workerid);
+		STARPU_ASSERT(ret == 0);
+	}
+
+	for (i = 0; i < task->cl->nbuffers; i++)
+		task->handles[i]->mf_node = node;
+
+	task->execute_on_a_specific_worker = 1;
+	task->workerid = workerid;
+	task->mf_skip = 1;
+	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+
+	return 0;
+}
+
+
 static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
 	heft_data *hd = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
-	unsigned worker, worker_ctx = 0;
+	unsigned worker, nimpl, worker_ctx = 0;
 	int best = -1, best_id_ctx = -1;
-
+	int selected_impl = -1;
+
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
-	int forced_best;
+	int forced_worker;
+	int forced_impl;
 	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
 
 	unsigned nworkers_ctx = workers->nworkers;
-	double local_task_length[nworkers_ctx];
-	double local_data_penalty[nworkers_ctx];
-	double local_power[nworkers_ctx];
-	double exp_end[nworkers_ctx];
+	double local_task_length[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
+	double local_data_penalty[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
+	double local_power[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
+	double exp_end[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 	double max_exp_end = 0.0;
 
 	double best_exp_end;
@@ -381,93 +485,116 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 	 * and detect if there is some calibration that needs to be done.
 	 */
 
-	struct starpu_task_bundle *bundle = task->bundle;
-
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	starpu_task_bundle_t bundle = task->bundle;
 
 	compute_all_performance_predictions(task, local_task_length, exp_end,
-					&max_exp_end, &best_exp_end,
-					local_data_penalty,
-					local_power, &forced_best, bundle, sched_ctx_id);
+					&max_exp_end, &best_exp_end,
+					local_data_penalty,
+					local_power, &forced_worker, &forced_impl,
+					bundle, sched_ctx_id);
+
 	/* If there is no prediction available for that task with that arch we
 	 * want to speed-up calibration time so we force this measurement */
-	if (forced_best != -1){
-		return push_task_on_best_worker(task, forced_best, 0.0, prio, sched_ctx_id);
+	if (forced_worker != -1)
+	{
+		_starpu_get_job_associated_to_task(task)->nimpl = forced_impl;
+
+		if (_starpu_task_uses_multiformat_handles(task) && !task->mf_skip)
+		{
+			/*
+			 * Our task uses multiformat handles, which may need to be converted.
+			 */
+			push_conversion_tasks(task, forced_worker);
+			prio = 0;
+		}
+
+		return push_task_on_best_worker(task, forced_worker, 0.0, 0.0, prio, sched_ctx_id);
 	}
 
 	/*
 	 * Determine which worker optimizes the fitness metric which is a
 	 * trade-off between load-balancing, data locality, and energy
 	 * consumption.
 	 */
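+	/* Concretely, for each candidate (worker, implementation) pair, and
+	 * with lower being better:
+	 *   fitness = alpha * (exp_end - best_exp_end)   (load balancing)
+	 *           + beta  * local_data_penalty         (data locality)
+	 *           + gamma * local_power                (energy consumption) */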
 
-	double fitness[nworkers_ctx];
+	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 	double best_fitness = -1;
 
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
-		if (!starpu_worker_may_execute_task(worker, task, 0))
+		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
-			worker_ctx++;
-			/* no one on that queue may execute this task */
-			continue;
-		}
+			if (!starpu_worker_can_execute_task(worker, task, nimpl))
+			{
+				/* no one on that queue may execute this task */
+				continue;
+			}
 
-		fitness[worker_ctx] = hd->alpha*(exp_end[worker_ctx] - best_exp_end)
-			+ hd->beta*(local_data_penalty[worker_ctx])
-			+ hd->_gamma*(local_power[worker_ctx]);
-
-		if (exp_end[worker_ctx] > max_exp_end)
+			fitness[worker_ctx][nimpl] = hd->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
+				+ hd->beta*(local_data_penalty[worker_ctx][nimpl])
+				+ hd->_gamma*(local_power[worker_ctx][nimpl]);
+
+			if (exp_end[worker_ctx][nimpl] > max_exp_end)
 				/* This placement will make the computation
 				 * longer, take into account the idle
 				 * consumption of other cpus */
-			fitness[worker_ctx] += hd->_gamma * hd->idle_power * (exp_end[worker_ctx] - max_exp_end) / 1000000.0;
+				fitness[worker_ctx][nimpl] += hd->_gamma * hd->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
 
-		if (best == -1 || fitness[worker_ctx] < best_fitness)
+			if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
 			{
 				/* we found a better solution */
-				best_fitness = fitness[worker_ctx];
+				best_fitness = fitness[worker_ctx][nimpl];
 				best = worker;
 				best_id_ctx = worker_ctx;
+				selected_impl = nimpl;
 			}
+		}
 		worker_ctx++;
 	}
 
 	/* By now, we must have found a solution */
 	STARPU_ASSERT(best != -1);
 
 	/* we should now have the best worker in variable "best" */
-	double model_best;
+	double model_best, transfer_model_best;
 
 	if (bundle)
 	{
 		/* If we have a task bundle, we have computed the expected
 		 * length for the entire bundle, but not for the task alone. */
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best);
-		model_best = starpu_task_expected_length(task, perf_arch,
-				_starpu_get_job_associated_to_task(task)->nimpl);
+		unsigned memory_node = starpu_worker_get_memory_node(best);
+		model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
+		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
 
 		/* Remove the task from the bundle since we have made a
 		 * decision for it, and that other tasks should not consider it
 		 * anymore. */
-		PTHREAD_MUTEX_LOCK(&bundle->mutex);
-		int ret = starpu_task_bundle_remove(bundle, task);
-
-		/* Perhaps the bundle was destroyed when removing the last
-		 * entry */
-		if (ret != 1)
-			PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
+		starpu_task_bundle_remove(bundle, task);
 	}
 	else
 	{
-		model_best = local_task_length[best_id_ctx];
+		model_best = local_task_length[best_id_ctx][selected_impl];
+		transfer_model_best = local_data_penalty[best_id_ctx][selected_impl];
 	}
 
 	if(workers->init_cursor)
		workers->deinit_cursor(workers);
-	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
+
+	_starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
+
+	if (_starpu_task_uses_multiformat_handles(task) && !task->mf_skip)
+	{
+		/*
+		 * Our task uses multiformat handles, which may need to be converted.
+		 */
+		push_conversion_tasks(task, best);
+		prio = 0;
+	}
+
+	return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio, sched_ctx_id);
 }
 
 static int heft_push_task(struct starpu_task *task)
@@ -510,13 +637,14 @@ static void heft_deinit(unsigned sched_ctx_id)
 	free(ht);
 }
 
-struct starpu_sched_policy_s heft_policy = {
+struct starpu_sched_policy heft_policy = {
 	.init_sched = heft_init,
 	.deinit_sched = heft_deinit,
-	.push_task = heft_push_task,
+	.push_task = heft_push_task,
 	.push_task_notify = heft_push_task_notify,
 	.pop_task = NULL,
 	.pop_every_task = NULL,
+	.pre_exec_hook = heft_pre_exec_hook,
 	.post_exec_hook = heft_post_exec_hook,
 	.add_workers = heft_add_workers,
 	.remove_workers = heft_remove_workers,