Browse Source

- Factorize code in the HEFT strategy
- Export starpu_data_expected_transfer_time which computes the transfer time
for a given data handle
- Rename starpu_data_expected_penalty into starpu_task_expected_data_transfer_time

Cédric Augonnet 14 years ago
parent
commit
e1fe20fbfc

+ 4 - 2
include/starpu_scheduler.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux 1
+ * Copyright (C) 2010, 2011  Université de Bordeaux 1
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -171,7 +171,9 @@ double starpu_task_expected_length(struct starpu_task *task, enum starpu_perf_ar
 /* Returns an estimated speedup factor relative to CPU speed */
 double starpu_worker_get_relative_speedup(enum starpu_perf_archtype perf_archtype);
 /* Returns expected data transfer time in µs */
-double starpu_data_expected_penalty(uint32_t memory_node, struct starpu_task *task);
+double starpu_task_expected_data_transfer_time(uint32_t memory_node, struct starpu_task *task);
+/* Predict the transfer time (in µs) to move a handle to a memory node */
+double starpu_data_expected_transfer_time(starpu_data_handle handle, unsigned memory_node, starpu_access_mode mode);
 /* Returns expected power consumption in J */
 double starpu_task_expected_power(struct starpu_task *task, enum starpu_perf_archtype arch);
 

+ 25 - 18
src/core/perfmodel/perfmodel.c

@@ -173,8 +173,31 @@ double starpu_task_expected_power(struct starpu_task *task, enum starpu_perf_arc
 	return starpu_model_expected_perf(task, task->cl->power_model, arch);
 }
 
+/* Predict the transfer time (in µs) to move a handle to a memory node */
+double starpu_data_expected_transfer_time(starpu_data_handle handle, unsigned memory_node, starpu_access_mode mode)
+{
+	/* If we don't need to read the content of the handle */
+	if (!mode & STARPU_R)
+		return 0.0;
+	
+	if (_starpu_is_data_present_or_requested(handle, memory_node))
+		return 0.0;
+
+	size_t size = _starpu_data_get_size(handle);
+
+	/* XXX in case we have an abstract piece of data (eg.  with the
+	 * void interface, this does not introduce any overhead, and we
+	 * don't even want to consider the latency that is not
+	 * relevant). */
+	if (size == 0)
+		return 0.0;
+
+	uint32_t src_node = _starpu_select_src_node(handle);
+	return _starpu_predict_transfer_time(src_node, memory_node, size);
+}
+
 /* Data transfer performance modeling */
-double starpu_data_expected_penalty(uint32_t memory_node, struct starpu_task *task)
+double starpu_task_expected_data_transfer_time(uint32_t memory_node, struct starpu_task *task)
 {
 	unsigned nbuffers = task->cl->nbuffers;
 	unsigned buffer;
@@ -184,25 +207,9 @@ double starpu_data_expected_penalty(uint32_t memory_node, struct starpu_task *ta
 	for (buffer = 0; buffer < nbuffers; buffer++)
 	{
 		starpu_data_handle handle = task->buffers[buffer].handle;
-
 		starpu_access_mode mode = task->buffers[buffer].mode;
 
-		if ((mode == STARPU_W) || (mode == STARPU_SCRATCH))
-			continue;
-
-		if (!_starpu_is_data_present_or_requested(handle, memory_node))
-		{
-			size_t size = _starpu_data_get_size(handle);
-
-			uint32_t src_node = _starpu_select_src_node(handle);
-
-			/* XXX in case we have an abstract piece of data (eg.
-			 * with the void interface, this does not introduce any
-			 * overhead, and we don't even want to consider the
-			 * latency that is not relevant). */
-			if (size > 0)
-				penalty += _starpu_predict_transfer_time(src_node, memory_node, size);
-		}
+		penalty += starpu_data_expected_transfer_time(handle, memory_node, mode);
 	}
 
 	return penalty;

+ 1 - 1
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -428,7 +428,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio)
 		local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
 
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
-		local_data_penalty[worker] = starpu_data_expected_penalty(memory_node, task);
+		local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
 
 		double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
 

+ 76 - 70
src/sched_policies/heft.c

@@ -17,6 +17,8 @@
 
 /* Distributed queues using performance modeling to assign tasks */
 
+#include <float.h>
+
 #include <core/workers.h>
 #include <core/perfmodel/perfmodel.h>
 #include <starpu_parameters.h>
@@ -103,6 +105,7 @@ static void heft_push_task_notify(struct starpu_task *task, int workerid)
 	/* If there is no prediction available, we consider the task has a null length */
 	if (predicted != -1.0)
 	{
+		task->predicted = predicted;
 		exp_end[workerid] += predicted;
 		exp_len[workerid] += predicted;
 	}
@@ -134,39 +137,22 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	return starpu_push_local_task(best_workerid, task, prio);
 }
 
-static int _heft_push_task(struct starpu_task *task, unsigned prio)
+static void compute_all_performance_predictions(struct starpu_task *task,
+					double *local_task_length, double *exp_end,
+					double *max_exp_endp, double *best_exp_endp,
+					double *local_data_penalty,
+					double *local_power, int *forced_best)
 {
-	unsigned worker;
-	int best = -1;
-	
-	/* this flag is set if the corresponding worker is selected because
-	   there is no performance prediction available yet */
-	int forced_best = -1;
-
-	double local_task_length[nworkers];
-	double local_data_penalty[nworkers];
-	double local_power[nworkers];
-	double exp_end[nworkers];
-	double max_exp_end = 0.0;
-
-	double fitness[nworkers];
-
-	double best_exp_end = 10e240;
-	double model_best = 0.0;
-	double penality_best = 0.0;
-
+	int calibrating = 0;
+	double max_exp_end = DBL_MIN;
+	double best_exp_end = DBL_MAX;
 	int ntasks_best = -1;
 	double ntasks_best_end = 0.0;
-	int calibrating = 0;
 
 	/* A priori, we know all estimations */
 	int unknown = 0;
 
-	/*
-	 *	Compute the expected end of the task on the various workers,
-	 *	and detect if there is some calibration that needs to be done.
-	 */
-
+	unsigned worker;
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		/* Sometimes workers didn't take the tasks as early as we expected */
@@ -185,7 +171,7 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 		local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
 
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
-		local_data_penalty[worker] = starpu_data_expected_penalty(memory_node, task);
+		local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
 
 		double ntasks_end = ntasks[worker] / starpu_worker_get_relative_speedup(perf_arch);
 
@@ -225,10 +211,43 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 			local_power[worker] = 0.;
 	}
 
-	if (unknown)
-		forced_best = ntasks_best;
+	*forced_best = unknown?ntasks_best:-1;
 
-	double best_fitness = -1;
+	*best_exp_endp = best_exp_end;
+	*max_exp_endp = max_exp_end;
+}
+
+static int _heft_push_task(struct starpu_task *task, unsigned prio)
+{
+	unsigned worker;
+	int best = -1;
+	
+	/* this flag is set if the corresponding worker is selected because
+	   there is no performance prediction available yet */
+	int forced_best;
+
+	double local_task_length[nworkers];
+	double local_data_penalty[nworkers];
+	double local_power[nworkers];
+	double exp_end[nworkers];
+	double max_exp_end = 0.0;
+
+	double best_exp_end;
+
+	/*
+	 *	Compute the expected end of the task on the various workers,
+	 *	and detect if there is some calibration that needs to be done.
+	 */
+
+	compute_all_performance_predictions(task, local_task_length, exp_end,
+					&max_exp_end, &best_exp_end,
+					local_data_penalty,
+					local_power, &forced_best);
+
+	/* If there is no prediction available for that task with that arch we
+	 * want to speed-up calibration time so we force this measurement */
+	if (forced_best != -1)
+		return push_task_on_best_worker(task, forced_best, 0.0, prio);
 
 	/*
 	 *	Determine which worker optimizes the fitness metric which is a
@@ -236,53 +255,40 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	 *	consumption.
 	 */
 	
-	if (forced_best == -1)
+	double fitness[nworkers];
+	double best_fitness = -1;
+
+	for (worker = 0; worker < nworkers; worker++)
 	{
-		for (worker = 0; worker < nworkers; worker++)
+		if (!starpu_worker_may_execute_task(worker, task))
 		{
-			if (!starpu_worker_may_execute_task(worker, task))
-			{
-				/* no one on that queue may execute this task */
-				continue;
-			}
-	
-			fitness[worker] = alpha*(exp_end[worker] - best_exp_end) 
-					+ beta*(local_data_penalty[worker])
-					+ _gamma*(local_power[worker]);
-
-			if (exp_end[worker] > max_exp_end)
-				/* This placement will make the computation
-				 * longer, take into account the idle
-				 * consumption of other cpus */
-				fitness[worker] += _gamma * idle_power * (exp_end[worker] - max_exp_end) / 1000000.0;
-
-			if (best == -1 || fitness[worker] < best_fitness)
-			{
-				/* we found a better solution */
-				best_fitness = fitness[worker];
-				best = worker;
-			}
+			/* no one on that queue may execute this task */
+			continue;
 		}
-	}
 
-	STARPU_ASSERT(forced_best != -1 || best != -1);
-	
-	if (forced_best != -1)
-	{
-		/* there is no prediction available for that task
-		 * with that arch we want to speed-up calibration time
-		 * so we force this measurement */
-		best = forced_best;
-		model_best = 0.0;
-		penality_best = 0.0;
-	}
-	else 
-	{
-		model_best = local_task_length[best];
-		penality_best = local_data_penalty[best];
+		fitness[worker] = alpha*(exp_end[worker] - best_exp_end) 
+				+ beta*(local_data_penalty[worker])
+				+ _gamma*(local_power[worker]);
+
+		if (exp_end[worker] > max_exp_end)
+			/* This placement will make the computation
+			 * longer, take into account the idle
+			 * consumption of other cpus */
+			fitness[worker] += _gamma * idle_power * (exp_end[worker] - max_exp_end) / 1000000.0;
+
+		if (best == -1 || fitness[worker] < best_fitness)
+		{
+			/* we found a better solution */
+			best_fitness = fitness[worker];
+			best = worker;
+		}
 	}
 
+	/* By now, we must have found a solution */
+	STARPU_ASSERT(best != -1);
+	
 	/* we should now have the best worker in variable "best" */
+	double model_best = local_task_length[best];
 	return push_task_on_best_worker(task, best, model_best, prio);
 }
 

+ 1 - 1
src/sched_policies/parallel_heft.c

@@ -273,7 +273,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
 		local_task_length[worker] = starpu_task_expected_length(task, perf_arch);
 
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
-		local_data_penalty[worker] = starpu_data_expected_penalty(memory_node, task);
+		local_data_penalty[worker] = starpu_task_expected_data_transfer_time(memory_node, task);
 
 		double ntasks_end = compute_ntasks_end(worker);