ソースを参照

Factorize dm with dmda*

Samuel Thibault 4 年前
コミット
5a4f59f239
共有
1 個のファイルを変更しました。43 個の追加と 170 個の削除を含みます。
  1. 43 170
      src/sched_policies/deque_modeling_policy_data_aware.c

+ 43 - 170
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -443,151 +443,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	return ret;
 }
 
-/* TODO: factorize with dmda!! */
-static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
-{
-	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	int best = -1;
-
-	double best_exp_end_of_task = 0.0;
-	double model_best = 0.0;
-	double transfer_model_best = 0.0;
-
-	int ntasks_best = -1;
-	double ntasks_best_end = 0.0;
-	int calibrating = 0;
-
-	/* A priori, we know all estimations */
-	int unknown = 0;
-
-	unsigned best_impl = 0;
-	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-
-	struct starpu_sched_ctx_iterator it;
-
-	double now = starpu_timing_now();
-
-	workers->init_iterator_for_parallel_tasks(workers, &it, task);
-	while(workers->has_next(workers, &it))
-	{
-		unsigned nimpl;
-		unsigned impl_mask;
-		unsigned worker = workers->get_next(workers, &it);
-		struct _starpu_fifo_taskq *fifo  = &dt->queue_array[worker];
-		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
-
-		/* Sometimes workers didn't take the tasks as early as we expected */
-		double exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
-
-		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
-			continue;
-
-		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-		{
-			if (!(impl_mask & (1U << nimpl)))
-			{
-				/* no one on that queue may execute this task */
-				continue;
-			}
-
-			double exp_end;
-			double local_length = starpu_task_worker_expected_length(task, worker, sched_ctx_id, nimpl);
-			double local_penalty = starpu_task_expected_data_transfer_time_for(task, worker);
-			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
-
-			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
-
-			/*
-			 * This implements a default greedy scheduler for the
-			 * case of tasks which have no performance model, or
-			 * whose performance model is not calibrated yet.
-			 *
-			 * It simply uses the number of tasks already pushed to
-			 * the workers, divided by the relative performance of
-			 * a CPU and of a GPU.
-			 *
-			 * This is always computed, but the ntasks_best
-			 * selection is only really used if the task indeed has
-			 * no performance model, or is not calibrated yet.
-			 */
-			if (ntasks_best == -1
-
-			    /* Always compute the greedy decision, at least for
-			     * the tasks with no performance model. */
-			    || (!calibrating && ntasks_end < ntasks_best_end)
-
-			    /* The performance model of this task is not
-			     * calibrated on this worker, try to run it there
-			     * to calibrate it there. */
-			    || (!calibrating && isnan(local_length))
-
-			    /* the performance model of this task is not
-			     * calibrated on this worker either, rather run it
-			     * there if this one is low on scheduled tasks. */
-			    || (calibrating && isnan(local_length) && ntasks_end < ntasks_best_end)
-				)
-			{
-				ntasks_best_end = ntasks_end;
-				ntasks_best = worker;
-				best_impl = nimpl;
-			}
-
-			if (isnan(local_length))
-			{
-				/* we are calibrating, we want to speed-up calibration time
-				 * so we privilege non-calibrated tasks (but still
-				 * greedily distribute them to avoid dumb schedules) */
-				static int warned;
-				if (!warned)
-				{
-					warned = 1;
-					_STARPU_DISP("Warning: performance model for %s not finished calibrating on worker %u, using a dumb scheduling heuristic for now\n", starpu_task_get_name(task), worker);
-				}
-				calibrating = 1;
-			}
-
-			if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
-				/* there is no prediction available for that task
-				 * with that arch yet, so switch to a greedy strategy */
-				unknown = 1;
-
-			if (unknown)
-				continue;
-
-			exp_end = exp_start + fifo->exp_len + local_length;
-
-			if (best == -1 || exp_end < best_exp_end_of_task)
-			{
-				/* a better solution was found */
-				best_exp_end_of_task = exp_end;
-				best = worker;
-				model_best = local_length;
-				transfer_model_best = local_penalty;
-				best_impl = nimpl;
-			}
-		}
-	}
-
-	if (unknown)
-	{
-		best = ntasks_best;
-		model_best = 0.0;
-		transfer_model_best = 0.0;
-#ifdef STARPU_VERBOSE
-		dt->eager_task_cnt++;
-#endif
-	}
-
-	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
-
-	starpu_task_set_implementation(task, best_impl);
-
-	starpu_sched_task_break(task);
-	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best,
-					model_best, transfer_model_best, prio, sched_ctx_id);
-}
-
 /* TODO: factorise CPU computations, expensive with a lot of cores */
 static void compute_all_performance_predictions(struct starpu_task *task,
 						unsigned nworkers,
@@ -677,15 +532,19 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			{
 				/* TODO : conversion time */
 				local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
-				local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
-				local_energy[worker_ctx][nimpl] = starpu_task_bundle_expected_energy(bundle, perf_arch,nimpl);
+				if (local_data_penalty)
+					local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
+				if (local_energy)
+					local_energy[worker_ctx][nimpl] = starpu_task_bundle_expected_energy(bundle, perf_arch,nimpl);
 
 			}
 			else
 			{
 				local_task_length[worker_ctx][nimpl] = starpu_task_worker_expected_length(task, workerid, sched_ctx_id, nimpl);
-				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time_for(task, workerid);
-				local_energy[worker_ctx][nimpl] = starpu_task_worker_expected_energy(task, workerid, sched_ctx_id,nimpl);
+				if (local_data_penalty)
+					local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time_for(task, workerid);
+				if (local_energy)
+					local_energy[worker_ctx][nimpl] = starpu_task_worker_expected_energy(task, workerid, sched_ctx_id,nimpl);
 				double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
 				if (conversion_time > 0.0)
 					local_task_length[worker_ctx][nimpl] += conversion_time;
@@ -742,7 +601,10 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (unknown)
 				continue;
 
-			double task_starting_time = STARPU_MAX(exp_start + prev_exp_len, now + local_data_penalty[worker_ctx][nimpl]); 
+			double task_starting_time = exp_start + prev_exp_len;
+			if (local_data_penalty)
+				task_starting_time = STARPU_MAX(task_starting_time,
+					now + local_data_penalty[worker_ctx][nimpl]);
 
 			exp_end[worker_ctx][nimpl] = task_starting_time + local_task_length[worker_ctx][nimpl];
 
@@ -753,8 +615,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 				nimpl_best = nimpl;
 			}
 
-			if (isnan(local_energy[worker_ctx][nimpl]))
-				local_energy[worker_ctx][nimpl] = 0.;
+			if (local_energy)
+				if (isnan(local_energy[worker_ctx][nimpl]))
+					local_energy[worker_ctx][nimpl] = 0.;
 
 		}
 		worker_ctx++;
