|
@@ -327,11 +327,9 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
|
static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
|
|
|
{
|
|
|
dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
- /* find the queue */
|
|
|
- struct _starpu_fifo_taskq *fifo;
|
|
|
unsigned worker, worker_ctx = 0;
|
|
|
int best = -1;
|
|
|
-
|
|
|
+
|
|
|
double best_exp_end = 0.0;
|
|
|
double model_best = 0.0;
|
|
|
double transfer_model_best = 0.0;
|
|
@@ -339,51 +337,51 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
|
|
|
int ntasks_best = -1;
|
|
|
double ntasks_best_end = 0.0;
|
|
|
int calibrating = 0;
|
|
|
-
|
|
|
+
|
|
|
/* A priori, we know all estimations */
|
|
|
int unknown = 0;
|
|
|
-
|
|
|
+
|
|
|
unsigned best_impl = 0;
|
|
|
unsigned nimpl;
|
|
|
struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
|
|
|
-
|
|
|
+
|
|
|
if(workers->init_cursor)
|
|
|
- workers->init_cursor(workers);
|
|
|
-
|
|
|
- while(workers->has_next(workers))
|
|
|
- {
|
|
|
- worker = workers->get_next(workers);
|
|
|
- struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
|
|
|
- unsigned memory_node = starpu_worker_get_memory_node(worker);
|
|
|
- enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
|
+ workers->init_cursor(workers);
|
|
|
|
|
|
+ while(workers->has_next(workers))
|
|
|
+ {
|
|
|
+ worker = workers->get_next(workers);
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
|
|
|
+ unsigned memory_node = starpu_worker_get_memory_node(worker);
|
|
|
+ enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
|
+
|
|
|
for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
|
{
|
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
|
{
|
|
|
/* no one on that queue may execute this task */
|
|
|
- // worker_ctx++;
|
|
|
+ // worker_ctx++;
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
double exp_end;
|
|
|
pthread_mutex_t *sched_mutex;
|
|
|
pthread_cond_t *sched_cond;
|
|
|
- starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
|
|
|
-
|
|
|
+ starpu_worker_get_sched_condition(sched_ctx_id, worker, &sched_mutex, &sched_cond);
|
|
|
+
|
|
|
/* Sometimes workers didn't take the tasks as early as we expected */
|
|
|
_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
|
fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
|
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
|
_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
|
|
|
double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
|
|
|
double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
|
|
|
-
|
|
|
+
|
|
|
//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
|
|
|
-
|
|
|
+
|
|
|
if (ntasks_best == -1
|
|
|
|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
|
|
|
|| (!calibrating && isnan(local_length)) /* Not calibrating but this worker is being calibrated */
|
|
@@ -394,23 +392,23 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
|
|
|
ntasks_best = worker;
|
|
|
best_impl = nimpl;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (isnan(local_length))
|
|
|
/* we are calibrating, we want to speed-up calibration time
|
|
|
* so we privilege non-calibrated tasks (but still
|
|
|
* greedily distribute them to avoid dumb schedules) */
|
|
|
calibrating = 1;
|
|
|
-
|
|
|
+
|
|
|
if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
|
|
|
/* there is no prediction available for that task
|
|
|
* with that arch yet, so switch to a greedy strategy */
|
|
|
unknown = 1;
|
|
|
-
|
|
|
+
|
|
|
if (unknown)
|
|
|
continue;
|
|
|
|
|
|
exp_end = fifo->exp_start + fifo->exp_len + local_length;
|
|
|
-
|
|
|
+
|
|
|
if (best == -1 || exp_end < best_exp_end)
|
|
|
{
|
|
|
/* a better solution was found */
|
|
@@ -430,27 +428,27 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
|
|
|
model_best = 0.0;
|
|
|
transfer_model_best = 0.0;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
|
|
|
-
|
|
|
+
|
|
|
if(workers->init_cursor)
|
|
|
- workers->deinit_cursor(workers);
|
|
|
-
|
|
|
- _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
|
|
|
-
|
|
|
+ workers->deinit_cursor(workers);
|
|
|
+
|
|
|
+ _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
|
|
|
+
|
|
|
/* we should now have the best worker in variable "best" */
|
|
|
return push_task_on_best_worker(task, best,
|
|
|
- model_best, transfer_model_best, prio, sched_ctx_id);
|
|
|
+ model_best, transfer_model_best, prio, sched_ctx_id);
|
|
|
}
|
|
|
|
|
|
static void compute_all_performance_predictions(struct starpu_task *task,
|
|
|
- double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
- double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
- double *max_exp_endp,
|
|
|
- double *best_exp_endp,
|
|
|
- double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
- double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
- int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
|
|
|
+ double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
+ double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
+ double *max_exp_endp,
|
|
|
+ double *best_exp_endp,
|
|
|
+ double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
+ double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
|
|
|
+ int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
|
|
|
{
|
|
|
int calibrating = 0;
|
|
|
double max_exp_end = DBL_MIN;
|
|
@@ -468,14 +466,14 @@ static void compute_all_performance_predictions(struct starpu_task *task,
|
|
|
starpu_task_bundle_t bundle = task->bundle;
|
|
|
dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
|
|
|
-
|
|
|
- /* find the queue */
|
|
|
- struct _starpu_fifo_taskq *fifo;
|
|
|
-
|
|
|
+
|
|
|
while(workers->has_next(workers))
|
|
|
{
|
|
|
worker = workers->get_next(workers);
|
|
|
- fifo = dt->queue_array[worker];
|
|
|
+ struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
|
|
|
+ enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
|
|
|
+ unsigned memory_node = starpu_worker_get_memory_node(worker);
|
|
|
+
|
|
|
for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
|
|
|
{
|
|
|
if (!starpu_worker_can_execute_task(worker, task, nimpl))
|
|
@@ -861,7 +859,7 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
|
|
|
static void dmda_push_task_notify(struct starpu_task *task, int workerid)
|
|
|
{
|
|
|
unsigned sched_ctx_id = task->sched_ctx;
|
|
|
- dmda_data *dt = (heft_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
+ dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
|
|
|
struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
|
|
|
/* Compute the expected penalty */
|
|
|
enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
|