浏览代码

dmda*: take into account the data transfer time.

push_task_on_best_worker now has one more parameter: the expected data transfer time.
Cyril Roelandt 12 年之前
父节点
当前提交
473657492c
共有 1 个文件被更改,包括 43 次插入9 次删除
  1. 43 9
      src/sched_policies/deque_modeling_policy_data_aware.c

+ 43 - 9
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -216,21 +216,45 @@ static struct starpu_task *dmda_pop_every_task(void)
 
 
 
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
+				    double predicted, double predicted_transfer,
+				    int prio)
 {
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
-	struct _starpu_fifo_taskq *fifo;
-	fifo = queue_array[best_workerid];
+	struct _starpu_fifo_taskq *fifo = queue_array[best_workerid];
+
+	_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[best_workerid]);
+
+	/* Sometimes workers didn't take the tasks as early as we expected */
+	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+	fifo->exp_end = fifo->exp_start + fifo->exp_len;
 
 	fifo->exp_end += predicted;
 	fifo->exp_len += predicted;
 
-	task->predicted = predicted;
+	if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
+	{
+		/* We may hope that the transfer will be finished by
+		 * the start of the task. */
+		predicted_transfer = 0;
+	}
+	else
+	{
+		/* The transfer will not be finished by then, take the
+		 * remainder into account */
+		predicted_transfer += starpu_timing_now();
+		predicted_transfer -= fifo->exp_end;
+	}
+
+	fifo->exp_end += predicted_transfer;
+	fifo->exp_len += predicted_transfer;
 
-	/* TODO predicted_transfer */
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[best_workerid]);
 
+	task->predicted = predicted;
+	task->predicted_transfer = predicted_transfer;
 
 	if (starpu_get_prefetch_flag())
 	{
@@ -262,6 +286,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 
 	double best_exp_end = 0.0;
 	double model_best = 0.0;
+	double transfer_model_best = 0.0;
 
 	int ntasks_best = -1;
 	double ntasks_best_end = 0.0;
@@ -275,6 +300,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 
 	for (worker = 0; worker < nworkers; worker++)
 	{
+		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 			double exp_end;
@@ -293,6 +319,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 
 			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
+			double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
 			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
 			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
@@ -330,6 +357,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 				best_exp_end = exp_end;
 				best = worker;
 				model_best = local_length;
+				transfer_model_best = local_penalty;
 				best_impl = nimpl;
 			}
 		}
@@ -339,6 +367,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 	{
 		best = ntasks_best;
 		model_best = 0.0;
+		transfer_model_best = 0.0;
 	}
 
 	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
@@ -346,7 +375,8 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 	 _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best,
+					model_best, transfer_model_best, prio);
 }
 
 static void compute_all_performance_predictions(struct starpu_task *task,
@@ -472,6 +502,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	int best = -1;
 	int selected_impl = 0;
 	double model_best = 0.0;
+	double transfer_model_best = 0.0;
 
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
@@ -546,17 +577,19 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 		 * so we force this measurement */
 		best = forced_best;
 		model_best = 0.0;
-		//penality_best = 0.0;
+		transfer_model_best = 0.0;
 	}
 	else if (task->bundle)
 	{
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best);
+		unsigned memory_node = starpu_worker_get_memory_node(best);
 		model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
+		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
 	}
 	else
 	{
 		model_best = local_task_length[best][selected_impl];
-		//penality_best = local_data_penalty[best][best_impl];
+		transfer_model_best = local_data_penalty[best][selected_impl];
 	}
 
 	if (task->bundle)
@@ -567,7 +600,8 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	 _starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best,
+					model_best, transfer_model_best, prio);
 }
 
 static int dmda_push_sorted_task(struct starpu_task *task)