|
@@ -51,6 +51,7 @@ struct _starpu_dmda_data
|
|
|
|
|
|
long int total_task_cnt;
|
|
|
long int ready_task_cnt;
|
|
|
+ int num_priorities;
|
|
|
};
|
|
|
|
|
|
/* The dmda scheduling policy uses
|
|
@@ -115,7 +116,14 @@ static void param_modified(struct starpu_top_param* d)
|
|
|
}
|
|
|
#endif /* !STARPU_USE_TOP */
|
|
|
|
|
|
-static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node)
|
|
|
+/*
+ * Map a task priority from the scheduling context's [min, max] priority
+ * range onto a bucket index in [0, num_priorities - 1].
+ *
+ * priority       raw priority of the task
+ * num_priorities number of per-priority buckets kept by the policy
+ * sched_ctx_id   scheduling context whose min/max priorities define the range
+ *
+ * Returns the bucket index. Callers in this file use it as an inclusive
+ * upper bound when walking the per-priority arrays, which are allocated
+ * with num_priorities entries, so the result is always kept inside
+ * [0, num_priorities - 1].
+ */
+static int _normalize_prio(int priority, int num_priorities, unsigned sched_ctx_id)
+{
+	int min = starpu_sched_ctx_get_min_priority(sched_ctx_id);
+	int max = starpu_sched_ctx_get_max_priority(sched_ctx_id);
+	long long span = (long long)max - min;
+	long long prio;
+
+	/* A single bucket or an empty priority range leaves only index 0;
+	 * returning early also avoids dividing by zero when min == max. */
+	if (num_priorities <= 1 || span <= 0)
+		return 0;
+
+	/* Multiply before dividing: computing (num_priorities-1)/(max-min) first
+	 * truncates the scale factor to 0 whenever there are fewer buckets than
+	 * priority levels. The wider type keeps the product from overflowing. */
+	prio = ((long long)(num_priorities - 1) * ((long long)priority - min)) / span;
+
+	/* Clamp priorities that fall outside [min, max] so that callers never
+	 * index outside the per-priority arrays. */
+	if (prio < 0)
+		prio = 0;
+	if (prio > num_priorities - 1)
+		prio = num_priorities - 1;
+
+	return (int)prio;
+}
|
|
|
+
|
|
|
+static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node, int num_priorities)
|
|
|
{
|
|
|
struct starpu_task *task = NULL, *current;
|
|
|
|
|
@@ -156,6 +164,14 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
|
|
|
current = current->next;
|
|
|
}
|
|
|
|
|
|
+ if(num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo_queue->ntasks_per_priority[i]--;
|
|
|
+ }
|
|
|
+
|
|
|
starpu_task_list_erase(&fifo_queue->taskq, task);
|
|
|
|
|
|
_STARPU_TRACE_JOB_POP(task, 0);
|
|
@@ -175,7 +191,7 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
|
|
|
|
|
|
unsigned node = starpu_worker_get_memory_node(workerid);
|
|
|
|
|
|
- task = _starpu_fifo_pop_first_ready_task(fifo, node);
|
|
|
+ task = _starpu_fifo_pop_first_ready_task(fifo, node, dt->num_priorities);
|
|
|
if (task)
|
|
|
{
|
|
|
/* We now start the transfer, get rid of it in the completion
|
|
@@ -186,6 +202,14 @@ static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
|
|
|
fifo->exp_len -= transfer_model;
|
|
|
fifo->exp_start = starpu_timing_now() + transfer_model;
|
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->exp_len_per_priority[i] -= transfer_model;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
#ifdef STARPU_VERBOSE
|
|
@@ -227,6 +251,14 @@ static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
|
|
|
fifo->exp_len -= transfer_model;
|
|
|
fifo->exp_start = starpu_timing_now() + transfer_model+model;
|
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->exp_len_per_priority[i] -= transfer_model;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
|
|
@@ -271,6 +303,13 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
|
|
|
fifo->exp_len -= transfer_model;
|
|
|
fifo->exp_start = starpu_timing_now() + transfer_model;
|
|
|
fifo->exp_end = fifo->exp_start + fifo->exp_len;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ for(i = 0; i < new_list->priority; i++)
|
|
|
+ fifo->exp_len_per_priority[i] -= transfer_model;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
new_list = new_list->next;
|
|
@@ -327,12 +366,28 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
|
{
|
|
|
fifo->exp_end += predicted_transfer;
|
|
|
fifo->exp_len += predicted_transfer;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->exp_len_per_priority[i] += predicted_transfer;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
if(!isnan(predicted))
|
|
|
{
|
|
|
fifo->exp_end += predicted;
|
|
|
fifo->exp_len += predicted;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->exp_len_per_priority[i] += predicted;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
@@ -364,6 +419,15 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
|
ret =_starpu_fifo_push_sorted_task(dt->queue_array[best_workerid], task);
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ dt->queue_array[best_workerid]->ntasks_per_priority[i]++;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
#ifndef STARPU_NON_BLOCKING_DRIVERS
|
|
|
STARPU_PTHREAD_COND_SIGNAL(sched_cond);
|
|
|
#endif
|
|
@@ -545,9 +609,14 @@ static void compute_all_performance_predictions(struct starpu_task *task,
|
|
|
|
|
|
unsigned nimpl;
|
|
|
unsigned impl_mask;
|
|
|
+ int task_prio = 0;
|
|
|
|
|
|
starpu_task_bundle_t bundle = task->bundle;
|
|
|
struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
+
|
|
|
+ if(sorted_decision && dt->num_priorities != -1)
|
|
|
+ task_prio = _normalize_prio(task->priority, dt->num_priorities, sched_ctx_id);
|
|
|
+
|
|
|
struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
|
|
|
|
|
|
struct starpu_sched_ctx_iterator it;
|
|
@@ -581,12 +650,20 @@ static void compute_all_performance_predictions(struct starpu_task *task,
|
|
|
compute the expected_end of the task if it is inserted before other tasks already scheduled */
|
|
|
if(sorted_decision)
|
|
|
{
|
|
|
- starpu_pthread_mutex_t *sched_mutex;
|
|
|
- starpu_pthread_cond_t *sched_cond;
|
|
|
- starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
|
- prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, worker, nimpl, &fifo_ntasks);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ prev_exp_len = fifo->exp_len_per_priority[task_prio];
|
|
|
+ fifo_ntasks = fifo->ntasks_per_priority[task_prio];
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ starpu_pthread_mutex_t *sched_mutex;
|
|
|
+ starpu_pthread_cond_t *sched_cond;
|
|
|
+ starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
|
|
|
+ prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, worker, nimpl, &fifo_ntasks);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
exp_end[worker_ctx][nimpl] = exp_start + prev_exp_len;
|
|
@@ -875,6 +952,18 @@ static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nwo
|
|
|
the queue and the synchronization variables have been already initialized */
|
|
|
if(dt->queue_array[workerid] == NULL)
|
|
|
dt->queue_array[workerid] = _starpu_create_fifo();
|
|
|
+
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ dt->queue_array[workerid]->exp_len_per_priority = (double*)malloc(dt->num_priorities*sizeof(double));
|
|
|
+ dt->queue_array[workerid]->ntasks_per_priority = (unsigned*)malloc(dt->num_priorities*sizeof(unsigned));
|
|
|
+ int j;
|
|
|
+ for(j = 0; j < dt->num_priorities; j++)
|
|
|
+ {
|
|
|
+ dt->queue_array[workerid]->exp_len_per_priority[j] = 0.0;
|
|
|
+ dt->queue_array[workerid]->ntasks_per_priority[j] = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -889,6 +978,12 @@ static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned
|
|
|
workerid = workerids[i];
|
|
|
if(dt->queue_array[workerid] != NULL)
|
|
|
{
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ free(dt->queue_array[workerid]->exp_len_per_priority);
|
|
|
+ free(dt->queue_array[workerid]->ntasks_per_priority);
|
|
|
+ }
|
|
|
+
|
|
|
_starpu_destroy_fifo(dt->queue_array[workerid]);
|
|
|
dt->queue_array[workerid] = NULL;
|
|
|
}
|
|
@@ -914,6 +1009,12 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
|
|
|
dt->_gamma = starpu_get_env_float_default("STARPU_SCHED_GAMMA", _STARPU_SCHED_GAMMA_DEFAULT);
|
|
|
dt->idle_power = starpu_get_env_float_default("STARPU_IDLE_POWER", 0.0);
|
|
|
|
|
|
+ if(starpu_sched_ctx_min_priority_is_set(sched_ctx_id) != 0 && starpu_sched_ctx_max_priority_is_set(sched_ctx_id) != 0)
|
|
|
+ dt->num_priorities = starpu_sched_ctx_get_max_priority(sched_ctx_id) - starpu_sched_ctx_get_min_priority(sched_ctx_id) + 1;
|
|
|
+ else
|
|
|
+ dt->num_priorities = -1;
|
|
|
+
|
|
|
+
|
|
|
#ifdef STARPU_USE_TOP
|
|
|
/* FIXME: broken, needs to access context variable */
|
|
|
starpu_top_register_parameter_float("DMDA_ALPHA", &alpha,
|
|
@@ -974,6 +1075,13 @@ static void dmda_pre_exec_hook(struct starpu_task *task)
|
|
|
fifo->exp_len-= model;
|
|
|
fifo->exp_start = starpu_timing_now() + model;
|
|
|
fifo->exp_end= fifo->exp_start + fifo->exp_len;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->exp_len_per_priority[i] -= model;
|
|
|
+ }
|
|
|
}
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
|
|
|
}
|
|
@@ -1019,6 +1127,14 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
|
|
|
task->predicted_transfer = predicted_transfer;
|
|
|
fifo->exp_end += predicted_transfer;
|
|
|
fifo->exp_len += predicted_transfer;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->exp_len_per_priority[i] += predicted_transfer;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/* If there is no prediction available, we consider the task has a null length */
|
|
@@ -1027,6 +1143,21 @@ static void dmda_push_task_notify(struct starpu_task *task, int workerid, int pe
|
|
|
task->predicted = predicted;
|
|
|
fifo->exp_end += predicted;
|
|
|
fifo->exp_len += predicted;
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->exp_len_per_priority[i] += predicted;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ if(dt->num_priorities != -1)
|
|
|
+ {
|
|
|
+ int i;
|
|
|
+ int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
|
|
|
+ for(i = 0; i <= task_prio; i++)
|
|
|
+ fifo->ntasks_per_priority[i]++;
|
|
|
}
|
|
|
|
|
|
fifo->ntasks++;
|