@@ -41,7 +41,8 @@
 #define DBL_MAX __DBL_MAX__
 #endif
 
-typedef struct {
+struct _starpu_dmda_data
+{
 	double alpha;
 	double beta;
 	double _gamma;
@@ -51,7 +52,7 @@ typedef struct {
 
 	long int total_task_cnt;
 	long int ready_task_cnt;
-} dmda_data;
+};
 
 static double alpha = _STARPU_DEFAULT_ALPHA;
 static double beta = _STARPU_DEFAULT_BETA;
@@ -153,7 +154,7 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
 
 static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
 
@@ -188,7 +189,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
 
 static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *task;
 
@@ -221,7 +222,7 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
 
 static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct starpu_task *new_list;
 
@@ -251,7 +252,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 				double predicted, double predicted_transfer,
 				int prio, unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	/* make sure someone could execute that task! */
 	STARPU_ASSERT(best_workerid != -1);
 
@@ -290,9 +291,9 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 		fifo->exp_end += predicted_transfer;
 		fifo->exp_len += predicted_transfer;
-
+
 		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
+
 	task->predicted = predicted;
 	task->predicted_transfer = predicted_transfer;
 
@@ -310,7 +311,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	}
 
 #ifdef HAVE_AYUDAME_H
-	if (AYU_event) {
+	if (AYU_event)
+	{
 		int id = best_workerid;
 		AYU_event(AYU_ADDTASKTOQUEUE, _starpu_get_job_associated_to_task(task)->job_id, &id);
 	}
@@ -326,10 +328,10 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 /* TODO: factorize with dmda!! */
 static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned worker, worker_ctx = 0;
 	int best = -1;
-
+
 	double best_exp_end = 0.0;
 	double model_best = 0.0;
 	double transfer_model_best = 0.0;
@@ -337,24 +339,24 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 	int ntasks_best = -1;
 	double ntasks_best_end = 0.0;
 	int calibrating = 0;
-
+
 	/* A priori, we know all estimations */
 	int unknown = 0;
-
+
 	unsigned best_impl = 0;
 	unsigned nimpl;
 	struct starpu_sched_ctx_worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
-
+
 	if(workers->init_cursor)
 		workers->init_cursor(workers);
-
+
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
 		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-
+
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
@@ -363,25 +365,25 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 				// worker_ctx++;
 				continue;
 			}
-
+
 			double exp_end;
 			_starpu_pthread_mutex_t *sched_mutex;
 			_starpu_pthread_cond_t *sched_cond;
 			starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, worker, &sched_mutex, &sched_cond);
-
+
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
 			fifo->exp_end = fifo->exp_start + fifo->exp_len;
 			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-
-
+
+
 			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
 			double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
 			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
-
+
 			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
-
+
 			if (ntasks_best == -1
 			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
 			    || (!calibrating && isnan(local_length)) /* Not calibrating but this worker is being calibrated */
@@ -392,23 +394,23 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 				ntasks_best = worker;
 				best_impl = nimpl;
 			}
-
+
 			if (isnan(local_length))
 				/* we are calibrating, we want to speed-up calibration time
 				 * so we privilege non-calibrated tasks (but still
 				 * greedily distribute them to avoid dumb schedules) */
 				calibrating = 1;
-
+
 			if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
 				/* there is no prediction available for that task
 				 * with that arch yet, so switch to a greedy strategy */
 				unknown = 1;
-
+
 			if (unknown)
 				continue;
 
 			exp_end = fifo->exp_start + fifo->exp_len + local_length;
-
+
 			if (best == -1 || exp_end < best_exp_end)
 			{
 				/* a better solution was found */
@@ -428,27 +430,27 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 		model_best = 0.0;
 		transfer_model_best = 0.0;
 	}
-
+
 	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
-
+
 	if (workers->deinit_cursor)
 		workers->deinit_cursor(workers);
-
+
 	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;
