Procházet zdrojové kódy

src/sched_policies/deque_modeling_policy_data_aware.c: add a compute_all_performance_predictions() function.

It is almost the same as the compute_all_performance_predictions() function
defined in heft.c (it does not support task bundles yet, though).

It is used by dmda and dmdas.
Cyril Roelandt před 13 roky
rodič
revize
63a92fffd9
1 změnil soubory, kde provedl 93 přidání a 48 odebrání
  1. 93 48
      src/sched_policies/deque_modeling_policy_data_aware.c

+ 93 - 48
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -20,11 +20,21 @@
 
 #include <limits.h>
 
+#include <core/perfmodel/perfmodel.h>
+#include <core/task_bundle.h>
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 
+#ifndef DBL_MIN
+#define DBL_MIN __DBL_MIN__
+#endif
+
+#ifndef DBL_MAX
+#define DBL_MAX __DBL_MAX__
+#endif
+
 static unsigned nworkers;
 static struct _starpu_fifo_taskq *queue_array[STARPU_NMAXWORKERS];
 
@@ -328,76 +338,75 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio)
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 
-static int _dmda_push_task(struct starpu_task *task, unsigned prio)
+static void compute_all_performance_predictions(struct starpu_task *task,
+					double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double *max_exp_endp,
+					double *best_exp_endp,
+					double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					int *forced_worker, int *forced_impl)
 {
-	/* find the queue */
-	struct _starpu_fifo_taskq *fifo;
-	unsigned worker;
-	int best = -1;
-
-	/* this flag is set if the corresponding worker is selected because
-	   there is no performance prediction available yet */
-	int forced_best = -1;
-
-	double local_task_length[nworkers][STARPU_MAXIMPLEMENTATIONS];
-	double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS];
-	double local_power[nworkers][STARPU_MAXIMPLEMENTATIONS];
-	double exp_end[nworkers][STARPU_MAXIMPLEMENTATIONS];
-	double max_exp_end = 0.0;
-
-	double fitness[nworkers][STARPU_MAXIMPLEMENTATIONS];
-
-	double best_exp_end = 10e240;
-	double model_best = 0.0;
-	//double penality_best = 0.0;
-
+	int calibrating = 0;
+	double max_exp_end = DBL_MIN;
+	double best_exp_end = DBL_MAX;
 	int ntasks_best = -1;
+	int nimpl_best = 0;
 	double ntasks_best_end = 0.0;
-	int calibrating = 0;
 
 	/* A priori, we know all estimations */
 	int unknown = 0;
+	unsigned worker;
 
-	unsigned best_impl = 0;
 	unsigned nimpl;
 
+	starpu_task_bundle_t bundle = task->bundle;
+
 	for (worker = 0; worker < nworkers; worker++)
 	{
+		struct _starpu_fifo_taskq *fifo = queue_array[worker];
+
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
-			fifo = queue_array[worker];
-
-			/* Sometimes workers didn't take the tasks as early as we expected */
-			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
-			fifo->exp_end = fifo->exp_start + fifo->exp_len;
-			if (fifo->exp_end > max_exp_end)
-				max_exp_end = fifo->exp_end;
-
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
 				continue;
 			}
 
+			/* Sometimes workers didn't take the tasks as early as we expected */
+			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+			exp_end[worker][nimpl] = fifo->exp_start + fifo->exp_len;
+			if (exp_end[worker][nimpl] > max_exp_end)
+				max_exp_end = exp_end[worker][nimpl];
+
 			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-			local_task_length[worker][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
+			unsigned memory_node = starpu_worker_get_memory_node(worker);
 
 			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl],worker,nimpl);
 
