Просмотр исходного кода

new approch for predictions propagations

Simon Archipoff лет назад: 12
Родитель
Сommit
c41dd1d064

+ 1 - 1
src/Makefile.am

@@ -221,11 +221,11 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	top/starpu_top_message_queue.c				\
 	top/starpu_top_connection.c                          	\
 	worker_collection/worker_list.c				\
+	sched_policies/node_worker.c				\
 	sched_policies/node_sched.c				\
 	sched_policies/node_eager.c				\
 	sched_policies/node_random.c				\
 	sched_policies/node_work_stealing.c			\
-	sched_policies/node_worker.c				\
 	sched_policies/node_fifo.c 				\
 	sched_policies/node_heft.c
 if STARPU_USE_CPU

+ 32 - 5
src/sched_policies/node_fifo.c

@@ -4,8 +4,34 @@
 
 static double estimated_finish_time(struct _starpu_sched_node * node)
 {
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
 	struct _starpu_fifo_taskq * fifo = node->data;
-	return fifo->exp_end;
+	double d = fifo->exp_end;
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+	return d;
+}
+
+
+
+static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node,
+								 struct starpu_task * task)
+{
+	if(node->nchilds == 0)
+	{
+		struct _starpu_task_execute_preds p = { CANNOT_EXECUTE };
+		return p;
+	}
+	
+	struct _starpu_task_execute_preds preds = node->childs[0]->estimated_execute_preds(node->childs[0],task);
+
+	struct _starpu_fifo_taskq * fifo = node->data;
+
+	if(preds.state == PERF_MODEL)
+		preds.expected_finish_time = _starpu_compute_expected_time(starpu_timing_now(),
+									   preds.expected_finish_time + fifo->exp_end,
+									   preds.expected_length + fifo->exp_len,
+									   preds.expected_transfer_length);
+	return preds;
 }
 
 static double estimated_load(struct _starpu_sched_node * node)
@@ -26,12 +52,13 @@ static double estimated_load(struct _starpu_sched_node * node)
 	}
 	return load;
 }
+
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	struct _starpu_fifo_taskq * fifo = node->data;
 	int ret = _starpu_fifo_push_sorted_task(fifo, task);
-	fifo->exp_end += task->predicted;
+	fifo->exp_end += task->predicted/node->nworkers;
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	node->available(node);
 	return ret;