@@ -774,7 +637,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	*max_exp_endp_of_workers = max_exp_end_of_workers;
 }
 
-static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id, unsigned simulate, unsigned sorted_decision)
+static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id, unsigned da, unsigned simulate, unsigned sorted_decision)
 {
 	/* find the queue */
 	int best = -1, best_in_ctx = -1;
@@ -812,8 +675,8 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 					    exp_end,
 					    &max_exp_end_of_workers,
 					    &min_exp_end_of_task,
-					    local_data_penalty,
-					    local_energy,
+					    da ? local_data_penalty : NULL,
+					    da ? local_energy : NULL,
 					    &forced_best,
 					    &forced_impl, sched_ctx_id, sorted_decision);
 
@@ -840,11 +703,14 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 					/* no one on that queue may execute this task */
 					continue;
 				}
-				fitness[worker_ctx][nimpl] = dt->alpha * __s_alpha__value *(exp_end[worker_ctx][nimpl] - min_exp_end_of_task)
-					+ dt->beta * __s_beta__value *(local_data_penalty[worker_ctx][nimpl])
-					+ dt->_gamma * __s_gamma__value *(local_energy[worker_ctx][nimpl]);
+				if (da)
+					fitness[worker_ctx][nimpl] = dt->alpha * __s_alpha__value *(exp_end[worker_ctx][nimpl] - min_exp_end_of_task)
+						+ dt->beta * __s_beta__value *(local_data_penalty[worker_ctx][nimpl])
+						+ dt->_gamma * __s_gamma__value *(local_energy[worker_ctx][nimpl]);
+				else
+					fitness[worker_ctx][nimpl] = exp_end[worker_ctx][nimpl] - min_exp_end_of_task;
 
