Browse Source

when performance models are not yet calibrated for dm, dmda or pheft, fall back to an alpha-weighted greedy distribution

Samuel Thibault 14 years ago
parent
commit
643293a4b7
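
The fallback introduced here follows the same pattern in all three policies: while scanning the workers, each worker's greedy finish estimate (its queue length divided by its relative speedup) is tracked alongside the usual model-based estimate, and as soon as any worker turns out to have no calibrated prediction for the task, the model-based comparison is abandoned in favour of the greedy pick. A minimal standalone sketch of that logic, with hypothetical stubs (queue_len, speedup, predicted_len) standing in for fifo->ntasks, starpu_worker_get_relative_speedup() and starpu_task_expected_length():

/* Standalone sketch of the commit's fallback; the arrays below are
 * hypothetical stand-ins for the real StarPU queues and models. */
#include <stdio.h>

#define NWORKERS 3

static int    queue_len[NWORKERS]     = { 4, 2, 7 };          /* fifo->ntasks */
static double speedup[NWORKERS]       = { 1.0, 3.5, 1.0 };    /* relative speedup */
static double predicted_len[NWORKERS] = { 10.0, -1.0, 12.0 }; /* -1: uncalibrated */

static int pick_worker(void)
{
	int worker, best = -1, ntasks_best = -1;
	double best_exp_end = 0.0, ntasks_best_end = 0.0;
	int unknown = 0; /* a priori, we know all estimations */

	for (worker = 0; worker < NWORKERS; worker++)
	{
		/* Greedy criterion, maintained for every worker so that it
		 * is ready if some prediction turns out to be missing. */
		double ntasks_end = queue_len[worker] / speedup[worker];
		if (ntasks_best == -1 || ntasks_end < ntasks_best_end) {
			ntasks_best_end = ntasks_end;
			ntasks_best = worker;
		}

		if (predicted_len[worker] <= 0.0)
			/* no calibrated model for this arch yet */
			unknown = 1;
		if (unknown)
			continue;

		/* Normal model-based choice. */
		double exp_end = predicted_len[worker];
		if (best == -1 || exp_end < best_exp_end) {
			best_exp_end = exp_end;
			best = worker;
		}
	}

	if (unknown)
		/* at least one model was missing: take the greedy pick */
		best = ntasks_best;
	return best;
}

int main(void)
{
	printf("chosen worker: %d\n", pick_worker()); /* prints 1 here */
	return 0;
}

Note that once unknown is set, the remaining workers are still scanned for the greedy criterion but skipped for the model-based one, exactly as in the hunks below.
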

+ 45 - 17
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -307,6 +307,12 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 	double best_exp_end = 0.0;
 	double model_best = 0.0;
 
+	int ntasks_best = -1;
+	double ntasks_best_end = 0.0;
+
+	/* A priori, we know all estimations */
+	int unknown = 0;
+
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		double exp_end;
@@ -324,19 +330,21 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		double local_length = starpu_task_expected_length(task, perf_arch);
+		double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
-		if (local_length == -1.0) 
-		{
+		if (ntasks_best == -1 || ntasks_end < ntasks_best_end) {
+			ntasks_best_end = ntasks_end;
+			ntasks_best = worker;
+		}
+
+		if (local_length <= 0.0)
 			/* there is no prediction available for that task
 			 * with that arch yet, we want to speed-up calibration time 
-			 * so we force this measurement */
-			/* XXX assert we are benchmarking ! */
-			best = worker;
-			model_best = 0.0;
-			exp_end = fifo->exp_start + fifo->exp_len;
-			break;
-		}
+			 * so we switch to distributing tasks greedily */
+			unknown = 1;
 
+		if (unknown)
+			continue;
 
 		exp_end = fifo->exp_start + fifo->exp_len + local_length;
 
@@ -348,6 +356,11 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 			model_best = local_length;
 		}
 	}
+
+	if (unknown) {
+		best = ntasks_best;
+		model_best = 0.0;
+	}
 	
 	/* we should now have the best worker in variable "best" */
 	return push_task_on_best_worker(task, best, model_best, prio);
@@ -375,6 +388,12 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	double model_best = 0.0;
 	double penality_best = 0.0;
 
+	int ntasks_best = -1;
+	double ntasks_best_end = 0.0;
+
+	/* A priori, we know all estimations */
+	int unknown = 0;
+
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		fifo = queue_array[worker];
@@ -394,15 +413,21 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		local_data_penalty[worker] = starpu_data_expected_penalty(memory_node, task);
 
-		if (local_task_length[worker] == -1.0)
-		{
+		double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
+
+		if (ntasks_best == -1 || ntasks_end < ntasks_best_end) {
+			ntasks_best_end = ntasks_end;
+			ntasks_best = worker;
+		}
+
+		if (local_task_length[worker] <= 0.0)
 			/* there is no prediction available for that task
 			 * with that arch yet, we want to speed-up calibration time 
-			 * so we force this measurement */
-			/* XXX assert we are benchmarking ! */
-			forced_best = worker;
-			break;
-		}
+			 * so we switch to distributing tasks greedily */
+			unknown = 1;
+
+		if (unknown)
+			continue;
 
 		exp_end[worker] = fifo->exp_start + fifo->exp_len + local_task_length[worker];
 
@@ -417,6 +442,9 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 			local_power[worker] = 0.;
 	}
 
+	if (unknown)
+		forced_best = ntasks_best;
+
 	double best_fitness = -1;
 	
 	if (forced_best == -1)
@@ -453,7 +481,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 		/* there is no prediction available for that task
 		 * with that arch we want to speed-up calibration time
 		 * so we force this measurement */
-		best = worker;
+		best = forced_best;
 		model_best = 0.0;
 		penality_best = 0.0;
 	}

+ 54 - 11
src/sched_policies/parallel_heft.c

@@ -182,6 +182,37 @@ static double compute_expected_end(int workerid, double length)
 	}
 }
 
+static double compute_ntasks_end(int workerid)
+{
+	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
+	if (workerid < (int)nworkers)
+	{
+		/* This is a basic worker */
+		struct starpu_fifo_taskq_s *fifo;
+		fifo = queue_array[workerid];
+		return fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
+	}
+	else {
+		/* This is a combined worker, the expected end is the end for the latest worker */
+		int worker_size;
+		int *combined_workerid;
+		starpu_combined_worker_get_description(workerid, &worker_size, &combined_workerid);
+
+	double ntasks_end = 0.0;
+
+		int i;
+		for (i = 0; i < worker_size; i++)
+		{
+			struct starpu_fifo_taskq_s *fifo;
+			fifo = queue_array[combined_workerid[i]];
+			/* XXX: this is actually bogus: not all pushed tasks are necessarily parallel... */
+			ntasks_end = STARPU_MAX(ntasks_end, fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch));
+		}
+
+		return ntasks_end;
+	}
+}
+
 static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 {
 	/* find the queue */
@@ -204,6 +235,12 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	double model_best = 0.0;
 	double penality_best = 0.0;
 
+	int ntasks_best = -1;
+	double ntasks_best_end = 0.0;
+
+	/* A priori, we know all estimations */
+	int unknown = 0;
+
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		fifo = queue_array[worker];
@@ -229,18 +266,21 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		local_data_penalty[worker] = starpu_data_expected_penalty(memory_node, task);
 
-		if (local_task_length[worker] == -1.0)
-		{
+		double ntasks_end = compute_ntasks_end(worker);
+
+		if (ntasks_best == -1 || ntasks_end < ntasks_best_end) {
+			ntasks_best_end = ntasks_end;
+			ntasks_best = worker;
+		}
+
+		if (local_task_length[worker] <= 0.0)
 			/* there is no prediction available for that task
 			 * with that arch yet, we want to speed-up calibration time 
-			 * so we force this measurement */
-			/* XXX assert we are benchmarking ! */
-			forced_best = worker;
-			break;
-		}
-		if (local_task_length[worker] == 0.)
-		{
-		}
+			 * so we switch to distributing tasks greedily */
+			unknown = 1;
+
+		if (unknown)
+			continue;
 
 		exp_end[worker] = compute_expected_end(worker, local_task_length[worker]);
 
@@ -251,6 +291,9 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 		}
 	}
 
+	if (unknown)
+		forced_best = ntasks_best;
+
 	double best_fitness = -1;
 	
 	if (forced_best == -1)
@@ -283,7 +326,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 		/* there is no prediction available for that task
 		 * with that arch we want to speed-up calibration time
 		 * so we force this measurement */
-		best = worker;
+		best = forced_best;
 		model_best = 0.0;
 		penality_best = 0.0;
 	}