@@ -55,7 +82,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 
 int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node)
 {
-	return node->estimated_finish_time == estimated_finish_time
+	return 0//node->estimated_execute_preds == estimated_execute_preds
 		|| node->estimated_load == estimated_load
 		|| node->push_task == node->push_task
 		|| node->pop_task == node->pop_task;
@@ -65,7 +92,7 @@ struct _starpu_sched_node * _starpu_sched_node_fifo_create(void)
 {
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
 	node->data = _starpu_create_fifo();
-	node->estimated_finish_time = estimated_finish_time;
+	node->estimated_execute_preds = estimated_execute_preds;
 	node->estimated_load = estimated_load;
 	node->push_task = push_task;
 	node->pop_task = pop_task;
@@ -75,6 +102,6 @@ struct _starpu_sched_node * _starpu_sched_node_fifo_create(void)
 
 struct _starpu_fifo_taskq *  _starpu_sched_node_fifo_get_fifo(struct _starpu_sched_node * node)
 {
-	STARPU_ASSERT(node->push_task == push_task);
+	STARPU_ASSERT(_starpu_sched_node_is_fifo(node));
 	return node->data;
 }

+ 12 - 11
src/sched_policies/node_heft.c

@@ -18,35 +18,36 @@ struct _starpu_dmda_data
 static double compute_fitness_calibration(struct _starpu_sched_node * child,
 					  struct _starpu_dmda_data * data STARPU_ATTRIBUTE_UNUSED,
 					  struct starpu_task * task STARPU_ATTRIBUTE_UNUSED,
-					  struct _starpu_execute_pred *pred)
+					  struct _starpu_task_execute_preds *pred)
 {
 	if(pred->state == CALIBRATING)
 		return child->estimated_load(child);
 	return DBL_MAX;
 }
 
-static double compute_fitness_perf_model(struct _starpu_sched_node * child,
+static double compute_fitness_perf_model(struct _starpu_sched_node * child STARPU_ATTRIBUTE_UNUSED,
 					 struct _starpu_dmda_data * data,
-					 struct starpu_task * task,
-					 struct _starpu_execute_pred * pred)
+					 struct starpu_task * task STARPU_ATTRIBUTE_UNUSED,
+					 struct _starpu_task_execute_preds * preds)
 {
-	if(pred->state == CANNOT_EXECUTE)
+	if(preds->state == CANNOT_EXECUTE)
 		return DBL_MAX;
-	return data->alpha * pred->expected_length
-		+ data->beta * child->estimated_transfer_length(child, task);
+	return data->alpha * preds->expected_length
+		+ data->beta * preds->expected_transfer_length
+		+ data->gamma * preds->expected_power;
 }
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
-	struct _starpu_execute_pred preds[node->nchilds];
+	struct _starpu_task_execute_preds preds[node->nchilds];
 	int i;
 	int calibrating = 0;
 	int perf_model = 0;
 	int can_execute = 0;
 	for(i = 0; i < node->nchilds; i++)
 	{
-		preds[i] = node->childs[i]->estimated_execute_length(node->childs[i], task);
+		preds[i] = node->childs[i]->estimated_execute_preds(node->childs[i], task);
 		switch(preds[i].state)
 		{
 		case PERF_MODEL:
@@ -81,7 +82,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	double (*fitness_fun)(struct _starpu_sched_node *,
 			      struct _starpu_dmda_data *,
 			      struct starpu_task *,
-			      struct _starpu_execute_pred*) = compute_fitness_perf_model;
+			      struct _starpu_task_execute_preds*) = compute_fitness_perf_model;
 
 	if(calibrating)
 		fitness_fun = compute_fitness_calibration;
@@ -106,7 +107,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 
 	starpu_task_set_implementation(task, preds[index_best_fitness].impl);
 	task->predicted = preds[index_best_fitness].expected_length;
-	task->predicted_transfer = c->estimated_transfer_length(c,task);
+	task->predicted_transfer = preds[index_best_fitness].expected_transfer_length;
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	return c->push_task(c, task);
 }

+ 54 - 13
src/sched_policies/node_sched.c

@@ -2,6 +2,33 @@
 #include <core/workers.h>
 #include "node_sched.h"
 
+double _starpu_compute_expected_time(double now, double predicted_end, double predicted_length, double predicted_transfer)
+{
+
+	if (now + predicted_transfer < predicted_end)
+	{
+		/* We may hope that the transfer will be finished by
+		 * the start of the task. */
+		predicted_transfer = 0;
+	}
+	else
+	{
+		/* The transfer will not be finished by then, take the
+		 * remainder into account */
+		predicted_transfer += now;
+		predicted_transfer -= predicted_end;
+	}
+	if(!isnan(predicted_transfer)) 
+	{
+		predicted_end += predicted_transfer;
+		predicted_length += predicted_transfer;
+	}
+
+	if(!isnan(predicted_length))
+		predicted_end += predicted_length;
+	return predicted_end;
+}
+
 static void available(struct _starpu_sched_node * node)
 {
 	int i;
@@ -132,7 +159,7 @@ struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->mutex);
 	return task;
 }
-
+/*
 static double estimated_finish_time(struct _starpu_sched_node * node)
 {
 	double sum = 0.0;
@@ -146,7 +173,7 @@ static double estimated_finish_time(struct _starpu_sched_node * node)
 	}
 	return sum;
 }
-
+*/
 static double estimated_load(struct _starpu_sched_node * node)
 {
 	double sum = 0.0;
@@ -159,15 +186,25 @@ static double estimated_load(struct _starpu_sched_node * node)
 	return sum;
 }
 
-static struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched_node * node, struct starpu_task * task)
+
+static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	if(node->is_homogeneous)
-		return node->childs[0]->estimated_execute_length(node->childs[0], task);
-	struct _starpu_execute_pred pred = { .state = CANNOT_EXECUTE, .expected_length = 0.0 };
-	int i, nb = 0;
+		return node->childs[0]->estimated_execute_preds(node->childs[0], task);
+	struct _starpu_task_execute_preds pred =
+		{ 
+			.state = CANNOT_EXECUTE,
+			.expected_length = 0.0,
+			.expected_finish_time = 0.0,
+			.expected_transfer_length = 0.0,
+			.expected_power = 0.0
+			
+		};
+	int nb = 0;
+	int i;
 	for(i = 0; i < node->nchilds; i++)
 	{
-		struct _starpu_execute_pred tmp = node->childs[i]->estimated_execute_length(node->childs[i], task);
+		struct _starpu_task_execute_preds tmp = node->childs[i]->estimated_execute_preds(node->childs[i], task);
 		switch(tmp.state)
 		{
 		case CALIBRATING:
@@ -175,11 +212,14 @@ static struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched
 			break;
 		case NO_PERF_MODEL:
 			if(pred.state == CANNOT_EXECUTE)
-				pred.state = NO_PERF_MODEL;
+				pred = tmp;
 			break;
 		case PERF_MODEL:
 			nb++;
 			pred.expected_length += tmp.expected_length;
+			pred.expected_finish_time += tmp.expected_finish_time;
+			pred.expected_transfer_length += tmp.expected_transfer_length;
+			pred.expected_power += tmp.expected_power;
 			pred.state = PERF_MODEL;
 			break;
 		case CANNOT_EXECUTE:
@@ -187,9 +227,12 @@ static struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched
 		}
 	}
 	pred.expected_length /= nb;