-				if (exp_end[worker_ctx][nimpl] > max_exp_end_of_workers)
+				if (da && exp_end[worker_ctx][nimpl] > max_exp_end_of_workers)
 				{
 					/* This placement will make the computation
 					 * longer, take into account the idle
@@ -886,15 +752,17 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(best_in_ctx, sched_ctx_id);
 		unsigned memory_node = starpu_worker_get_memory_node(best);
 		model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
-		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
+		if (da)
+			transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
 	}
 	else
 	{
 		model_best = local_task_length[best_in_ctx][selected_impl];
-		transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
+		if (da)
+			transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
 	}
 
-	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
+	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", selected_impl);
 	starpu_task_set_implementation(task, selected_impl);
 
 	starpu_sched_task_break(task);
@@ -911,7 +779,7 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 
 static int dmda_push_sorted_decision_task(struct starpu_task *task)
 {
-	return _dmda_push_task(task, 1, task->sched_ctx, 0, 1);
+	return _dmda_push_task(task, 1, task->sched_ctx, 1, 0, 1);
 }
 
 static int dmda_push_sorted_task(struct starpu_task *task)
@@ -919,35 +787,40 @@ static int dmda_push_sorted_task(struct starpu_task *task)
 #ifdef STARPU_DEVEL
 #warning TODO: after defining a scheduling window, use that instead of empty_ctx_tasks
 #endif
-	return _dmda_push_task(task, 1, task->sched_ctx, 0, 0);
+	return _dmda_push_task(task, 1, task->sched_ctx, 1, 0, 0);
 }
 
 static int dm_push_task(struct starpu_task *task)
 {
-	return _dm_push_task(task, 0, task->sched_ctx);
+	return _dmda_push_task(task, 0, task->sched_ctx, 0, 0, 0);
+}
+
+static int dm_simulate_push_task(struct starpu_task *task)
+{
+	return _dmda_push_task(task, 0, task->sched_ctx, 0, 1, 0);
 }
 
 static int dmda_push_task(struct starpu_task *task)
 {
 	STARPU_ASSERT(task);
-	return _dmda_push_task(task, 0, task->sched_ctx, 0, 0);
+	return _dmda_push_task(task, 0, task->sched_ctx, 1, 0, 0);
 }
 static double dmda_simulate_push_task(struct starpu_task *task)
 {
 	STARPU_ASSERT(task);
-	return _dmda_push_task(task, 0, task->sched_ctx, 1, 0);
+	return _dmda_push_task(task, 0, task->sched_ctx, 1, 1, 0);
 }
 
 static double dmda_simulate_push_sorted_task(struct starpu_task *task)
 {
 	STARPU_ASSERT(task);
-	return _dmda_push_task(task, 1, task->sched_ctx, 1, 0);
+	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1, 0);
 }
 
 static double dmda_simulate_push_sorted_decision_task(struct starpu_task *task)
 {
 	STARPU_ASSERT(task);
-	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1);
+	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1, 1);
 }
 
 #ifdef NOTIFY_READY_SOON
@@ -1199,7 +1072,7 @@ struct starpu_sched_policy _starpu_sched_dm_policy =
 	.add_workers = dmda_add_workers ,
 	.remove_workers = dmda_remove_workers,
 	.push_task = dm_push_task,
-	.simulate_push_task = NULL,
+	.simulate_push_task = dm_simulate_push_task,
 	.push_task_notify = dm_push_task_notify,
 	.pop_task = dmda_pop_task,
 	.pre_exec_hook = dmda_pre_exec_hook,