|
@@ -3,7 +3,7 @@
|
|
* Copyright (C) 2010-2012 Université de Bordeaux 1
|
|
* Copyright (C) 2010-2012 Université de Bordeaux 1
|
|
* Copyright (C) 2010, 2011, 2012 Centre National de la Recherche Scientifique
|
|
* Copyright (C) 2010, 2011, 2012 Centre National de la Recherche Scientifique
|
|
* Copyright (C) 2011 Télécom-SudParis
|
|
* Copyright (C) 2011 Télécom-SudParis
|
|
- * Copyright (C) 2012 inria
|
|
|
|
|
|
+ * Copyright (C) 2011-2012 INRIA
|
|
*
|
|
*
|
|
* StarPU is free software; you can redistribute it and/or modify
|
|
* StarPU is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
* it under the terms of the GNU Lesser General Public License as published by
|
|
@@ -41,11 +41,17 @@
|
|
#define DBL_MAX __DBL_MAX__
|
|
#define DBL_MAX __DBL_MAX__
|
|
#endif
|
|
#endif
|
|
|
|
|
|
-static unsigned nworkers;
|
|
|
|
-static struct _starpu_fifo_taskq *queue_array[STARPU_NMAXWORKERS];
|
|
|
|
|
|
+typedef struct {
|
|
|
|
+ double alpha;
|
|
|
|
+ double beta;
|
|
|
|
+ double _gamma;
|
|
|
|
+ double idle_power;
|
|
|
|
|
|
-static _starpu_pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
|
|
|
|
-static _starpu_pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];
|
|
|
|
|
|
+ struct _starpu_fifo_taskq **queue_array;
|
|
|
|
+
|
|
|
|
+ long int total_task_cnt;
|
|
|
|
+ long int ready_task_cnt;
|
|
|
|
+} dmda_data;
|
|
|
|
|
|
static double alpha = _STARPU_DEFAULT_ALPHA;
|
|
static double alpha = _STARPU_DEFAULT_ALPHA;
|
|
static double beta = _STARPU_DEFAULT_BETA;
|
|
static double beta = _STARPU_DEFAULT_BETA;
|
|
@@ -63,11 +69,6 @@ static const float idle_power_minimum=0;
|
|
static const float idle_power_maximum=10000.0;
|
|
static const float idle_power_maximum=10000.0;
|
|
#endif /* !STARPU_USE_TOP */
|
|
#endif /* !STARPU_USE_TOP */
|
|
|
|
|
|
-#ifdef STARPU_VERBOSE
|
|
|
|
-static long int total_task_cnt = 0;
|
|
|
|
-static long int ready_task_cnt = 0;
|
|
|
|
-#endif
|
|
|
|
-
|
|
|
|
static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
|
|
static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
|
|
{
|
|
{
|
|
int cnt = 0;
|
|
int cnt = 0;
|
|
@@ -150,12 +151,14 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
|
|
return task;
|
|
return task;
|
|
}
|
|
}
|
|
|
|
|
|
-static struct starpu_task *dmda_pop_ready_task(void)
|
|
|
|
|
|
+static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+
|
|
struct starpu_task *task;
|
|
struct starpu_task *task;
|
|
|
|
|
|
int workerid = starpu_worker_get_id();
|
|
int workerid = starpu_worker_get_id();
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[workerid];
|
|
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
|
|
|
|
|
|
unsigned node = starpu_worker_get_memory_node(workerid);
|
|
unsigned node = starpu_worker_get_memory_node(workerid);
|
|
|
|
|
|
@@ -173,22 +176,24 @@ static struct starpu_task *dmda_pop_ready_task(void)
|
|
{
|
|
{
|
|
int non_ready = count_non_ready_buffers(task, node);
|
|
int non_ready = count_non_ready_buffers(task, node);
|
|
if (non_ready == 0)
|
|
if (non_ready == 0)
|
|
- ready_task_cnt++;
|
|
|
|
|
|
+ dt->ready_task_cnt++;
|
|
}
|
|
}
|
|
|
|
|
|
- total_task_cnt++;
|
|
|
|
|
|
+ dt->total_task_cnt++;
|
|
#endif
|
|
#endif
|
|
}
|
|
}
|
|
|
|
|
|
return task;
|
|
return task;
|
|
}
|
|
}
|
|
|
|
|
|
-static struct starpu_task *dmda_pop_task(void)
|
|
|
|
|
|
+static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+
|
|
struct starpu_task *task;
|
|
struct starpu_task *task;
|
|
|
|
|
|
int workerid = starpu_worker_get_id();
|
|
int workerid = starpu_worker_get_id();
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[workerid];
|
|
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
|
|
|
|
|
|
task = _starpu_fifo_pop_local_task(fifo);
|
|
task = _starpu_fifo_pop_local_task(fifo);
|
|
if (task)
|
|
if (task)
|
|
@@ -204,27 +209,29 @@ static struct starpu_task *dmda_pop_task(void)
|
|
{
|
|
{
|
|
int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
|
|
int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
|
|
if (non_ready == 0)
|
|
if (non_ready == 0)
|
|
- ready_task_cnt++;
|
|
|
|
|
|
+ dt->ready_task_cnt++;
|
|
}
|
|
}
|
|
|
|
|
|
- total_task_cnt++;
|
|
|
|
|
|
+ dt->total_task_cnt++;
|
|
#endif
|
|
#endif
|
|
}
|
|
}
|
|
|
|
|
|
return task;
|
|
return task;
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-static struct starpu_task *dmda_pop_every_task(void)
|
|
|
|
|
|
+static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+
|
|
struct starpu_task *new_list;
|
|
struct starpu_task *new_list;
|
|
|
|
|
|
int workerid = starpu_worker_get_id();
|
|
int workerid = starpu_worker_get_id();
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
|
|
|
|
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[workerid];
|
|
|
|
-
|
|
|
|
- new_list = _starpu_fifo_pop_every_task(fifo, &sched_mutex[workerid], workerid);
|
|
|
|
|
|
+ pthread_mutex_t *sched_mutex;
|
|
|
|
+ pthread_cond_t *sched_cond;
|
|
|
|
+ starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
|
|
|
|
+ new_list = _starpu_fifo_pop_every_task(fifo, sched_mutex, workerid);
|
|
|
|
|
|
while (new_list)
|
|
while (new_list)
|
|
{
|
|
{
|
|
@@ -240,21 +247,27 @@ static struct starpu_task *dmda_pop_every_task(void)
|
|
return new_list;
|
|
return new_list;
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
double predicted, double predicted_transfer,
|
|
double predicted, double predicted_transfer,
|
|
- int prio)
|
|
|
|
|
|
+ int prio, unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
/* make sure someone coule execute that task ! */
|
|
/* make sure someone coule execute that task ! */
|
|
STARPU_ASSERT(best_workerid != -1);
|
|
STARPU_ASSERT(best_workerid != -1);
|
|
|
|
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[best_workerid];
|
|
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
|
|
|
|
|
|
- _STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[best_workerid]);
|
|
|
|
|
|
+ pthread_mutex_t *sched_mutex;
|
|
|
|
+ pthread_cond_t *sched_cond;
|
|
|
|
+ starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
|
|
|
|
|
|
- /* Sometimes workers didn't take the tasks as early as we expected */
|
|
|
|
|
|
+#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
|
|
|
|
+ starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
|
|
|
|
+#endif //STARPU_USE_SCHED_CTX_HYPERVISOR
|
|
|
|
+
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
|
|
+
|
|
|
|
+/* Sometimes workers didn't take the tasks as early as we expected */
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
|
|
|
|
@@ -277,9 +290,9 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
|
|
|
|
fifo->exp_end += predicted_transfer;
|
|
fifo->exp_end += predicted_transfer;
|
|
fifo->exp_len += predicted_transfer;
|
|
fifo->exp_len += predicted_transfer;
|
|
-
|
|
|
|
- _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[best_workerid]);
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
|
|
+
|
|
task->predicted = predicted;
|
|
task->predicted = predicted;
|
|
task->predicted_transfer = predicted_transfer;
|
|
task->predicted_transfer = predicted_transfer;
|
|
|
|
|
|
@@ -303,20 +316,20 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
}
|
|
}
|
|
#endif
|
|
#endif
|
|
if (prio)
|
|
if (prio)
|
|
- return _starpu_fifo_push_sorted_task(queue_array[best_workerid],
|
|
|
|
- &sched_mutex[best_workerid], &sched_cond[best_workerid], task);
|
|
|
|
|
|
+ return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
|
|
|
|
+ sched_mutex, sched_cond, task);
|
|
else
|
|
else
|
|
- return _starpu_fifo_push_task(queue_array[best_workerid],
|
|
|
|
- &sched_mutex[best_workerid], &sched_cond[best_workerid], task);
|
|
|
|
|
|
+ return _starpu_fifo_push_task(dt->queue_array[best_workerid],
|
|
|
|
+ sched_mutex, sched_cond, task);
|
|
}
|
|
}
|
|
|
|
|
|
/* TODO: factorize with dmda!! */
|
|
/* TODO: factorize with dmda!! */
|
|
-static int _dm_push_task(struct starpu_task *task, unsigned prio)
|
|
|
|
|
|
+static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
- /* find the queue */
|
|
|
|
- unsigned worker;
|
|
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+ unsigned worker, worker_ctx = 0;
|
|
int best = -1;
|
|
int best = -1;
|
|
-
|
|
|
|
|
|
+
|
|
double best_exp_end = 0.0;
|
|
double best_exp_end = 0.0;
|
|
double model_best = 0.0;
|
|
double model_best = 0.0;
|
|
double transfer_model_best = 0.0;
|
|
double transfer_model_best = 0.0;
|
|
@@ -324,41 +337,51 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
|
|
int ntasks_best = -1;
|
|
int ntasks_best = -1;
|
|
double ntasks_best_end = 0.0;
|
|
double ntasks_best_end = 0.0;
|
|
int calibrating = 0;
|
|
int calibrating = 0;
|
|
-
|
|
|
|
|
|
+
|
|
/* A priori, we know all estimations */
|
|
/* A priori, we know all estimations */
|
|
int unknown = 0;
|
|
int unknown = 0;
|
|
-
|
|
|
|
|
|
+
|
|
unsigned best_impl = 0;
|
|
unsigned best_impl = 0;
|
|
unsigned nimpl;
|
|
unsigned nimpl;
|
|
-
|
|
|
|
- for (worker = 0; worker < nworkers; worker++)
|
|
|
|
|
|
+ struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
|
|
|
|
+
|
|
|
|
+ if(workers->init_cursor)
|
|
|
|
+ workers->init_cursor(workers);
|
|
|
|
+
|
|
|
|
+ while(workers->has_next(workers))
|
|
{
|
|
{
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[worker];
|
|
|
|
|
|
+ worker = workers->get_next(workers);
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
|
|
unsigned memory_node = starpu_worker_get_memory_node(worker);
|
|
unsigned memory_node = starpu_worker_get_memory_node(worker);
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
-
|
|
|
|
|
|
+
|
|
for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
{
|
|
{
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
{
|
|
{
|
|
/* no one on that queue may execute this task */
|
|
/* no one on that queue may execute this task */
|
|
|
|
+ // worker_ctx++;
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
double exp_end;
|
|
double exp_end;
|
|
-
|
|
|
|
|
|
+ pthread_mutex_t *sched_mutex;
|
|
|
|
+ pthread_cond_t *sched_cond;
|
|
|
|
+ starpu_worker_get_sched_condition(sched_ctx_id, worker, &sched_mutex, &sched_cond);
|
|
|
|
+
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
- _STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[worker]);
|
|
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
- _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[worker]);
|
|
|
|
-
|
|
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
|
|
+
|
|
|
|
+
|
|
double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
|
|
double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
|
|
double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
|
|
double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
|
|
-
|
|
|
|
|
|
+
|
|
//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
|
|
//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
|
|
-
|
|
|
|
|
|
+
|
|
if (ntasks_best == -1
|
|
if (ntasks_best == -1
|
|
|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
|
|
|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
|
|
|| (!calibrating && isnan(local_length)) /* Not calibrating but this worker is being calibrated */
|
|
|| (!calibrating && isnan(local_length)) /* Not calibrating but this worker is being calibrated */
|
|
@@ -369,23 +392,23 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
|
|
ntasks_best = worker;
|
|
ntasks_best = worker;
|
|
best_impl = nimpl;
|
|
best_impl = nimpl;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
if (isnan(local_length))
|
|
if (isnan(local_length))
|
|
/* we are calibrating, we want to speed-up calibration time
|
|
/* we are calibrating, we want to speed-up calibration time
|
|
* so we privilege non-calibrated tasks (but still
|
|
* so we privilege non-calibrated tasks (but still
|
|
* greedily distribute them to avoid dumb schedules) */
|
|
* greedily distribute them to avoid dumb schedules) */
|
|
calibrating = 1;
|
|
calibrating = 1;
|
|
-
|
|
|
|
|
|
+
|
|
if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
|
|
if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
|
|
/* there is no prediction available for that task
|
|
/* there is no prediction available for that task
|
|
* with that arch yet, so switch to a greedy strategy */
|
|
* with that arch yet, so switch to a greedy strategy */
|
|
unknown = 1;
|
|
unknown = 1;
|
|
-
|
|
|
|
|
|
+
|
|
if (unknown)
|
|
if (unknown)
|
|
continue;
|
|
continue;
|
|
|
|
|
|
exp_end = fifo->exp_start + fifo->exp_len + local_length;
|
|
exp_end = fifo->exp_start + fifo->exp_len + local_length;
|
|
-
|
|
|
|
|
|
+
|
|
if (best == -1 || exp_end < best_exp_end)
|
|
if (best == -1 || exp_end < best_exp_end)
|
|
{
|
|
{
|
|
/* a better solution was found */
|
|
/* a better solution was found */
|
|
@@ -396,6 +419,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
|
|
best_impl = nimpl;
|
|
best_impl = nimpl;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ worker_ctx++;
|
|
}
|
|
}
|
|
|
|
|
|
if (unknown)
|
|
if (unknown)
|
|
@@ -404,24 +428,27 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
|
|
model_best = 0.0;
|
|
model_best = 0.0;
|
|
transfer_model_best = 0.0;
|
|
transfer_model_best = 0.0;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
|
|
//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
|
|
-
|
|
|
|
- _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ if(workers->init_cursor)
|
|
|
|
+ workers->deinit_cursor(workers);
|
|
|
|
+
|
|
|
|
+ _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
|
|
|
|
+
|
|
/* we should now have the best worker in variable "best" */
|
|
/* we should now have the best worker in variable "best" */
|
|
return push_task_on_best_worker(task, best,
|
|
return push_task_on_best_worker(task, best,
|
|
- model_best, transfer_model_best, prio);
|
|
|
|
|
|
+ model_best, transfer_model_best, prio, sched_ctx_id);
|
|
}
|
|
}
|
|
|
|
|
|
static void compute_all_performance_predictions(struct starpu_task *task,
|
|
static void compute_all_performance_predictions(struct starpu_task *task,
|
|
- double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
- double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
- double *max_exp_endp,
|
|
|
|
- double *best_exp_endp,
|
|
|
|
- double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
- double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
- int *forced_worker, int *forced_impl)
|
|
|
|
|
|
+ double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
+ double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
+ double *max_exp_endp,
|
|
|
|
+ double *best_exp_endp,
|
|
|
|
+ double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
+ double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
|
+ int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
int calibrating = 0;
|
|
int calibrating = 0;
|
|
double max_exp_end = DBL_MIN;
|
|
double max_exp_end = DBL_MIN;
|
|
@@ -432,20 +459,23 @@ static void compute_all_performance_predictions(struct starpu_task *task,
|
|
|
|
|
|
/* A priori, we know all estimations */
|
|
/* A priori, we know all estimations */
|
|
int unknown = 0;
|
|
int unknown = 0;
|
|
- unsigned worker;
|
|
|
|
|
|
+ unsigned worker, worker_ctx = 0;
|
|
|
|
|
|
unsigned nimpl;
|
|
unsigned nimpl;
|
|
|
|
|
|
starpu_task_bundle_t bundle = task->bundle;
|
|
starpu_task_bundle_t bundle = task->bundle;
|
|
-
|
|
|
|
- for (worker = 0; worker < nworkers; worker++)
|
|
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+ struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
|
|
|
|
+
|
|
|
|
+ while(workers->has_next(workers))
|
|
{
|
|
{
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[worker];
|
|
|
|
|
|
+ worker = workers->get_next(workers);
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
unsigned memory_node = starpu_worker_get_memory_node(worker);
|
|
unsigned memory_node = starpu_worker_get_memory_node(worker);
|
|
|
|
|
|
- for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
|
|
- {
|
|
|
|
|
|
+ for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
|
|
+ {
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
{
|
|
{
|
|
/* no one on that queue may execute this task */
|
|
/* no one on that queue may execute this task */
|
|
@@ -453,38 +483,41 @@ static void compute_all_performance_predictions(struct starpu_task *task,
|
|
}
|
|
}
|
|
|
|
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
- _STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[worker]);
|
|
|
|
|
|
+ pthread_mutex_t *sched_mutex;
|
|
|
|
+ pthread_cond_t *sched_cond;
|
|
|
|
+ starpu_worker_get_sched_condition(sched_ctx_id, worker, &sched_mutex, &sched_cond);
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
- exp_end[worker][nimpl] = fifo->exp_start + fifo->exp_len;
|
|
|
|
- _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[worker]);
|
|
|
|
- if (exp_end[worker][nimpl] > max_exp_end)
|
|
|
|
- max_exp_end = exp_end[worker][nimpl];
|
|
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
|
|
+ exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len;
|
|
|
|
+ if (exp_end[worker_ctx][nimpl] > max_exp_end)
|
|
|
|
+ max_exp_end = exp_end[worker_ctx][nimpl];
|
|
|
|
|
|
//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl],worker,nimpl);
|
|
//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl],worker,nimpl);
|
|
|
|
|
|
if (bundle)
|
|
if (bundle)
|
|
{
|
|
{
|
|
/* TODO : conversion time */
|
|
/* TODO : conversion time */
|
|
- local_task_length[worker][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
|
|
|
|
- local_data_penalty[worker][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
|
|
|
|
- local_power[worker][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch,nimpl);
|
|
|
|
|
|
+ local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
|
|
|
|
+ local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
|
|
|
|
+ local_power[worker_ctx][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch,nimpl);
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- local_task_length[worker][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
|
|
|
|
- local_data_penalty[worker][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
|
|
- local_power[worker][nimpl] = starpu_task_expected_power(task, perf_arch,nimpl);
|
|
|
|
|
|
+ local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
|
|
|
|
+ local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
|
|
+ local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch,nimpl);
|
|
double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
|
|
double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
|
|
if (conversion_time > 0.0)
|
|
if (conversion_time > 0.0)
|
|
- local_task_length[worker][nimpl] += conversion_time;
|
|
|
|
|
|
+ local_task_length[worker_ctx][nimpl] += conversion_time;
|
|
}
|
|
}
|
|
|
|
|
|
double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
|
|
double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
|
|
|
|
|
|
if (ntasks_best == -1
|
|
if (ntasks_best == -1
|
|
|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
|
|
|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
|
|
- || (!calibrating && isnan(local_task_length[worker][nimpl])) /* Not calibrating but this worker is being calibrated */
|
|
|
|
- || (calibrating && isnan(local_task_length[worker][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
|
|
|
|
|
|
+ || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
|
|
|
|
+ || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
|
|
)
|
|
)
|
|
{
|
|
{
|
|
ntasks_best_end = ntasks_end;
|
|
ntasks_best_end = ntasks_end;
|
|
@@ -492,34 +525,35 @@ static void compute_all_performance_predictions(struct starpu_task *task,
|
|
nimpl_best = nimpl;
|
|
nimpl_best = nimpl;
|
|
}
|
|
}
|
|
|
|
|
|
- if (isnan(local_task_length[worker][nimpl]))
|
|
|
|
|
|
+ if (isnan(local_task_length[worker_ctx][nimpl]))
|
|
/* we are calibrating, we want to speed-up calibration time
|
|
/* we are calibrating, we want to speed-up calibration time
|
|
* so we privilege non-calibrated tasks (but still
|
|
* so we privilege non-calibrated tasks (but still
|
|
* greedily distribute them to avoid dumb schedules) */
|
|
* greedily distribute them to avoid dumb schedules) */
|
|
calibrating = 1;
|
|
calibrating = 1;
|
|
-
|
|
|
|
- if (isnan(local_task_length[worker][nimpl])
|
|
|
|
- || _STARPU_IS_ZERO(local_task_length[worker][nimpl]))
|
|
|
|
|
|
+
|
|
|
|
+ if (isnan(local_task_length[worker_ctx][nimpl])
|
|
|
|
+ || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
|
|
/* there is no prediction available for that task
|
|
/* there is no prediction available for that task
|
|
* with that arch (yet or at all), so switch to a greedy strategy */
|
|
* with that arch (yet or at all), so switch to a greedy strategy */
|
|
unknown = 1;
|
|
unknown = 1;
|
|
|
|
|
|
if (unknown)
|
|
if (unknown)
|
|
continue;
|
|
continue;
|
|
-
|
|
|
|
- exp_end[worker][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker][nimpl];
|
|
|
|
-
|
|
|
|
- if (exp_end[worker][nimpl] < best_exp_end)
|
|
|
|
|
|
+
|
|
|
|
+ exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];
|
|
|
|
+
|
|
|
|
+ if (exp_end[worker_ctx][nimpl] < best_exp_end)
|
|
{
|
|
{
|
|
/* a better solution was found */
|
|
/* a better solution was found */
|
|
- best_exp_end = exp_end[worker][nimpl];
|
|
|
|
|
|
+ best_exp_end = exp_end[worker_ctx][nimpl];
|
|
nimpl_best = nimpl;
|
|
nimpl_best = nimpl;
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (isnan(local_power[worker][nimpl]))
|
|
|
|
- local_power[worker][nimpl] = 0.;
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ if (isnan(local_power[worker_ctx][nimpl]))
|
|
|
|
+ local_power[worker_ctx][nimpl] = 0.;
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
+ worker_ctx++;
|
|
}
|
|
}
|
|
|
|
|
|
*forced_worker = unknown?ntasks_best:-1;
|
|
*forced_worker = unknown?ntasks_best:-1;
|
|
@@ -529,11 +563,11 @@ static void compute_all_performance_predictions(struct starpu_task *task,
|
|
*max_exp_endp = max_exp_end;
|
|
*max_exp_endp = max_exp_end;
|
|
}
|
|
}
|
|
|
|
|
|
-static int _dmda_push_task(struct starpu_task *task, unsigned prio)
|
|
|
|
|
|
+static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
/* find the queue */
|
|
/* find the queue */
|
|
- unsigned worker;
|
|
|
|
- int best = -1;
|
|
|
|
|
|
+ unsigned worker, worker_ctx = 0;
|
|
|
|
+ int best = -1, best_in_ctx = -1;
|
|
int selected_impl = 0;
|
|
int selected_impl = 0;
|
|
double model_best = 0.0;
|
|
double model_best = 0.0;
|
|
double transfer_model_best = 0.0;
|
|
double transfer_model_best = 0.0;
|
|
@@ -543,32 +577,39 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
|
|
int forced_best = -1;
|
|
int forced_best = -1;
|
|
int forced_impl = -1;
|
|
int forced_impl = -1;
|
|
|
|
|
|
- double local_task_length[nworkers][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
- double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
- double local_power[nworkers][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
- double exp_end[nworkers][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+ struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
|
|
|
|
+ unsigned nworkers_ctx = workers->nworkers;
|
|
|
|
+ double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
+ double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
+ double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
+ double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
|
|
double max_exp_end = 0.0;
|
|
double max_exp_end = 0.0;
|
|
double best_exp_end;
|
|
double best_exp_end;
|
|
|
|
|
|
- double fitness[nworkers][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
|
|
+ double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
|
|
|
|
+
|
|
|
|
+ if(workers->init_cursor)
|
|
|
|
+ workers->init_cursor(workers);
|
|
|
|
|
|
compute_all_performance_predictions(task,
|
|
compute_all_performance_predictions(task,
|
|
- local_task_length,
|
|
|
|
- exp_end,
|
|
|
|
- &max_exp_end,
|
|
|
|
- &best_exp_end,
|
|
|
|
- local_data_penalty,
|
|
|
|
- local_power,
|
|
|
|
- &forced_best,
|
|
|
|
- &forced_impl);
|
|
|
|
|
|
+ local_task_length,
|
|
|
|
+ exp_end,
|
|
|
|
+ &max_exp_end,
|
|
|
|
+ &best_exp_end,
|
|
|
|
+ local_data_penalty,
|
|
|
|
+ local_power,
|
|
|
|
+ &forced_best,
|
|
|
|
+ &forced_impl, sched_ctx_id);
|
|
|
|
|
|
double best_fitness = -1;
|
|
double best_fitness = -1;
|
|
|
|
|
|
unsigned nimpl;
|
|
unsigned nimpl;
|
|
if (forced_best == -1)
|
|
if (forced_best == -1)
|
|
{
|
|
{
|
|
- for (worker = 0; worker < nworkers; worker++)
|
|
|
|
|
|
+ while(workers->has_next(workers))
|
|
{
|
|
{
|
|
|
|
+ worker = workers->get_next(workers);
|
|
for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
{
|
|
{
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
@@ -576,29 +617,32 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
|
|
/* no one on that queue may execute this task */
|
|
/* no one on that queue may execute this task */
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
-
|
|
|
|
- fitness[worker][nimpl] = alpha*(exp_end[worker][nimpl] - best_exp_end)
|
|
|
|
- + beta*(local_data_penalty[worker][nimpl])
|
|
|
|
- + _gamma*(local_power[worker][nimpl]);
|
|
|
|
-
|
|
|
|
- if (exp_end[worker][nimpl] > max_exp_end)
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
|
|
|
|
+ + dt->beta*(local_data_penalty[worker_ctx][nimpl])
|
|
|
|
+ + dt->_gamma*(local_power[worker_ctx][nimpl]);
|
|
|
|
+
|
|
|
|
+ if (exp_end[worker_ctx][nimpl] > max_exp_end)
|
|
{
|
|
{
|
|
/* This placement will make the computation
|
|
/* This placement will make the computation
|
|
* longer, take into account the idle
|
|
* longer, take into account the idle
|
|
* consumption of other cpus */
|
|
* consumption of other cpus */
|
|
- fitness[worker][nimpl] += _gamma * idle_power * (exp_end[worker][nimpl] - max_exp_end) / 1000000.0;
|
|
|
|
|
|
+ fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (best == -1 || fitness[worker][nimpl] < best_fitness)
|
|
|
|
|
|
+
|
|
|
|
+ if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
|
|
{
|
|
{
|
|
/* we found a better solution */
|
|
/* we found a better solution */
|
|
- best_fitness = fitness[worker][nimpl];
|
|
|
|
|
|
+ best_fitness = fitness[worker_ctx][nimpl];
|
|
best = worker;
|
|
best = worker;
|
|
|
|
+ best_in_ctx = worker_ctx;
|
|
selected_impl = nimpl;
|
|
selected_impl = nimpl;
|
|
|
|
|
|
//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
|
|
//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ worker_ctx++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -615,66 +659,148 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
|
|
}
|
|
}
|
|
else if (task->bundle)
|
|
else if (task->bundle)
|
|
{
|
|
{
|
|
- enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best);
|
|
|
|
|
|
+ enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best_in_ctx);
|
|
unsigned memory_node = starpu_worker_get_memory_node(best);
|
|
unsigned memory_node = starpu_worker_get_memory_node(best);
|
|
model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
|
|
model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
|
|
transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- model_best = local_task_length[best][selected_impl];
|
|
|
|
- transfer_model_best = local_data_penalty[best][selected_impl];
|
|
|
|
|
|
+ model_best = local_task_length[best_in_ctx][selected_impl];
|
|
|
|
+ transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
if (task->bundle)
|
|
if (task->bundle)
|
|
starpu_task_bundle_remove(task->bundle, task);
|
|
starpu_task_bundle_remove(task->bundle, task);
|
|
-
|
|
|
|
|
|
+ if(workers->init_cursor)
|
|
|
|
+ workers->deinit_cursor(workers);
|
|
|
|
|
|
//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
|
|
//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
|
|
_starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
|
|
_starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
|
|
|
|
|
|
/* we should now have the best worker in variable "best" */
|
|
/* we should now have the best worker in variable "best" */
|
|
- return push_task_on_best_worker(task, best,
|
|
|
|
- model_best, transfer_model_best, prio);
|
|
|
|
|
|
+ return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio, sched_ctx_id);
|
|
}
|
|
}
|
|
|
|
|
|
static int dmda_push_sorted_task(struct starpu_task *task)
|
|
static int dmda_push_sorted_task(struct starpu_task *task)
|
|
{
|
|
{
|
|
- return _dmda_push_task(task, 1);
|
|
|
|
|
|
+ unsigned sched_ctx_id = task->sched_ctx;
|
|
|
|
+ pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
|
|
|
|
+ unsigned nworkers;
|
|
|
|
+ int ret_val = -1;
|
|
|
|
+
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
|
|
|
|
+ nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
|
|
|
|
+ if(nworkers == 0)
|
|
|
|
+ {
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
|
+ return ret_val;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ret_val = _dmda_push_task(task, 1, sched_ctx_id);
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
|
+ return ret_val;
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
static int dm_push_task(struct starpu_task *task)
|
|
static int dm_push_task(struct starpu_task *task)
|
|
{
|
|
{
|
|
- return _dm_push_task(task, 0);
|
|
|
|
|
|
+ unsigned sched_ctx_id = task->sched_ctx;
|
|
|
|
+ pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
|
|
|
|
+ unsigned nworkers;
|
|
|
|
+ int ret_val = -1;
|
|
|
|
+
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
|
|
|
|
+ nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
|
|
|
|
+ if(nworkers == 0)
|
|
|
|
+ {
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
|
+ return ret_val;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ret_val = _dm_push_task(task, 0, sched_ctx_id);
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
|
+ return ret_val;
|
|
}
|
|
}
|
|
|
|
|
|
static int dmda_push_task(struct starpu_task *task)
|
|
static int dmda_push_task(struct starpu_task *task)
|
|
{
|
|
{
|
|
- return _dmda_push_task(task, 0);
|
|
|
|
|
|
+ unsigned sched_ctx_id = task->sched_ctx;
|
|
|
|
+ pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
|
|
|
|
+ unsigned nworkers;
|
|
|
|
+ int ret_val = -1;
|
|
|
|
+
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
|
|
|
|
+ nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
|
|
|
|
+ if(nworkers == 0)
|
|
|
|
+ {
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
|
+ return ret_val;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ STARPU_ASSERT(task);
|
|
|
|
+ ret_val = _dmda_push_task(task, 0, sched_ctx_id);
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
|
|
|
|
+ return ret_val;
|
|
}
|
|
}
|
|
|
|
|
|
-static void initialize_dmda_policy(struct starpu_machine_topology *topology,
|
|
|
|
- struct starpu_sched_policy *policy)
|
|
|
|
|
|
+static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
|
|
{
|
|
{
|
|
- (void) policy;
|
|
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
|
|
- nworkers = topology->nworkers;
|
|
|
|
|
|
+ int workerid;
|
|
|
|
+ unsigned i;
|
|
|
|
+ for (i = 0; i < nworkers; i++)
|
|
|
|
+ {
|
|
|
|
+ workerid = workerids[i];
|
|
|
|
+ dt->queue_array[workerid] = _starpu_create_fifo();
|
|
|
|
+ starpu_worker_init_sched_condition(sched_ctx_id, workerid);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
|
|
|
|
+{
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+
|
|
|
|
+ int workerid;
|
|
|
|
+ unsigned i;
|
|
|
|
+ for (i = 0; i < nworkers; i++)
|
|
|
|
+ {
|
|
|
|
+ workerid = workerids[i];
|
|
|
|
+ _starpu_destroy_fifo(dt->queue_array[workerid]);
|
|
|
|
+ starpu_worker_deinit_sched_condition(sched_ctx_id, workerid);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void initialize_dmda_policy(unsigned sched_ctx_id)
|
|
|
|
+{
|
|
|
|
+ starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);
|
|
|
|
+
|
|
|
|
+ dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
|
|
|
|
+ dt->alpha = _STARPU_DEFAULT_ALPHA;
|
|
|
|
+ dt->beta = _STARPU_DEFAULT_BETA;
|
|
|
|
+ dt->_gamma = _STARPU_DEFAULT_GAMMA;
|
|
|
|
+ dt->idle_power = 0.0;
|
|
|
|
+
|
|
|
|
+ starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)dt);
|
|
|
|
+
|
|
|
|
+ dt->queue_array = (struct _starpu_fifo_taskq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));
|
|
|
|
|
|
const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
|
|
const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
|
|
if (strval_alpha)
|
|
if (strval_alpha)
|
|
- alpha = atof(strval_alpha);
|
|
|
|
|
|
+ dt->alpha = atof(strval_alpha);
|
|
|
|
|
|
const char *strval_beta = getenv("STARPU_SCHED_BETA");
|
|
const char *strval_beta = getenv("STARPU_SCHED_BETA");
|
|
if (strval_beta)
|
|
if (strval_beta)
|
|
- beta = atof(strval_beta);
|
|
|
|
|
|
+ dt->beta = atof(strval_beta);
|
|
|
|
|
|
const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
|
|
const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
|
|
if (strval_gamma)
|
|
if (strval_gamma)
|
|
- _gamma = atof(strval_gamma);
|
|
|
|
|
|
+ dt->_gamma = atof(strval_gamma);
|
|
|
|
|
|
const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
|
|
const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
|
|
if (strval_idle_power)
|
|
if (strval_idle_power)
|
|
- idle_power = atof(strval_idle_power);
|
|
|
|
|
|
+ dt->idle_power = atof(strval_idle_power);
|
|
|
|
|
|
#ifdef STARPU_USE_TOP
|
|
#ifdef STARPU_USE_TOP
|
|
starpu_top_register_parameter_float("DMDA_ALPHA", &alpha,
|
|
starpu_top_register_parameter_float("DMDA_ALPHA", &alpha,
|
|
@@ -686,43 +812,25 @@ static void initialize_dmda_policy(struct starpu_machine_topology *topology,
|
|
starpu_top_register_parameter_float("DMDA_IDLE_POWER", &idle_power,
|
|
starpu_top_register_parameter_float("DMDA_IDLE_POWER", &idle_power,
|
|
idle_power_minimum, idle_power_maximum, param_modified);
|
|
idle_power_minimum, idle_power_maximum, param_modified);
|
|
#endif /* !STARPU_USE_TOP */
|
|
#endif /* !STARPU_USE_TOP */
|
|
-
|
|
|
|
- unsigned workerid;
|
|
|
|
- for (workerid = 0; workerid < nworkers; workerid++)
|
|
|
|
- {
|
|
|
|
- queue_array[workerid] = _starpu_create_fifo();
|
|
|
|
-
|
|
|
|
- _STARPU_PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
|
|
|
|
- _STARPU_PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
|
|
|
|
-
|
|
|
|
- starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
-static void initialize_dmda_sorted_policy(struct starpu_machine_topology *topology,
|
|
|
|
- struct starpu_sched_policy *policy)
|
|
|
|
|
|
/* Initialization for the sorted dmda variant: reuse the regular dmda
 * setup, then widen the accepted priority range. */
static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
{
	initialize_dmda_policy(sched_ctx_id);

	/* The application may use any integer as a priority. */
	starpu_sched_set_min_priority(INT_MIN);
	starpu_sched_set_max_priority(INT_MAX);
}
|
|
|
|
|
-static void deinitialize_dmda_policy(struct starpu_machine_topology *topology,
|
|
|
|
- struct starpu_sched_policy *policy)
|
|
|
|
|
|
+static void deinitialize_dmda_policy(unsigned sched_ctx_id)
|
|
{
|
|
{
|
|
- (void) policy;
|
|
|
|
-
|
|
|
|
- unsigned workerid;
|
|
|
|
- for (workerid = 0; workerid < topology->nworkers; workerid++)
|
|
|
|
- {
|
|
|
|
- _starpu_destroy_fifo(queue_array[workerid]);
|
|
|
|
- _STARPU_PTHREAD_MUTEX_DESTROY(&sched_mutex[workerid]);
|
|
|
|
- _STARPU_PTHREAD_COND_DESTROY(&sched_cond[workerid]);
|
|
|
|
- }
|
|
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+ free(dt->queue_array);
|
|
|
|
+ free(dt);
|
|
|
|
+ starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
|
|
|
|
|
|
- _STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", total_task_cnt, ready_task_cnt, (100.0f*ready_task_cnt)/total_task_cnt);
|
|
|
|
|
|
+ _STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", dt->total_task_cnt, dt->ready_task_cnt, (100.0f*dt->ready_task_cnt)/dt->total_task_cnt);
|
|
}
|
|
}
|
|
|
|
|
|
/* dmda_pre_exec_hook is called right after the data transfer is done and right
|
|
/* dmda_pre_exec_hook is called right after the data transfer is done and right
|
|
@@ -730,24 +838,30 @@ static void deinitialize_dmda_policy(struct starpu_machine_topology *topology,
|
|
* value of the expected start, end, length, etc... */
|
|
* value of the expected start, end, length, etc... */
|
|
static void dmda_pre_exec_hook(struct starpu_task *task)
|
|
static void dmda_pre_exec_hook(struct starpu_task *task)
|
|
{
|
|
{
|
|
|
|
+ unsigned sched_ctx_id = task->sched_ctx;
|
|
int workerid = starpu_worker_get_id();
|
|
int workerid = starpu_worker_get_id();
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[workerid];
|
|
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
|
|
double model = task->predicted;
|
|
double model = task->predicted;
|
|
double transfer_model = task->predicted_transfer;
|
|
double transfer_model = task->predicted_transfer;
|
|
|
|
|
|
- /* Once the task is executing, we can update the predicted amount of
|
|
|
|
- * work. */
|
|
|
|
- _STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
|
|
|
|
|
|
+ pthread_mutex_t *sched_mutex;
|
|
|
|
+ pthread_cond_t *sched_cond;
|
|
|
|
+ starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
|
|
|
|
+ /* Once the task is executing, we can update the predicted amount
|
|
|
|
+ * of work. */
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
fifo->exp_len-= transfer_model;
|
|
fifo->exp_len-= transfer_model;
|
|
fifo->exp_start = starpu_timing_now() + model;
|
|
fifo->exp_start = starpu_timing_now() + model;
|
|
fifo->exp_end= fifo->exp_start + fifo->exp_len;
|
|
fifo->exp_end= fifo->exp_start + fifo->exp_len;
|
|
- _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
|
|
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
}
|
|
}
|
|
|
|
|
|
static void dmda_push_task_notify(struct starpu_task *task, int workerid)
|
|
static void dmda_push_task_notify(struct starpu_task *task, int workerid)
|
|
{
|
|
{
|
|
- struct _starpu_fifo_taskq *fifo = queue_array[workerid];
|
|
|
|
-
|
|
|
|
|
|
+ unsigned sched_ctx_id = task->sched_ctx;
|
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
|
|
/* Compute the expected penality */
|
|
/* Compute the expected penality */
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
|
|
unsigned memory_node = starpu_worker_get_memory_node(workerid);
|
|
unsigned memory_node = starpu_worker_get_memory_node(workerid);
|
|
@@ -756,10 +870,13 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid)
|
|
_starpu_get_job_associated_to_task(task)->nimpl);
|
|
_starpu_get_job_associated_to_task(task)->nimpl);
|
|
|
|
|
|
double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
|
|
+ pthread_mutex_t *sched_mutex;
|
|
|
|
+ pthread_cond_t *sched_cond;
|
|
|
|
+ starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
|
|
|
|
|
|
- /* Update the predictions */
|
|
|
|
- _STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
|
|
|
|
|
|
|
|
|
|
+ /* Update the predictions */
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
@@ -794,7 +911,7 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid)
|
|
|
|
|
|
fifo->ntasks++;
|
|
fifo->ntasks++;
|
|
|
|
|
|
- _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
|
|
|
|
|
|
+ _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
}
|
|
}
|
|
|
|
|
|
/* TODO: use post_exec_hook to fix the expected start */
|
|
/* TODO: use post_exec_hook to fix the expected start */
|
|
@@ -802,6 +919,8 @@ struct starpu_sched_policy _starpu_sched_dm_policy =
|
|
{
|
|
{
|
|
.init_sched = initialize_dmda_policy,
|
|
.init_sched = initialize_dmda_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
|
|
+ .add_workers = dmda_add_workers ,
|
|
|
|
+ .remove_workers = dmda_remove_workers,
|
|
.push_task = dm_push_task,
|
|
.push_task = dm_push_task,
|
|
.pop_task = dmda_pop_task,
|
|
.pop_task = dmda_pop_task,
|
|
.pre_exec_hook = NULL,
|
|
.pre_exec_hook = NULL,
|
|
@@ -815,6 +934,8 @@ struct starpu_sched_policy _starpu_sched_dmda_policy =
|
|
{
|
|
{
|
|
.init_sched = initialize_dmda_policy,
|
|
.init_sched = initialize_dmda_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
|
|
+ .add_workers = dmda_add_workers ,
|
|
|
|
+ .remove_workers = dmda_remove_workers,
|
|
.push_task = dmda_push_task,
|
|
.push_task = dmda_push_task,
|
|
.push_task_notify = dmda_push_task_notify,
|
|
.push_task_notify = dmda_push_task_notify,
|
|
.pop_task = dmda_pop_task,
|
|
.pop_task = dmda_pop_task,
|
|
@@ -829,6 +950,8 @@ struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
|
|
{
|
|
{
|
|
.init_sched = initialize_dmda_sorted_policy,
|
|
.init_sched = initialize_dmda_sorted_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
|
|
+ .add_workers = dmda_add_workers ,
|
|
|
|
+ .remove_workers = dmda_remove_workers,
|
|
.push_task = dmda_push_sorted_task,
|
|
.push_task = dmda_push_sorted_task,
|
|
.push_task_notify = dmda_push_task_notify,
|
|
.push_task_notify = dmda_push_task_notify,
|
|
.pop_task = dmda_pop_ready_task,
|
|
.pop_task = dmda_pop_ready_task,
|
|
@@ -843,6 +966,8 @@ struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
|
|
{
|
|
{
|
|
.init_sched = initialize_dmda_policy,
|
|
.init_sched = initialize_dmda_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
.deinit_sched = deinitialize_dmda_policy,
|
|
|
|
+ .add_workers = dmda_add_workers ,
|
|
|
|
+ .remove_workers = dmda_remove_workers,
|
|
.push_task = dmda_push_task,
|
|
.push_task = dmda_push_task,
|
|
.push_task_notify = dmda_push_task_notify,
|
|
.push_task_notify = dmda_push_task_notify,
|
|
.pop_task = dmda_pop_ready_task,
|
|
.pop_task = dmda_pop_ready_task,
|