+	pred.expected_finish_time /= nb;
+	pred.expected_transfer_length /= nb;
+	pred.expected_power /= nb;
 	return pred;
 }
-
+/*
 static double estimated_transfer_length(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	double sum = 0.0;
@@ -206,7 +249,7 @@ static double estimated_transfer_length(struct _starpu_sched_node * node, struct
 	sum /= nb;
 	return sum;
 }
-
+*/
 int _starpu_sched_node_can_execute_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	unsigned nimpl;
@@ -240,10 +283,8 @@ struct _starpu_sched_node * _starpu_sched_node_create(void)
 	STARPU_PTHREAD_RWLOCK_INIT(&node->mutex,NULL);
 	node->available = available;
 	node->pop_task = pop_task_node;
-	node->estimated_finish_time = estimated_finish_time;
 	node->estimated_load = estimated_load;
-	node->estimated_transfer_length = estimated_transfer_length;
-	node->estimated_execute_length = estimated_execute_length;
+	node->estimated_execute_preds = estimated_execute_preds;
 	node->destroy_node = _starpu_sched_node_destroy;
 	node->add_child = _starpu_sched_node_add_child;
 	node->remove_child = _starpu_sched_node_remove_child;

+ 21 - 20
src/sched_policies/node_sched.h

@@ -8,26 +8,13 @@ struct _starpu_sched_node
 	struct starpu_task * (*pop_task)(struct _starpu_sched_node *,
 					 unsigned sched_ctx_id);
 	void (*available)(struct _starpu_sched_node *);
-
-	/*this function only consider tasks that have a pref model, others does not count
-	 * note that pushing a task not necessarily increase estimated finish time
-	 */
-	double (*estimated_finish_time)(struct _starpu_sched_node * node);
-	/* this function is an heuritic compute subtree's load.
-	 * the computation is based on number of tasks and relative speedup of processing units
-	 * more revelant than estimated_finish_time() when no perf model are available
-	 */
+	
+	/* this function is an heuristic that compute load of subtree */
 	double (*estimated_load)(struct _starpu_sched_node * node);
 
-	//return the average of transfer length for all subtree workers
-	double (*estimated_transfer_length)(struct _starpu_sched_node * node,
-					    struct starpu_task * task);
-	/* return data on expected length of computation, if node is heterogeneous, its an average
-	 * if a calibration is not done, the arch and implementation are returned
-	 */
-	struct _starpu_execute_pred (*estimated_execute_length)(struct _starpu_sched_node * node,
-					   struct starpu_task * task);
-
+	struct _starpu_task_execute_preds (*estimated_execute_preds)(struct _starpu_sched_node * node,
+								     struct starpu_task * task);
+	
 	int nchilds;
 	struct _starpu_sched_node ** childs;
 
@@ -60,11 +47,23 @@ struct _starpu_sched_node
 	void (*destroy_node)(struct _starpu_sched_node *);
 };
 
-struct _starpu_execute_pred {
+struct _starpu_task_execute_preds
+{
 	enum {CANNOT_EXECUTE = 0, CALIBRATING , NO_PERF_MODEL, PERF_MODEL} state;
+
+	/* archtype and nimpl is set to
+	 * best values if state is PERF_MODEL
+	 * values that needs to be calibrated if state is CALIBRATING
+	 * suitable values if NO_PERF_MODEL
+	 * irrevelant if CANNOT_EXECUTE
+	 */
 	enum starpu_perfmodel_archtype archtype;
 	int impl;
+
+	double expected_finish_time;
 	double expected_length;
+	double expected_transfer_length;
+	double expected_power;
 };
 
 
@@ -124,7 +123,9 @@ struct _starpu_sched_node * _starpu_sched_node_eager_create(void);
 struct _starpu_sched_node * _starpu_sched_node_heft_create(double alpha, double beta, double gamma, double idle_power);
 
 
-
+/* compute predicted_end by taking in account the case of the predicted transfer and the predicted_end overlap
+ */
+double _starpu_compute_expected_time(double now, double predicted_end, double predicted_length, double predicted_transfer);
 
 void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_id);
 

+ 48 - 37
src/sched_policies/node_worker.c

@@ -62,23 +62,48 @@ static void available(struct _starpu_sched_node * worker_node)
 }
 
 static double estimated_transfer_length(struct _starpu_sched_node * node,
-				 struct starpu_task * task)
+					struct starpu_task * task)
 {
 	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
 	unsigned memory_node = starpu_worker_get_memory_node(node->workerids[0]);
 	double d = starpu_task_expected_data_transfer_time(memory_node, task);
 	return d;
 }
