Browse Source

dmda merge

Andra Hugo 12 years ago
parent
commit
7a435cde60
1 changed file with 129 additions and 77 deletions
  1. 129 77
      src/sched_policies/deque_modeling_policy_data_aware.c

+ 129 - 77
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -21,11 +21,21 @@
 
 #include <limits.h>
 
+#include <core/perfmodel/perfmodel.h>
+#include <core/task_bundle.h>
 #include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
 
+#ifndef DBL_MIN
+#define DBL_MIN __DBL_MIN__
+#endif
+
+#ifndef DBL_MAX
+#define DBL_MAX __DBL_MAX__
+#endif
+
 typedef struct {
 	double alpha;
 	double beta;
@@ -208,6 +218,7 @@ static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
 
 
 
+
 static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, unsigned sched_ctx_id)
 {
 	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
@@ -353,84 +364,80 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
 }
 
-static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
+static void compute_all_performance_predictions(struct starpu_task *task,
+					double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double *max_exp_endp,
+					double *best_exp_endp,
+					double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
+					int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
 {
-	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
-	/* find the queue */
-	struct _starpu_fifo_taskq *fifo;
-	unsigned worker, worker_ctx = 0;
-	int best = -1, best_in_ctx = -1;
-	
-	/* this flag is set if the corresponding worker is selected because
-	   there is no performance prediction available yet */
-	int forced_best = -1;
-
-	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
-
-	unsigned nworkers_ctx = workers->nworkers;
-	double local_task_length[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-	double local_data_penalty[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-	double local_power[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-	double exp_end[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-	double max_exp_end = 0.0;
-
-	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
-
-	double best_exp_end = 10e240;
-	double model_best = 0.0;
-	//double penality_best = 0.0;
-
+	int calibrating = 0;
+	double max_exp_end = DBL_MIN;
+	double best_exp_end = DBL_MAX;
 	int ntasks_best = -1;
+	int nimpl_best = 0;
 	double ntasks_best_end = 0.0;
-	int calibrating = 0;
 
 	/* A priori, we know all estimations */
 	int unknown = 0;
-
-	unsigned best_impl = 0;
-	unsigned nimpl=0;
-
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
-
+	unsigned worker, worker_ctx = 0;
+	
+	unsigned nimpl;
+	
+	starpu_task_bundle_t bundle = task->bundle;
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+	
+	/* find the queue */
+	struct _starpu_fifo_taskq *fifo;
+	
 	while(workers->has_next(workers))
-        {
-                worker = workers->get_next(workers);
+	{
+		worker = workers->get_next(workers);
+		fifo = dt->queue_array[worker];
 		for(nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 	 	{
-			fifo = dt->queue_array[worker];
-
-			/* Sometimes workers didn't take the tasks as early as we expected */
-			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
-			fifo->exp_end = fifo->exp_start + fifo->exp_len;
-			if (fifo->exp_end > max_exp_end)
-				max_exp_end = fifo->exp_end;
-
 			if (!starpu_worker_can_execute_task(worker, task, nimpl))
 			{
 				/* no one on that queue may execute this task */
 				continue;
 			}
-
+			
+			/* Sometimes workers didn't take the tasks as early as we expected */
+			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
+			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len;
+			if (exp_end[worker_ctx][nimpl] > max_exp_end)
+				max_exp_end = exp_end[worker_ctx][nimpl];
+			
 			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-			local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
-
-			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker],worker,nimpl);
-
 			unsigned memory_node = starpu_worker_get_memory_node(worker);
-			local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
-
+			
+			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl],worker,nimpl);
+			
+			if (bundle)
+			{
+				STARPU_ABORT(); /* Not implemented yet. */
+			}
+			else
+			{
+				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
+				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
+				local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch,nimpl);
+			}
+			
 			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
-
+			
 			if (ntasks_best == -1
-			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
+			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
 			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
 			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
 				)
 			{
 				ntasks_best_end = ntasks_end;
 				ntasks_best = worker;
-				best_impl = nimpl;
+				nimpl_best = nimpl;
 			}
 
 			if (isnan(local_task_length[worker_ctx][nimpl]))
@@ -438,53 +445,93 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 				 * so we privilege non-calibrated tasks (but still
 				 * greedily distribute them to avoid dumb schedules) */
 				calibrating = 1;
-
+			
 			if (isnan(local_task_length[worker_ctx][nimpl])
 					|| _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
 				/* there is no prediction available for that task
-				 * with that arch yet, so switch to a greedy strategy */
+				 * with that arch (yet or at all), so switch to a greedy strategy */
 				unknown = 1;
-
+			
 			if (unknown)
 				continue;
-
+			
 			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];
-
+			
 			if (exp_end[worker_ctx][nimpl] < best_exp_end)
 			{
 				/* a better solution was found */
 				best_exp_end = exp_end[worker_ctx][nimpl];
-				best_impl = nimpl;
+				nimpl_best = nimpl;
 			}
-
-			local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);
+			
 			if (isnan(local_power[worker_ctx][nimpl]))
 				local_power[worker_ctx][nimpl] = 0.;
-
-		 }
+			
+		}
 		worker_ctx++;
 	}
 
-	if (unknown)
-		forced_best = ntasks_best;
+	*forced_worker = unknown?ntasks_best:-1;
+	*forced_impl = unknown?nimpl_best:-1;
+
+	*best_exp_endp = best_exp_end;
+	*max_exp_endp = max_exp_end;
+}
+
+static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
+{
+	/* find the queue */
+	unsigned worker, worker_ctx = 0;
+	int best = -1, best_in_ctx = -1;
+	int selected_impl = 0;
+	double model_best = 0.0;
+
+	/* forced_best is set to the id of the worker selected because there is
+	   no performance prediction available yet (it stays -1 otherwise);
+	   forced_impl is the implementation chosen along with that worker */
+	int forced_best = -1;
+	int forced_impl = -1;
+
+	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
+	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
+	unsigned nworkers_ctx = workers->nworkers;
+	double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
+	double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
+	double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
+	double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
+	double max_exp_end = 0.0;
+	double best_exp_end;
+
+	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
+
+	if(workers->init_cursor)
+		workers->init_cursor(workers);
+
+	compute_all_performance_predictions(task,
+										local_task_length,
+										exp_end,
+										&max_exp_end,
+										&best_exp_end,
+										local_data_penalty,
+										local_power,
+										&forced_best,
+										&forced_impl, sched_ctx_id);
 
 	double best_fitness = -1;
-	
-	worker_ctx = 0;
+
+	unsigned nimpl;
 	if (forced_best == -1)
 	{
 		while(workers->has_next(workers))
 		{
 			worker = workers->get_next(workers);
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-			{	
+			{
 				if (!starpu_worker_can_execute_task(worker, task, nimpl))
 				{
 					/* no one on that queue may execute this task */
 					continue;
 				}
 				
-				fifo = dt->queue_array[worker];
 				
 				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end) 
 					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
@@ -502,14 +549,14 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 					best_fitness = fitness[worker_ctx][nimpl];
 					best = worker;
 					best_in_ctx = worker_ctx;
-					best_impl = nimpl;
-					
-					//			_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
+					selected_impl = nimpl;
+
+					//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
 				}
 			}
 		}
 	}
-		
+
 	STARPU_ASSERT(forced_best != -1 || best != -1);
 
 	if (forced_best != -1)
@@ -523,7 +570,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	}
 	else
 	{
-		model_best = local_task_length[best_in_ctx][best_impl];
+		model_best = local_task_length[best_in_ctx][selected_impl];
 		//penality_best = local_data_penalty[best_in_ctx][best_impl];
 	}
 
@@ -531,7 +578,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
                 workers->deinit_cursor(workers);
 
 	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
-	 _starpu_get_job_associated_to_task(task)->nimpl = best_impl;
+	 _starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
 
 	/* we should now have the best worker in variable "best" */
 	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
@@ -651,7 +698,12 @@ static void initialize_dmda_policy(unsigned sched_ctx_id)
 
 	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
 	if (strval_gamma)
-		dt->_gamma = atof(strval_gamma);
+		dt->_gamma = atof(strval_gamma);	
+
+	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
+	if (strval_idle_power)
+		dt->idle_power = atof(strval_idle_power);
+
 }
 
 static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)