Forráskód Böngészése

Take transfer time into account in HEFT when the task is scheduled to be executed almost immediately: the transfer will never be overlappable in that case.

Samuel Thibault 14 éve
szülő
commit
408e6de3ce

+ 5 - 0
include/starpu_task.h

@@ -166,6 +166,10 @@ struct starpu_task {
 	 * scheduling strategy uses performance models. */
 	double predicted;
 
+	/* Predicted data transfer duration for the task in µs. This field is
+	 * only valid if the scheduling strategy uses performance models. */
+	double predicted_transfer;
+
 	/* This field are provided for the convenience of the scheduler. */
 	struct starpu_task *prev;
 	struct starpu_task *next;
@@ -197,6 +201,7 @@ struct starpu_task {
 	.status = STARPU_TASK_INVALID,			\
 	.profiling_info = NULL,				\
 	.predicted = -1.0,				\
+	.predicted_transfer = -1.0,			\
 	.starpu_private = NULL				\
 };
 

+ 1 - 1
src/core/perfmodel/perfmodel_history.c

@@ -980,7 +980,7 @@ void _starpu_update_perfmodel_history(starpu_job_t j, struct starpu_perfmodel_t
 
 		STARPU_ASSERT(j->footprint_is_computed);
 
-		fprintf(debug_file, "0x%x\t%lu\t%f\t%f\t%d\t\t", j->footprint, (unsigned long) _starpu_job_get_data_size(j), measured, task->predicted, cpuid);
+		fprintf(debug_file, "0x%x\t%lu\t%f\t%f\t%f\t%d\t\t", j->footprint, (unsigned long) _starpu_job_get_data_size(j), measured, task->predicted, task->predicted_transfer, cpuid);
 		unsigned i;
 			
 		for (i = 0; i < task->cl->nbuffers; i++)

+ 1 - 0
src/core/task.c

@@ -75,6 +75,7 @@ void starpu_task_init(struct starpu_task *task)
 	task->profiling_info = NULL;
 
 	task->predicted = -1.0;
+	task->predicted_transfer = -1.0;
 
 	task->starpu_private = NULL;
 }

+ 2 - 0
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -282,6 +282,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	task->predicted = predicted;
 
+	/* TODO predicted_transfer */
+
 	unsigned memory_node = starpu_worker_get_memory_node(best_workerid);
 
 	if (starpu_get_prefetch_flag())

+ 49 - 7
src/sched_policies/heft.c

@@ -101,11 +101,12 @@ static void heft_post_exec_hook(struct starpu_task *task)
 {
 	int workerid = starpu_worker_get_id();
 	double model = task->predicted;
+	double transfer_model = task->predicted_transfer;
 	
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
 	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
-	exp_len[workerid] -= model;
+	exp_len[workerid] -= model + transfer_model;
 	exp_start[workerid] = starpu_timing_now();
 	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
 	ntasks[workerid]--;
@@ -116,16 +117,19 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 {
 	/* Compute the expected penality */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
+	unsigned memory_node = starpu_worker_get_memory_node(workerid);
 
 	double predicted = starpu_task_expected_length(task, perf_arch,
 			_starpu_get_job_associated_to_task(task)->nimpl);
 
+	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
+
 	/* Update the predictions */
 	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
-	exp_end[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
+	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (predicted != -1.0)
@@ -135,23 +139,59 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 		exp_len[workerid] += predicted;
 	}
 
+	/* If there is no prediction available, we consider the task has a null length */
+	if (predicted_transfer != -1.0)
+	{
+		if (starpu_timing_now() + predicted_transfer < exp_end[workerid]) {
+			/* We may hope that the transfer will be finished by
+			 * the start of the task. */
+			predicted_transfer = 0;
+		} else {
+			/* The transfer will not be finished by then, take the
+			 * remainder into account */
+			predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[workerid];
+		}
+		task->predicted_transfer = predicted_transfer;
+		exp_end[workerid] += predicted_transfer;
+		exp_len[workerid] += predicted_transfer;
+	}
+
 	ntasks[workerid]++;
 
 	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, double predicted_transfer, int prio)
 {
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
 	PTHREAD_MUTEX_LOCK(&sched_mutex[best_workerid]);
+
+	/* Sometimes workers didn't take the tasks as early as we expected */
+	exp_start[best_workerid] = STARPU_MAX(exp_start[best_workerid], starpu_timing_now());
+	exp_end[best_workerid] = exp_start[best_workerid] + exp_len[best_workerid];
+
 	exp_end[best_workerid] += predicted;
 	exp_len[best_workerid] += predicted;
+
+	if (starpu_timing_now() + predicted_transfer < exp_end[best_workerid]) {
+		/* We may hope that the transfer will be finished by
+		 * the start of the task. */
+		predicted_transfer = 0;
+	} else {
+		/* The transfer will not be finished by then, take the
+		 * remainder into account */
+		predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[best_workerid];
+	}
+	exp_end[best_workerid] += predicted_transfer;
+	exp_len[best_workerid] += predicted_transfer;
+
 	ntasks[best_workerid]++;
 	PTHREAD_MUTEX_UNLOCK(&sched_mutex[best_workerid]);
 
 	task->predicted = predicted;
+	task->predicted_transfer = predicted_transfer;
 
 	if (starpu_top_status_get())
 		starputop_task_prevision(task, best_workerid, 
@@ -186,7 +226,6 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	unsigned worker;
 
 	unsigned nimpl;
-	unsigned best_impl = 0;
 
 	for (worker = 0; worker < nworkers; worker++) {
 		for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) {
@@ -301,7 +340,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	/* If there is no prediction available for that task with that arch we
 	 * want to speed-up calibration time so we force this measurement */
 	if (forced_best != -1)
-		return push_task_on_best_worker(task, forced_best, 0.0, prio);
+		return push_task_on_best_worker(task, forced_best, 0.0, 0.0, prio);
 
 	/*
 	 *	Determine which worker optimizes the fitness metric which is a
@@ -343,15 +382,17 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	STARPU_ASSERT(best != -1);
 	
 	/* we should now have the best worker in variable "best" */
-	double model_best;
+	double model_best, transfer_model_best;
 
 	if (bundle)
 	{
 		/* If we have a task bundle, we have computed the expected
 		 * length for the entire bundle, but not for the task alone. */
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best);
+		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		model_best = starpu_task_expected_length(task, perf_arch,
 				_starpu_get_job_associated_to_task(task)->nimpl);
+		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
 
 		/* Remove the task from the bundle since we have made a
 		 * decision for it, and that other tasks should not consider it
@@ -367,11 +408,12 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	}
 	else {
 		model_best = local_task_length[best];
+		transfer_model_best = local_data_penalty[worker];
 	}
 
 	
 	_starpu_get_job_associated_to_task(task)->nimpl = nimpls[worker];
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio);
 }
 
 static int heft_push_task(struct starpu_task *task)

+ 6 - 1
src/sched_policies/parallel_heft.c

@@ -50,6 +50,7 @@ static void parallel_heft_post_exec_hook(struct starpu_task *task)
 
 	int workerid = starpu_worker_get_id();
 	double model = task->predicted;
+	double transfer_model = task->predicted_transfer;
 	
 	if (model < 0.0)
 		model = 0.0;
@@ -57,7 +58,7 @@ static void parallel_heft_post_exec_hook(struct starpu_task *task)
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
 	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
-	worker_exp_len[workerid] -= model;
+	worker_exp_len[workerid] -= model + transfer_model;
 	worker_exp_start[workerid] = starpu_timing_now();
 	worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
 	ntasks[workerid]--;
@@ -86,6 +87,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	if (is_basic_worker)
 	{
 		task->predicted = exp_end_predicted - worker_exp_end[best_workerid];
+		/* TODO */
+		task->predicted_transfer = 0;
 		worker_exp_len[best_workerid] += exp_end_predicted - worker_exp_end[best_workerid];
 		worker_exp_end[best_workerid] = exp_end_predicted;
 		worker_exp_start[best_workerid] = exp_end_predicted - worker_exp_len[best_workerid];
@@ -116,6 +119,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 			int local_worker = combined_workerid[i];
 
 			alias->predicted = exp_end_predicted - worker_exp_end[local_worker];
+			/* TODO */
+			alias->predicted_transfer = 0;
 	
 			worker_exp_len[local_worker] += exp_end_predicted - worker_exp_end[local_worker];
 			worker_exp_end[local_worker] = exp_end_predicted;