+static double estimated_finish_time(struct _starpu_sched_node * node)
+{
+	struct _starpu_worker * worker = node->data;
+	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
+	double sum = 0.0;
+	struct starpu_task_list list = worker->local_tasks;
+	struct starpu_task * task;
+	for(task = starpu_task_list_front(&list);
+	    task != starpu_task_list_end(&list);
+	    task = starpu_task_list_next(task))
+		if(!isnan(task->predicted))
+		   sum += task->predicted;
+/*	if(worker->current_task) 
+	{
+	// drôle de bug, t est parfois null, il doit y avoir un problème de mutex quelque part
+		struct starpu_task * t = worker->current_task;
+		if(!isnan(t->predicted))
+			sum += t->predicted/2;
+			}*/
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
+	return sum + starpu_timing_now();
+}
 
-struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched_node * node, struct starpu_task * task)
+struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
 	struct _starpu_worker * worker = node->data;
-	struct _starpu_execute_pred pred =
+	struct _starpu_task_execute_preds preds =
 		{
 			.state = CANNOT_EXECUTE,
 			.archtype = worker->perf_arch,
 			.expected_length = DBL_MAX,
+			.expected_finish_time = estimated_finish_time(node),
+			.expected_transfer_length = estimated_transfer_length(node, task),
+			.expected_power = 0.0
 		};
 
 	int nimpl;
@@ -91,25 +116,31 @@ struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched_node *
 							       nimpl);
 			if(isnan(d))
 			{
-				pred.state = CALIBRATING;
-				pred.impl = nimpl;
-				return pred;
+				preds.state = CALIBRATING;
+				preds.impl = nimpl;
+				return preds;
 			}
-			if(_STARPU_IS_ZERO(d) && pred.state == CANNOT_EXECUTE)
+			if(_STARPU_IS_ZERO(d) && preds.state == CANNOT_EXECUTE)
 			{
-				pred.state = NO_PERF_MODEL;
-				pred.impl = nimpl;
+				preds.state = NO_PERF_MODEL;
+				preds.impl = nimpl;
 				continue;
 			}
-			if(d < pred.expected_length)
+			if(d < preds.expected_length)
 			{
-				pred.state = PERF_MODEL;
-				pred.expected_length = d;
-				pred.impl = nimpl;
+				preds.state = PERF_MODEL;
+				preds.expected_length = d;
+				preds.impl = nimpl;
 			}
-		}	
+		}
 	}
-	return pred;
+
+	if(preds.state == PERF_MODEL)
+		preds.expected_finish_time = _starpu_compute_expected_time(starpu_timing_now(),
+									  preds.expected_finish_time,
+									  preds.expected_length,
+									  preds.expected_transfer_length);
+	return preds;
 }
 
 static double estimated_load(struct _starpu_sched_node * node)
@@ -129,22 +160,6 @@ static double estimated_load(struct _starpu_sched_node * node)
 }
 
 
-static double estimated_finish_time(struct _starpu_sched_node * node)
-{
-	struct _starpu_worker * worker = node->data;
-	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
-	double sum = 0.0;
-	struct starpu_task_list list = worker->local_tasks;
-	struct starpu_task * task;
-	for(task = starpu_task_list_front(&list);
-	    task != starpu_task_list_end(&list);
-	    task = starpu_task_list_next(task))
-		sum += task->predicted;
-	if(worker->current_task)
-		sum += worker->current_task->predicted / 2;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
-	return sum + starpu_timing_now();
-}
 
 static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workerid)
 {
@@ -156,13 +171,10 @@ static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workeri
 	struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
 	node->data = worker;
-	//node->fifo = _starpu_create_fifo(),
 	node->push_task = _starpu_sched_node_worker_push_task;
 	node->pop_task = _starpu_sched_node_worker_pop_task;
-	node->estimated_finish_time = estimated_finish_time;
+	node->estimated_execute_preds = estimated_execute_preds;
 	node->estimated_load = estimated_load;
-	node->estimated_execute_length = estimated_execute_length;
-	node->estimated_transfer_length = estimated_transfer_length;
 	node->destroy_node = _starpu_sched_node_worker_destroy;
 	node->available = available;
 	node->workerids[0] = workerid;
@@ -176,8 +188,7 @@ int _starpu_sched_node_is_worker(struct _starpu_sched_node * node)
 	return node->available == available
 		|| node->push_task == _starpu_sched_node_worker_push_task
 		|| node->pop_task == _starpu_sched_node_worker_pop_task
-		|| node->estimated_finish_time == estimated_finish_time
-		|| node->estimated_execute_length == estimated_execute_length;
+		|| node->estimated_execute_preds == estimated_execute_preds;
 		
 }