-			unsigned memory_node = starpu_worker_get_memory_node(worker);
-			local_data_penalty[worker][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+			if (bundle)
+			{
+				STARPU_ABORT(); /* Not implemented yet. */
+			}
+			else
+			{
+				local_task_length[worker][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
+				local_data_penalty[worker][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+				local_power[worker][nimpl] = starpu_task_expected_power(task, perf_arch,nimpl);
+			}
 
 			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 
 			if (ntasks_best == -1
-			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
+			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
 			    || (!calibrating && isnan(local_task_length[worker][nimpl])) /* Not calibrating but this worker is being calibrated */
 			    || (calibrating && isnan(local_task_length[worker][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
 				)
 			{
 				ntasks_best_end = ntasks_end;
 				ntasks_best = worker;
-				best_impl = nimpl;
+				nimpl_best = nimpl;
 			}
 
 			if (isnan(local_task_length[worker][nimpl]))
@@ -407,13 +416,13 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 				calibrating = 1;
 
 			if (isnan(local_task_length[worker][nimpl])
-					|| _STARPU_IS_ZERO(local_task_length[worker][nimpl]))
+				|| _STARPU_IS_ZERO(local_task_length[worker][nimpl]))
 				/* there is no prediction available for that task
-				 * with that arch yet, so switch to a greedy strategy */
+				 * with that arch (yet or at all), so switch to a greedy strategy */
 				unknown = 1;
 
 			if (unknown)
-					continue;
+				continue;
 
 			exp_end[worker][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker][nimpl];
 
@@ -421,21 +430,57 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 			{
 				/* a better solution was found */
 				best_exp_end = exp_end[worker][nimpl];
-				best_impl = nimpl;
+				nimpl_best = nimpl;
 			}
 
-			local_power[worker][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);
 			if (isnan(local_power[worker][nimpl]))
 				local_power[worker][nimpl] = 0.;
 
-		 }
+		}
 	}
 
-	if (unknown)
-		forced_best = ntasks_best;
+	*forced_worker = unknown?ntasks_best:-1;
+	*forced_impl = unknown?nimpl_best:-1;
+
+	*best_exp_endp = best_exp_end;
+	*max_exp_endp = max_exp_end;
+}
+
+static int _dmda_push_task(struct starpu_task *task, unsigned prio)
+{
+	/* find the queue */
+	unsigned worker;
+	int best = -1;
+	int selected_impl = 0;
+	double model_best = 0.0;
+
+	/* this flag is set if the corresponding worker is selected because
+	   there is no performance prediction available yet */
+	int forced_best = -1;
+	int forced_impl = -1;
+
+	double local_task_length[nworkers][STARPU_MAXIMPLEMENTATIONS];
+	double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS];
+	double local_power[nworkers][STARPU_MAXIMPLEMENTATIONS];
+	double exp_end[nworkers][STARPU_MAXIMPLEMENTATIONS];
+	double max_exp_end = 0.0;
+	double best_exp_end;
+
+	double fitness[nworkers][STARPU_MAXIMPLEMENTATIONS];
+
+	compute_all_performance_predictions(task,
+		local_task_length,
+		exp_end,
+		&max_exp_end,
+		&best_exp_end,
+		local_data_penalty,
+		local_power,
+		&forced_best,
+		&forced_impl);
 
 	double best_fitness = -1;
 
+	unsigned nimpl;
 	if (forced_best == -1)
 	{
 		for (worker = 0; worker < nworkers; worker++)
@@ -464,7 +509,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 				/* we found a better solution */
 				best_fitness = fitness[worker][nimpl];
 				best = worker;
-				best_impl = nimpl;
+				selected_impl = nimpl;
 
 				//			_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
 			}
@@ -484,13 +529,13 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 	}
 	else
 	{
-		model_best = local_task_length[best][best_impl];
+		model_best = local_task_length[best][selected_impl];
 		//penality_best = local_data_penalty[best][best_impl];
 	}
 
 
 	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
-	 _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
+	 _starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
 
 	/* we should now have the best worker in variable "best" */
 	return push_task_on_best_worker(task, best, model_best, prio);