Browse Source

Heft : bugfix. In _heft_push_task(), after running
compute_all_performance_predictions(), the values stored in
local_task_length[i], local_data_penalty[i] etc. were the correct values for the
nth implementation of a codelet, but were used even for other implementations.

Cyril Roelandt 13 years ago
parent
commit
af16a3e8d0
1 changed files with 91 additions and 75 deletions
  1. 91 75
      src/sched_policies/heft.c

+ 91 - 75
src/sched_policies/heft.c

@@ -25,6 +25,7 @@
 #include <starpu_parameters.h>
 #include <starpu_task_bundle.h>
 #include <starpu_top.h>
+#include <core/jobs.h>
 
 static unsigned nworkers;
 
@@ -37,8 +38,8 @@ static double _gamma = STARPU_DEFAULT_GAMMA;
 static double idle_power = 0.0;
 
 static double exp_start[STARPU_NMAXWORKERS]; /* of the first queued task */
-static double exp_end[STARPU_NMAXWORKERS];   /* of the set of queued tasks */
-static double exp_len[STARPU_NMAXWORKERS];   /* of the last queued task */
+static double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];   /* of the set of queued tasks */
+static double exp_len[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];   /* of the last queued task */
 static double ntasks[STARPU_NMAXWORKERS];
 
 const float alpha_minimum=0;