-
+
 	/* we should now have the best worker in variable "best" */
 	return push_task_on_best_worker(task, best,
 					model_best, transfer_model_best, prio, sched_ctx_id);
 }
 
 static void compute_all_performance_predictions(struct starpu_task *task,
-		double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
-		double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
-		double *max_exp_endp,
-		double *best_exp_endp,
-		double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
-		double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
-		int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
+						double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+						double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+						double *max_exp_endp,
+						double *best_exp_endp,
+						double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+						double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+						int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
 {
 	int calibrating = 0;
 	double max_exp_end = DBL_MIN;
@@ -464,9 +466,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	unsigned nimpl;
 
 	starpu_task_bundle_t bundle = task->bundle;
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
-
+
 	while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
@@ -530,7 +532,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				 * so we privilege non-calibrated tasks (but still
 				 * greedily distribute them to avoid dumb schedules) */
 				calibrating = 1;
-
+
 			if (isnan(local_task_length[worker_ctx][nimpl])
 			    || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
 				/* there is no prediction available for that task
@@ -539,19 +541,19 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 			if (unknown)
 				continue;
-
+
 			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];
-
+
 			if (exp_end[worker_ctx][nimpl] < best_exp_end)
 			{
 				/* a better solution was found */
 				best_exp_end = exp_end[worker_ctx][nimpl];
 				nimpl_best = nimpl;
 			}
-
+
 			if (isnan(local_power[worker_ctx][nimpl]))
 				local_power[worker_ctx][nimpl] = 0.;
-
+
 		}
 		worker_ctx++;
 	}
@@ -577,7 +579,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	int forced_best = -1;
 	int forced_impl = -1;
 
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
 	unsigned nworkers_ctx = workers->nworkers;
 	double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
@@ -617,12 +619,12 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 				/* no one on that queue may execute this task */
 				continue;
 			}
-
-
-			fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
+
+
+			fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
 				+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
 				+ dt->_gamma*(local_power[worker_ctx][nimpl]);
-
+
 			if (exp_end[worker_ctx][nimpl] > max_exp_end)
 			{
 				/* This placement will make the computation
@@ -630,7 +632,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 				 * consumption of other cpus */
 				fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
 			}
-
+
 			if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
 			{
 				/* we found a better solution */
@@ -669,7 +671,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 		model_best = local_task_length[best_in_ctx][selected_impl];
 		transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
 	}
-
+
 	if (task->bundle)
 		starpu_task_bundle_remove(task->bundle, task);
 	if (workers->deinit_cursor)
@@ -744,9 +746,9 @@ static int dmda_push_task(struct starpu_task *task)
 	return ret_val;
 }
 
-static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
+static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	int workerid;
 	unsigned i;
@@ -760,7 +762,7 @@ static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nwo
 
 static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	int workerid;
 	unsigned i;
@@ -772,11 +774,11 @@ static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned
 	}
 }
 
-static void initialize_dmda_policy(unsigned sched_ctx_id)
+static void initialize_dmda_policy(unsigned sched_ctx_id)
 {
 	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
 
-	dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)malloc(sizeof(struct _starpu_dmda_data));
 	dt->alpha = _STARPU_DEFAULT_ALPHA;
 	dt->beta = _STARPU_DEFAULT_BETA;
 	dt->_gamma = _STARPU_DEFAULT_GAMMA;
@@ -796,7 +798,7 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 
 	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
 	if (strval_gamma)
-		dt->_gamma = atof(strval_gamma);
+		dt->_gamma = atof(strval_gamma);
 
 	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
 	if (strval_idle_power)
@@ -823,9 +825,9 @@ static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
 	starpu_sched_set_max_priority(INT_MAX);
 }
 
-static void deinitialize_dmda_policy(unsigned sched_ctx_id)
+static void deinitialize_dmda_policy(unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	free(dt->queue_array);
 	free(dt);
 	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
@@ -840,7 +842,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	int workerid = starpu_worker_get_id();
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	double model = task->predicted;
 	double transfer_model = task->predicted_transfer;
@@ -859,7 +861,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
 
 static void dmda_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	/* Compute the expected penalty */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);