Browse Source

Various bug fixes/cleanup in the parallel heft scheduling strategy

Cédric Augonnet 14 years ago
parent
commit
c25ff2e248
1 changed files with 52 additions and 48 deletions
  1. 52 48
      src/sched_policies/parallel_heft.c

+ 52 - 48
src/sched_policies/parallel_heft.c

@@ -37,13 +37,16 @@ static double beta = STARPU_DEFAULT_BETA;
 static double _gamma = STARPU_DEFAULT_GAMMA;
 static double idle_power = 0.0;
 
-static double exp_start[STARPU_NMAXWORKERS];
-static double exp_end[STARPU_NMAXWORKERS];
-static double exp_len[STARPU_NMAXWORKERS];
-static double ntasks[STARPU_NMAXWORKERS];
+static double worker_exp_start[STARPU_NMAXWORKERS];
+static double worker_exp_end[STARPU_NMAXWORKERS];
+static double worker_exp_len[STARPU_NMAXWORKERS];
+static int ntasks[STARPU_NMAXWORKERS];
 
 static void parallel_heft_post_exec_hook(struct starpu_task *task)
 {
+	if (!task->cl || task->execute_on_a_specific_worker)
+		return;
+
 	int workerid = starpu_worker_get_id();
 	double model = task->predicted;
 	
@@ -53,16 +56,14 @@ static void parallel_heft_post_exec_hook(struct starpu_task *task)
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
 	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
-	exp_len[workerid] -= model;
-	exp_start[workerid] = starpu_timing_now() + model;
-	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+	worker_exp_len[workerid] -= model;
+	worker_exp_start[workerid] = starpu_timing_now();
+	worker_exp_end[workerid] = worker_exp_start[workerid] + worker_exp_len[workerid];
 	ntasks[workerid]--;
 	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
-
-
-static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
+static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double exp_end_predicted, int prio)
 {
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
@@ -83,14 +84,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	if (is_basic_worker)
 	{
-		if (predicted > 0.0)
-		{
-			exp_end[best_workerid] += predicted;
-			exp_len[best_workerid] += predicted;
-		}
-	
-		task->predicted = predicted;
+		task->predicted = exp_end_predicted - worker_exp_end[best_workerid];
+		worker_exp_len[best_workerid] += exp_end_predicted - worker_exp_end[best_workerid];
+		worker_exp_end[best_workerid] = exp_end_predicted;
+		worker_exp_start[best_workerid] = exp_end_predicted - worker_exp_len[best_workerid];
 	
+		ntasks[best_workerid]++;
+
 		ret = starpu_push_local_task(best_workerid, task, prio);
 	}
 	else {
@@ -100,8 +100,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		int worker_size = combined_worker->worker_size;
 		int *combined_workerid = combined_worker->combined_workerid;
 
-		task->predicted = predicted;
-
 		starpu_job_t j = _starpu_get_job_associated_to_task(task);
 		j->task_size = worker_size;
 		j->combined_workerid = best_workerid;
@@ -116,14 +114,14 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 			struct starpu_task *alias = _starpu_create_task_alias(task);
 			int local_worker = combined_workerid[i];
 
-			if (predicted > 0.0)
-			{
-				exp_end[local_worker] += predicted;
-				exp_len[local_worker] += predicted;
-			}
-		
-			alias->predicted = predicted;
+			alias->predicted = exp_end_predicted - worker_exp_end[local_worker];
+	
+			worker_exp_len[local_worker] += exp_end_predicted - worker_exp_end[local_worker];
+			worker_exp_end[local_worker] = exp_end_predicted;
+			worker_exp_start[local_worker] = exp_end_predicted - worker_exp_len[local_worker];
 		
+			ntasks[local_worker]++;
+	
 			ret |= starpu_push_local_task(local_worker, alias, prio);
 		}
 
@@ -139,7 +137,7 @@ static double compute_expected_end(int workerid, double length)
 	if (workerid < (int)nworkers)
 	{
 		/* This is a basic worker */
-		return exp_start[workerid] + exp_len[workerid] + length;
+		return worker_exp_start[workerid] + worker_exp_len[workerid] + length;
 	}
 	else {
 		/* This is a combined worker, the expected end is the end for the latest worker */
@@ -152,7 +150,9 @@ static double compute_expected_end(int workerid, double length)
 		int i;
 		for (i = 0; i < worker_size; i++)
 		{
-			double local_exp_end = (exp_start[combined_workerid[i]] + exp_len[combined_workerid[i]] + length);
+			double local_exp_start = worker_exp_start[combined_workerid[i]];
+			double local_exp_len = worker_exp_len[combined_workerid[i]];
+			double local_exp_end = local_exp_start + local_exp_len + length;
 			exp_end = STARPU_MAX(exp_end, local_exp_end);
 		}
 
@@ -199,7 +199,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	double local_task_length[nworkers+ncombinedworkers];
 	double local_data_penalty[nworkers+ncombinedworkers];
 	double local_power[nworkers+ncombinedworkers];
-	double exp_end[nworkers+ncombinedworkers];
+	double local_exp_end[nworkers+ncombinedworkers];
 	double fitness[nworkers+ncombinedworkers];
 
 	double max_exp_end = 0.0;
@@ -207,7 +207,6 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	int skip_worker[nworkers+ncombinedworkers];
 
 	double best_exp_end = DBL_MAX;
-	double model_best = 0.0;
 	double penality_best = 0.0;
 
 	int ntasks_best = -1;
@@ -220,10 +219,10 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		/* Sometimes workers didn't take the tasks as early as we expected */
-		exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
-		exp_end[worker] = exp_start[worker] + exp_len[worker];
-		if (exp_end[worker] > max_exp_end)
-			max_exp_end = exp_end[worker];
+		worker_exp_start[worker] = STARPU_MAX(worker_exp_start[worker], starpu_timing_now());
+		worker_exp_end[worker] = worker_exp_start[worker] + worker_exp_len[worker];
+		if (worker_exp_end[worker] > max_exp_end)
+			max_exp_end = worker_exp_end[worker];
 	}
 
 	for (worker = 0; worker < (nworkers+ncombinedworkers); worker++)
@@ -269,12 +268,14 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 		if (unknown)
 			continue;
 
-		exp_end[worker] = compute_expected_end(worker, local_task_length[worker]);
+		local_exp_end[worker] = compute_expected_end(worker, local_task_length[worker]);
 
-		if (exp_end[worker] < best_exp_end)
+		//fprintf(stderr, "WORKER %d -> length %e end %e\n", worker, local_task_length[worker], local_exp_end[worker]);
+
+		if (local_exp_end[worker] < best_exp_end)
 		{
 			/* a better solution was found */
-			best_exp_end = exp_end[worker];
+			best_exp_end = local_exp_end[worker];
 		}
 
 		local_power[worker] = starpu_task_expected_power(task, perf_arch);
@@ -286,7 +287,8 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 		forced_best = ntasks_best;
 
 	double best_fitness = -1;
-	
+
+
 	if (forced_best == -1)
 	{
 		for (worker = 0; worker < nworkers+ncombinedworkers; worker++)
@@ -298,15 +300,15 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 				continue;
 			}
 	
-			fitness[worker] = alpha*(exp_end[worker] - best_exp_end) 
+			fitness[worker] = alpha*(local_exp_end[worker] - best_exp_end) 
 					+ beta*(local_data_penalty[worker])
 					+ _gamma*(local_power[worker]);
 
-			if (exp_end[worker] > max_exp_end)
+			if (local_exp_end[worker] > max_exp_end)
 				/* This placement will make the computation
 				 * longer, take into account the idle
 				 * consumption of other cpus */
-				fitness[worker] += _gamma * idle_power * (exp_end[worker] - max_exp_end) / 1000000.0;
+				fitness[worker] += _gamma * idle_power * (local_exp_end[worker] - max_exp_end) / 1000000.0;
 
 			if (best == -1 || fitness[worker] < best_fitness)
 			{
@@ -314,28 +316,30 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 				best_fitness = fitness[worker];
 				best = worker;
 			}
+
+		//	fprintf(stderr, "FITNESS worker %d -> %e local_exp_end %e - local_data_penalty %e\n", worker, fitness[worker], local_exp_end[worker] - best_exp_end, local_data_penalty[worker]);
 		}
 	}
 
 	STARPU_ASSERT(forced_best != -1 || best != -1);
-	
+
 	if (forced_best != -1)
 	{
 		/* there is no prediction available for that task
 		 * with that arch we want to speed-up calibration time
 		 * so we force this measurement */
 		best = forced_best;
-		model_best = 0.0;
 		penality_best = 0.0;
+		best_exp_end = local_exp_end[best];
 	}
 	else 
 	{
-		model_best = local_task_length[best];
 		penality_best = local_data_penalty[best];
+		best_exp_end = local_exp_end[best];
 	}
 
 	/* we should now have the best worker in variable "best" */
-	return push_task_on_best_worker(task, best, model_best, prio);
+	return push_task_on_best_worker(task, best, best_exp_end, prio);
 }
 
 static int parallel_heft_push_prio_task(struct starpu_task *task)
@@ -379,9 +383,9 @@ static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *to
 	unsigned workerid;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
-		exp_start[workerid] = starpu_timing_now();
-		exp_len[workerid] = 0.0;
-		exp_end[workerid] = exp_start[workerid]; 
+		worker_exp_start[workerid] = starpu_timing_now();
+		worker_exp_len[workerid] = 0.0;
+		worker_exp_end[workerid] = worker_exp_start[workerid]; 
 		ntasks[workerid] = 0;
 	
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);