|
@@ -25,10 +25,9 @@ struct starpu_sched_ctx_performance_counters* perf_counters = NULL;
|
|
|
static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time);
|
|
|
static void notify_pushed_task(unsigned sched_ctx, int worker);
|
|
|
static void notify_post_exec_task(struct starpu_task *task, size_t data_size, uint32_t footprint,
|
|
|
- int hypervisor_tag, int nready_tasks, double ready_flops);
|
|
|
+ int hypervisor_tag, double flops);
|
|
|
static void notify_poped_task(unsigned sched_ctx, int worker);
|
|
|
static void notify_submitted_job(struct starpu_task *task, unsigned footprint, size_t data_size);
|
|
|
-static void notify_ready_task(unsigned sched_ctx, struct starpu_task *task);
|
|
|
static void notify_empty_ctx(unsigned sched_ctx, struct starpu_task *task);
|
|
|
static void notify_delete_context(unsigned sched_ctx);
|
|
|
|
|
@@ -181,6 +180,7 @@ void* sc_hypervisor_init(struct sc_hypervisor_policy *hypervisor_policy)
|
|
|
hypervisor.sched_ctx_w[i].remaining_flops = 0.0;
|
|
|
hypervisor.sched_ctx_w[i].start_time = 0.0;
|
|
|
hypervisor.sched_ctx_w[i].real_start_time = 0.0;
|
|
|
+ hypervisor.sched_ctx_w[i].hyp_react_start_time = 0.0;
|
|
|
hypervisor.sched_ctx_w[i].resize_ack.receiver_sched_ctx = -1;
|
|
|
hypervisor.sched_ctx_w[i].resize_ack.moved_workers = NULL;
|
|
|
hypervisor.sched_ctx_w[i].resize_ack.nmoved_workers = 0;
|
|
@@ -190,9 +190,7 @@ void* sc_hypervisor_init(struct sc_hypervisor_policy *hypervisor_policy)
|
|
|
|
|
|
hypervisor.sched_ctx_w[i].ref_speed[0] = -1.0;
|
|
|
hypervisor.sched_ctx_w[i].ref_speed[1] = -1.0;
|
|
|
- hypervisor.sched_ctx_w[i].ready_flops = 0.0;
|
|
|
hypervisor.sched_ctx_w[i].total_flops_available = 0;
|
|
|
- hypervisor.sched_ctx_w[i].nready_tasks = 0;
|
|
|
hypervisor.sched_ctx_w[i].to_be_sized = 0;
|
|
|
int j;
|
|
|
for(j = 0; j < STARPU_NMAXWORKERS; j++)
|
|
@@ -223,7 +221,6 @@ void* sc_hypervisor_init(struct sc_hypervisor_policy *hypervisor_policy)
|
|
|
perf_counters->notify_poped_task = notify_poped_task;
|
|
|
perf_counters->notify_post_exec_task = notify_post_exec_task;
|
|
|
perf_counters->notify_submitted_job = notify_submitted_job;
|
|
|
- perf_counters->notify_ready_task = notify_ready_task;
|
|
|
perf_counters->notify_empty_ctx = notify_empty_ctx;
|
|
|
perf_counters->notify_delete_context = notify_delete_context;
|
|
|
|
|
@@ -316,6 +313,7 @@ void sc_hypervisor_register_ctx(unsigned sched_ctx, double total_flops)
|
|
|
hypervisor.sched_ctx_w[sched_ctx].total_flops = total_flops;
|
|
|
hypervisor.sched_ctx_w[sched_ctx].remaining_flops = total_flops;
|
|
|
hypervisor.resize[sched_ctx] = 1;
|
|
|
+ hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time = starpu_timing_now();
|
|
|
starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
|
|
|
}
|
|
|
|
|
@@ -861,35 +859,30 @@ void sc_hypervisor_update_resize_interval(unsigned *sched_ctxs, int nsched_ctxs)
|
|
|
if(hypervisor.sched_ctx_w[sched_ctx].exec_start_time[worker] == 0.0)
|
|
|
{
|
|
|
exec_time = hypervisor.sched_ctx_w[sched_ctx].exec_time[worker];
|
|
|
-// printf("%d/%d: exec_time %lf\n", worker, sched_ctx, hypervisor.sched_ctx_w[sched_ctx].exec_time[worker]);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
double current_exec_time = (end_time - hypervisor.sched_ctx_w[sched_ctx].exec_start_time[worker]) / 1000000.0; /* in seconds */
|
|
|
exec_time = hypervisor.sched_ctx_w[sched_ctx].exec_time[worker] + current_exec_time;
|
|
|
-// printf("%d/%d: exec_time %lf current_exec_time %lf\n", worker, sched_ctx, hypervisor.sched_ctx_w[sched_ctx].exec_time[worker], current_exec_time);
|
|
|
}
|
|
|
norm_exec_time += elapsed_time_worker[worker] == 0.0 ? 0.0 : exec_time / elapsed_time_worker[worker];
|
|
|
}
|
|
|
|
|
|
double curr_time = starpu_timing_now();
|
|
|
double elapsed_time = (curr_time - hypervisor.sched_ctx_w[sched_ctx].start_time) / 1000000.0; /* in seconds */
|
|
|
-// double norm_idle_time = max_workers_idle_time[i] / elapsed_time;
|
|
|
-// double norm_exec_time = exec_time / elapsed_time;
|
|
|
+ int nready_tasks = starpu_get_nready_tasks_of_sched_ctx(sched_ctx);
|
|
|
if(norm_idle_time >= 0.9)
|
|
|
{
|
|
|
-// config->max_nworkers = workers->nworkers - lrint(norm_idle_time);
|
|
|
config->max_nworkers = lrint(norm_exec_time);
|
|
|
-/* if(config->max_nworkers > hypervisor.sched_ctx_w[sched_ctx].nready_tasks) */
|
|
|
-/* config->max_nworkers = hypervisor.sched_ctx_w[sched_ctx].nready_tasks - 1; */
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- if(norm_idle_time < 0.1)//(max_workers_idle_time[i] < 0.000001)
|
|
|
- config->max_nworkers = lrint(norm_exec_time) + hypervisor.sched_ctx_w[sched_ctx].nready_tasks - 1; //workers->nworkers + hypervisor.sched_ctx_w[sched_ctx].nready_tasks - 1;
|
|
|
+ if(norm_idle_time < 0.1)
|
|
|
+ config->max_nworkers = lrint(norm_exec_time) + nready_tasks - 1; //workers->nworkers + hypervisor.sched_ctx_w[sched_ctx].nready_tasks - 1;
|
|
|
else
|
|
|
config->max_nworkers = lrint(norm_exec_time);
|
|
|
}
|
|
|
+// config->max_nworkers = hypervisor.sched_ctx_w[sched_ctx].nready_tasks - 1;
|
|
|
|
|
|
if(config->max_nworkers < 0)
|
|
|
config->max_nworkers = 0;
|
|
@@ -897,7 +890,7 @@ void sc_hypervisor_update_resize_interval(unsigned *sched_ctxs, int nsched_ctxs)
|
|
|
config->max_nworkers = max_cpus;
|
|
|
|
|
|
printf("%d: ready tasks %d idle for long %lf norm_idle_time %lf elapsed_time %lf norm_exec_time %lf nworker %d max %d \n",
|
|
|
- sched_ctx, hypervisor.sched_ctx_w[sched_ctx].nready_tasks, max_workers_idle_time[i], norm_idle_time, elapsed_time, norm_exec_time, workers->nworkers, config->max_nworkers);
|
|
|
+ sched_ctx, nready_tasks, max_workers_idle_time[i], norm_idle_time, elapsed_time, norm_exec_time, workers->nworkers, config->max_nworkers);
|
|
|
|
|
|
|
|
|
total_max_nworkers += config->max_nworkers;
|
|
@@ -913,9 +906,10 @@ void sc_hypervisor_update_resize_interval(unsigned *sched_ctxs, int nsched_ctxs)
|
|
|
unsigned max_nready_sched_ctx = sched_ctxs[0];
|
|
|
for(i = 0; i < nsched_ctxs; i++)
|
|
|
{
|
|
|
- if(max_nready < hypervisor.sched_ctx_w[sched_ctxs[i]].nready_tasks)
|
|
|
+ int nready_tasks = starpu_get_nready_tasks_of_sched_ctx(sched_ctxs[i]);
|
|
|
+ if(max_nready < nready_tasks)
|
|
|
{
|
|
|
- max_nready = hypervisor.sched_ctx_w[sched_ctxs[i]].nready_tasks;
|
|
|
+ max_nready = nready_tasks;
|
|
|
max_nready_sched_ctx = sched_ctxs[i];
|
|
|
}
|
|
|
}
|
|
@@ -964,7 +958,13 @@ static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
|
|
|
|
|
|
if(hypervisor.policy.handle_idle_cycle)
|
|
|
{
|
|
|
- hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
|
|
|
+ double curr_time = starpu_timing_now();
|
|
|
+ double elapsed_time = (curr_time - sc_w->hyp_react_start_time) / 1000000.0; /* in seconds */
|
|
|
+ if(sc_w->sched_ctx != STARPU_NMAX_SCHED_CTXS && elapsed_time > sc_w->config->time_sample)
|
|
|
+ {
|
|
|
+ sc_w->hyp_react_start_time = starpu_timing_now();
|
|
|
+ hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
return;
|
|
@@ -986,7 +986,7 @@ static void notify_poped_task(unsigned sched_ctx, int worker)
|
|
|
if(sc_w->idle_start_time[worker] > 0.0)
|
|
|
{
|
|
|
double end_time = starpu_timing_now();
|
|
|
- sc_w->idle_time[worker] += (end_time - sc_w->idle_start_time[worker]) / 1000000.0; /* in seconds */
|
|
|
+ sc_w->idle_time[worker] += (end_time - sc_w->idle_start_time[worker]) / 1000000.0; /* in seconds */
|
|
|
sc_w->idle_start_time[worker] = 0.0;
|
|
|
}
|
|
|
|
|
@@ -997,7 +997,7 @@ static void notify_poped_task(unsigned sched_ctx, int worker)
|
|
|
|
|
|
|
|
|
/* notifies the hypervisor that a tagged task has just been executed */
|
|
|
-static void notify_post_exec_task(struct starpu_task *task, size_t data_size, uint32_t footprint, int task_tag, int ready_tasks, double ready_flops)
|
|
|
+static void notify_post_exec_task(struct starpu_task *task, size_t data_size, uint32_t footprint, int task_tag, double flops)
|
|
|
{
|
|
|
unsigned sched_ctx = task->sched_ctx;
|
|
|
int worker = starpu_worker_get_id();
|
|
@@ -1011,25 +1011,30 @@ static void notify_post_exec_task(struct starpu_task *task, size_t data_size, ui
|
|
|
}
|
|
|
|
|
|
hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++;
|
|
|
- hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[worker] += task->flops;
|
|
|
+ hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[worker] += flops;
|
|
|
hypervisor.sched_ctx_w[sched_ctx].elapsed_data[worker] += data_size ;
|
|
|
hypervisor.sched_ctx_w[sched_ctx].elapsed_tasks[worker]++ ;
|
|
|
- hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += task->flops;
|
|
|
+ hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += flops;
|
|
|
|
|
|
starpu_pthread_mutex_lock(&act_hypervisor_mutex);
|
|
|
- hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= task->flops;
|
|
|
- hypervisor.sched_ctx_w[sched_ctx].nready_tasks = ready_tasks;
|
|
|
- hypervisor.sched_ctx_w[sched_ctx].ready_flops = ready_flops;
|
|
|
- if(hypervisor.sched_ctx_w[sched_ctx].ready_flops < 0.0)
|
|
|
- hypervisor.sched_ctx_w[sched_ctx].ready_flops = 0.0;
|
|
|
- _ack_resize_completed(sched_ctx, worker);
|
|
|
+ hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= flops;
|
|
|
+ if(_sc_hypervisor_use_lazy_resize())
|
|
|
+ _ack_resize_completed(sched_ctx, worker);
|
|
|
starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
|
|
|
|
|
|
|
|
|
if(hypervisor.resize[sched_ctx])
|
|
|
{
|
|
|
if(hypervisor.policy.handle_poped_task)
|
|
|
- hypervisor.policy.handle_poped_task(sched_ctx, worker, task, footprint);
|
|
|
+ {
|
|
|
+ double curr_time = starpu_timing_now();
|
|
|
+ double elapsed_time = (curr_time - hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time) / 1000000.0; /* in seconds */
|
|
|
+ if(hypervisor.sched_ctx_w[sched_ctx].sched_ctx != STARPU_NMAX_SCHED_CTXS && elapsed_time > hypervisor.sched_ctx_w[sched_ctx].config->time_sample)
|
|
|
+ {
|
|
|
+ hypervisor.sched_ctx_w[sched_ctx].hyp_react_start_time = starpu_timing_now();
|
|
|
+ hypervisor.policy.handle_poped_task(sched_ctx, worker, task, footprint);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
/* starpu_pthread_mutex_lock(&act_hypervisor_mutex); */
|
|
|
/* _ack_resize_completed(sched_ctx, worker); */
|
|
@@ -1042,9 +1047,7 @@ static void notify_post_exec_task(struct starpu_task *task, size_t data_size, ui
|
|
|
|
|
|
unsigned conf_sched_ctx;
|
|
|
unsigned i;
|
|
|
- starpu_pthread_mutex_lock(&act_hypervisor_mutex);
|
|
|
unsigned ns = hypervisor.nsched_ctxs;
|
|
|
- starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
|
|
|
|
|
|
for(i = 0; i < ns; i++)
|
|
|
{
|
|
@@ -1098,14 +1101,6 @@ static void notify_submitted_job(struct starpu_task *task, uint32_t footprint, s
|
|
|
hypervisor.policy.handle_submitted_job(task->cl, task->sched_ctx, footprint, data_size);
|
|
|
}
|
|
|
|
|
|
-static void notify_ready_task(unsigned sched_ctx_id, struct starpu_task *task)
|
|
|
-{
|
|
|
- starpu_pthread_mutex_lock(&act_hypervisor_mutex);
|
|
|
- hypervisor.sched_ctx_w[sched_ctx_id].nready_tasks++;
|
|
|
- hypervisor.sched_ctx_w[sched_ctx_id].ready_flops += task->flops;
|
|
|
- starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
|
|
|
-}
|
|
|
-
|
|
|
static void notify_empty_ctx(unsigned sched_ctx_id, struct starpu_task *task)
|
|
|
{
|
|
|
sc_hypervisor_resize_ctxs(NULL, -1 , NULL, -1);
|
|
@@ -1126,10 +1121,10 @@ static void notify_delete_context(unsigned sched_ctx)
|
|
|
|
|
|
void sc_hypervisor_size_ctxs(unsigned *sched_ctxs, int nsched_ctxs, int *workers, int nworkers)
|
|
|
{
|
|
|
- starpu_pthread_mutex_lock(&act_hypervisor_mutex);
|
|
|
+// starpu_pthread_mutex_lock(&act_hypervisor_mutex);
|
|
|
unsigned curr_nsched_ctxs = sched_ctxs == NULL ? hypervisor.nsched_ctxs : (unsigned)nsched_ctxs;
|
|
|
unsigned *curr_sched_ctxs = sched_ctxs == NULL ? hypervisor.sched_ctxs : sched_ctxs;
|
|
|
- starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
|
|
|
+// starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
|
|
|
unsigned s;
|
|
|
for(s = 0; s < curr_nsched_ctxs; s++)
|
|
|
hypervisor.resize[curr_sched_ctxs[s]] = 1;
|