@@ -82,12 +83,14 @@ static void heft_init(struct starpu_machine_topology_s *topology,
 	starputop_register_parameter_float("HEFT_GAMMA", &_gamma, gamma_minimum,gamma_maximum,param_modified);
 	starputop_register_parameter_float("HEFT_IDLE_POWER", &idle_power, idle_power_minimum,idle_power_maximum,param_modified);
 
-	unsigned workerid;
+	unsigned workerid, nimpl;
 	for (workerid = 0; workerid < nworkers; workerid++)
 	{
-		exp_start[workerid] = starpu_timing_now();
-		exp_len[workerid] = 0.0;
-		exp_end[workerid] = exp_start[workerid]; 
+		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++) {
+			exp_start[workerid] = starpu_timing_now();
+			exp_len[workerid][nimpl] = 0.0;
+			exp_end[workerid][nimpl] = exp_start[workerid];
+		}
 		ntasks[workerid] = 0;
 
 		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
@@ -102,25 +105,26 @@ static void heft_post_exec_hook(struct starpu_task *task)
 	int workerid = starpu_worker_get_id();
 	double model = task->predicted;
 	double transfer_model = task->predicted_transfer;
+	unsigned int nimpl = _starpu_get_job_associated_to_task(task)->nimpl;
 	
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
 	PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
-	exp_len[workerid] -= model + transfer_model;
+	exp_len[workerid][nimpl] -= model + transfer_model;
 	exp_start[workerid] = starpu_timing_now();
-	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+	exp_end[workerid][nimpl] = exp_start[workerid] + exp_len[workerid][nimpl];
 	ntasks[workerid]--;
 	PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
 }
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid)
 {
+	unsigned nimpl = _starpu_get_job_associated_to_task(task)->nimpl;
 	/* Compute the expected penality */
 	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
 	unsigned memory_node = starpu_worker_get_memory_node(workerid);
 
-	double predicted = starpu_task_expected_length(task, perf_arch,
-			_starpu_get_job_associated_to_task(task)->nimpl);
+	double predicted = starpu_task_expected_length(task, perf_arch, nimpl);
 
 	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
 
@@ -129,31 +133,31 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	exp_start[workerid] = STARPU_MAX(exp_start[workerid], starpu_timing_now());
-	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+	exp_end[workerid][nimpl] = exp_start[workerid] + exp_len[workerid][nimpl];
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (predicted != -1.0)
 	{
 		task->predicted = predicted;
-		exp_end[workerid] += predicted;
-		exp_len[workerid] += predicted;
+		exp_end[workerid][nimpl] += predicted;
+		exp_len[workerid][nimpl] += predicted;
 	}
 
 	/* If there is no prediction available, we consider the task has a null length */
 	if (predicted_transfer != -1.0)
 	{
-		if (starpu_timing_now() + predicted_transfer < exp_end[workerid]) {
+		if (starpu_timing_now() + predicted_transfer < exp_end[workerid][nimpl]) {
 			/* We may hope that the transfer will be finished by
 			 * the start of the task. */
 			predicted_transfer = 0;
 		} else {
 			/* The transfer will not be finished by then, take the
 			 * remainder into account */
-			predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[workerid];
+			predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[workerid][nimpl];
 		}
 		task->predicted_transfer = predicted_transfer;
-		exp_end[workerid] += predicted_transfer;
-		exp_len[workerid] += predicted_transfer;
+		exp_end[workerid][nimpl] += predicted_transfer;
+		exp_len[workerid][nimpl] += predicted_transfer;
 	}
 
 	ntasks[workerid]++;
@@ -163,6 +167,11 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, double predicted_transfer, int prio)
 {
+	starpu_job_t j = _starpu_get_job_associated_to_task(task);
+
+	/* it would be quite bad if j was NULL, but we could always try going a little further */
+	unsigned int nimpl = j?j->nimpl:0;
+
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
@@ -170,22 +179,22 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	/* Sometimes workers didn't take the tasks as early as we expected */
 	exp_start[best_workerid] = STARPU_MAX(exp_start[best_workerid], starpu_timing_now());
-	exp_end[best_workerid] = exp_start[best_workerid] + exp_len[best_workerid];
+	exp_end[best_workerid][nimpl] = exp_start[best_workerid] + exp_len[best_workerid][nimpl];
 
-	exp_end[best_workerid] += predicted;
-	exp_len[best_workerid] += predicted;
+	exp_end[best_workerid][nimpl] += predicted;
+	exp_len[best_workerid][nimpl] += predicted;
 
-	if (starpu_timing_now() + predicted_transfer < exp_end[best_workerid]) {
+	if (starpu_timing_now() + predicted_transfer < exp_end[best_workerid][nimpl]) {
 		/* We may hope that the transfer will be finished by
 		 * the start of the task. */
 		predicted_transfer = 0;
 	} else {
 		/* The transfer will not be finished by then, take the
 		 * remainder into account */
-		predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[best_workerid];
+		predicted_transfer = (starpu_timing_now() + predicted_transfer) - exp_end[best_workerid][nimpl];
 	}
-	exp_end[best_workerid] += predicted_transfer;
-	exp_len[best_workerid] += predicted_transfer;
+	exp_end[best_workerid][nimpl] += predicted_transfer;
+	exp_len[best_workerid][nimpl] += predicted_transfer;
 
 	ntasks[best_workerid]++;
 	PTHREAD_MUTEX_UNLOCK(&sched_mutex[best_workerid]);
@@ -195,8 +204,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 
 	if (starpu_top_status_get())
 		starputop_task_prevision(task, best_workerid, 
-					(unsigned long long)(exp_end[best_workerid]-predicted)/1000,
-					(unsigned long long)exp_end[best_workerid]/1000);
+					(unsigned long long)(exp_end[best_workerid][nimpl]-predicted)/1000,
+					(unsigned long long)exp_end[best_workerid][nimpl]/1000);
 
 	if (starpu_get_prefetch_flag())
 	{
@@ -208,10 +217,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 }
 
 static void compute_all_performance_predictions(struct starpu_task *task,
-					double *local_task_length, double *exp_end,
-					double *max_exp_endp, double *best_exp_endp,
-					double *local_data_penalty,
-					double *local_power, int *forced_best,
+					double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double *max_exp_endp,
+					double *best_exp_endp,
+					double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					int *forced_best,
 					struct starpu_task_bundle *bundle,
 					unsigned int *nimpls)
 {
@@ -232,9 +244,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		for (nimpl = 0; nimpl <STARPU_MAXIMPLEMENTATIONS; nimpl++) {
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			exp_start[worker] = STARPU_MAX(exp_start[worker], starpu_timing_now());
-			exp_end[worker] = exp_start[worker] + exp_len[worker];
-			if (exp_end[worker] > max_exp_end)
-				max_exp_end = exp_end[worker];
+			exp_end[worker][nimpl] = exp_start[worker] + exp_len[worker][nimpl];
+			if (exp_end[worker][nimpl] > max_exp_end)
+				max_exp_end = exp_end[worker][nimpl];
 
 			if (!starpu_worker_may_execute_task(worker, task, nimpl))
 			{
@@ -247,16 +259,16 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 			if (bundle)
 			{
-				local_task_length[worker] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
-				local_data_penalty[worker] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
-				local_power[worker] = starpu_task_bundle_expected_power(bundle, perf_arch,nimpl);
+				local_task_length[worker][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
+				local_data_penalty[worker][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
+				local_power[worker][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch,nimpl);
 				//_STARPU_DEBUG("Scheduler heft bundle: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker],local_power[worker],worker,nimpl);
 
 			}
 			else {
-				local_task_length[worker] = starpu_task_expected_length(task, perf_arch, nimpl);
-				local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
-				local_power[worker] = starpu_task_expected_power(task, perf_arch,nimpl);
+				local_task_length[worker][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
+				local_data_penalty[worker][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+				local_power[worker][nimpl] = starpu_task_expected_power(task, perf_arch,nimpl);
 				//_STARPU_DEBUG("Scheduler heft: task length (%lf) local power (%lf) worker (%u) kernel (%u) \n", local_task_length[worker],local_power[worker],worker,nimpl);
 
 			}
@@ -265,20 +277,20 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 			if (ntasks_best == -1
 				|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
-				|| (!calibrating && local_task_length[worker] == -1.0) /* Not calibrating but this worker is being calibrated */
-				|| (calibrating && local_task_length[worker] == -1.0 && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
+				|| (!calibrating && local_task_length[worker][nimpl] == -1.0) /* Not calibrating but this worker is being calibrated */
+				|| (calibrating && local_task_length[worker][nimpl] == -1.0 && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
 				) {
 				ntasks_best_end = ntasks_end;
 				ntasks_best = worker;
 			}
 
-			if (local_task_length[worker] == -1.0)
+			if (local_task_length[worker][nimpl] == -1.0)
 				/* we are calibrating, we want to speed-up calibration time
 				 * so we privilege non-calibrated tasks (but still
 				 * greedily distribute them to avoid dumb schedules) */
 				calibrating = 1;
 
-			if (local_task_length[worker] <= 0.0)
+			if (local_task_length[worker][nimpl] <= 0.0)
 				/* there is no prediction available for that task
 				 * with that arch yet, so switch to a greedy strategy */
 				unknown = 1;
@@ -286,17 +298,17 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 			if (unknown)
 				continue;
 
-			exp_end[worker] = exp_start[worker] + exp_len[worker] + local_task_length[worker];
+			exp_end[worker][nimpl] = exp_start[worker] + exp_len[worker][nimpl] + local_task_length[worker][nimpl];
 
-			if (exp_end[worker] < best_exp_end)
+			if (exp_end[worker][nimpl] < best_exp_end)
 			{
 				/* a better solution was found */
-				best_exp_end = exp_end[worker];
+				best_exp_end = exp_end[worker][nimpl];
 				nimpls[worker] = nimpl;
 			}
 
-			if (local_power[worker] == -1.0)
-				local_power[worker] = 0.;
+			if (local_power[worker][nimpl] == -1.0)
+				local_power[worker][nimpl] = 0.;
 
 		}
 	}
@@ -309,17 +321,18 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 
 static int _heft_push_task(struct starpu_task *task, unsigned prio)
 {
-	unsigned worker;
+	unsigned worker, nimpl;
 	int best = -1;
+	int selected_impl= -1;
 	
 	/* this flag is set if the corresponding worker is selected because
 	   there is no performance prediction available yet */
 	int forced_best;
 
-	double local_task_length[nworkers];
-	double local_data_penalty[nworkers];
-	double local_power[nworkers];
-	double exp_end[nworkers];
+	double local_task_length[nworkers][STARPU_MAXIMPLEMENTATIONS];
+	double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS];
+	double local_power[nworkers][STARPU_MAXIMPLEMENTATIONS];
+	double exp_end[nworkers][STARPU_MAXIMPLEMENTATIONS];
 	double max_exp_end = 0.0;
 	unsigned int  nimpls[nworkers];
 
@@ -349,33 +362,36 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	 *	consumption.
 	 */
 	
-	double fitness[nworkers];
+	double fitness[nworkers][STARPU_MAXIMPLEMENTATIONS];
 	double best_fitness = -1;
 
 	for (worker = 0; worker < nworkers; worker++)
 	{
-		if (!starpu_worker_may_execute_task(worker, task, 0))
-		{
-			/* no one on that queue may execute this task */
-			continue;
-		}
+		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++) {
+			if (!starpu_worker_may_execute_task(worker, task, nimpl))
+			{
+				/* no one on that queue may execute this task */
+				continue;
+			}
 
-		fitness[worker] = alpha*(exp_end[worker] - best_exp_end) 
-				+ beta*(local_data_penalty[worker])
-				+ _gamma*(local_power[worker]);
+			fitness[worker][nimpl] = alpha*(exp_end[worker][nimpl] - best_exp_end) 
+						+ beta*(local_data_penalty[worker][nimpl])
+						+ _gamma*(local_power[worker][nimpl]);
 
-		if (exp_end[worker] > max_exp_end) {
-			/* This placement will make the computation
-			 * longer, take into account the idle
-			 * consumption of other cpus */
-			fitness[worker] += _gamma * idle_power * (exp_end[worker] - max_exp_end) / 1000000.0;
-		}
+			if (exp_end[worker][nimpl] > max_exp_end) {
+				/* This placement will make the computation
+				 * longer, take into account the idle
+				 * consumption of other cpus */
+				fitness[worker][nimpl] += _gamma * idle_power * (exp_end[worker][nimpl] - max_exp_end) / 1000000.0;
+			}
 
-		if (best == -1 || fitness[worker] < best_fitness)
-		{
-			/* we found a better solution */
-			best_fitness = fitness[worker];
-			best = worker;
+			if (best == -1 || fitness[worker][nimpl] < best_fitness)
+			{
+				/* we found a better solution */
+				best_fitness = fitness[worker][nimpl];
+				best = worker;
+				selected_impl = nimpl;
+			}
 		}
 	}
 
@@ -408,8 +424,8 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 
 	}
 	else {
-		model_best = local_task_length[best];
-		transfer_model_best = local_data_penalty[best];
+		model_best = local_task_length[best][selected_impl];
+		transfer_model_best = local_data_penalty[best][selected_impl];
 	}