Browse Source

idea for heft (nice enough)

Simon Archipoff 12 years ago
parent
commit
38f6e98eaa

+ 42 - 4
src/sched_policies/node_fifo.c

@@ -1,10 +1,37 @@
 #include "node_sched.h"
 #include "fifo_queues.h"
+#include <starpu_scheduler.h>
 
+/* Expected end time of a fifo node: the exp_end value kept in the fifo
+ * task queue stored in node->data. */
+static double estimated_finish_time(struct _starpu_sched_node * node)
+{
+	const struct _starpu_fifo_taskq * queue = node->data;
+	double finish = queue->exp_end;
+	return finish;
+}
+
+/* Heuristic load of a fifo node: the number of queued tasks divided by the
+ * mean relative speedup of the sampled workers, plus the load reported by
+ * every child of the node. */
+static double estimated_load(struct _starpu_sched_node * node)
+{
+	struct _starpu_fifo_taskq * queue = node->data;
+	/* a homogeneous node only samples its first worker */
+	int sampled = node->is_homogeneous ? 1 : node->nworkers;
+	double mean_speedup = 0.0;
+	int w = 0;
+	while(w < sampled)
+	{
+		mean_speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(node->workerids[w]));
+		w++;
+	}
+	mean_speedup /= sampled;
+	STARPU_ASSERT(!_STARPU_IS_ZERO(mean_speedup));
+	double total = queue->ntasks / mean_speedup;
+	int k;
+	for(k = 0; k < node->nchilds; k++)
+	{
+		struct _starpu_sched_node * child = node->childs[k];
+		total += child->estimated_load(child);
+	}
+	return total;
+}
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
-	int ret = _starpu_fifo_push_sorted_task(node->data, task);
+	struct _starpu_fifo_taskq * fifo = node->data;
+	int ret = _starpu_fifo_push_sorted_task(fifo, task);
+	fifo->exp_end += task->predicted;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
 	node->available(node);
 	return ret;
@@ -12,9 +39,11 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 
 static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
-	struct starpu_task * task  = _starpu_fifo_pop_task(node->data, starpu_worker_get_id());
-
+	struct _starpu_fifo_taskq * fifo = node->data;
+STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	struct starpu_task * task  = _starpu_fifo_pop_task(fifo, starpu_worker_get_id());
+	if(task)
+		fifo->exp_start = starpu_timing_now() + task->predicted;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
 	if(task)
 		return task;
@@ -24,11 +53,20 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 	return NULL;
 }
 
+/* Tell whether node is a fifo node.
+ * The node is reported as a fifo as soon as at least one of its methods is
+ * still one of the fifo implementations defined in this file.
+ * Returns non-zero for a fifo node, 0 otherwise. */
+int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node)
+{
+	/* compare with this file's static functions: comparing a field with
+	 * itself is always true and would make every node look like a fifo */
+	return node->estimated_finish_time == estimated_finish_time
+		|| node->estimated_load == estimated_load
+		|| node->push_task == push_task
+		|| node->pop_task == pop_task;
+}
 
 struct _starpu_sched_node * _starpu_sched_node_fifo_create(void)
 {
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
 	node->data = _starpu_create_fifo();
+	node->estimated_finish_time = estimated_finish_time;
+	node->estimated_load = estimated_load;
 	node->push_task = push_task;
 	node->pop_task = pop_task;
 	return node;

+ 71 - 138
src/sched_policies/node_heft.c

@@ -3,6 +3,7 @@
 #include <starpu_scheduler.h>
 #include <float.h>
 
+
 struct _starpu_dmda_data
 {
 	double alpha;
@@ -13,169 +14,100 @@ struct _starpu_dmda_data
 
 
 
-static void compute_all_things(struct starpu_task * task,
-			       struct _starpu_sched_node ** nodes, int nnodes,
-			       double * execution_lengths, int * best_impls,//impl used for best execution length, -1 if no execution possible
-			       double * transfer_lengths,
-			       double * finish_times,
-			       int * is_not_calibrated, enum starpu_perf_archtype * arch_not_calibrated, int * impl_not_calibrated,
-			       int * is_no_model)
-{
-	*is_not_calibrated = 0;
-	*is_no_model = 1;
-	int i = 0;
-	for(i = 0; i < nnodes; i++)
-	{
-		execution_lengths[i] = DBL_MAX;
-		best_impls[i] = -1;
-		int j;
-		for(j = 0; j < STARPU_MAXIMPLEMENTATIONS; j++)
-		{
-			if(_starpu_sched_node_can_execute_task_with_impl(nodes[i], task, j))
-			{
-				enum starpu_perf_archtype archtype = starpu_worker_get_perf_archtype(nodes[i]->workerids[0]);
-				double d = starpu_task_expected_length(task, archtype, j);
-				if(isnan(d))
-				{
-					*is_not_calibrated = 1;
-					*arch_not_calibrated = archtype;
-					*impl_not_calibrated = j;
-				}
-				if(!_STARPU_IS_ZERO(d))//we have a perf model
-				{
-					*is_no_model = 0;
-					if(d < execution_lengths[i])
-					{
-						execution_lengths[i] = d;
-						best_impls[i] = j;
-					}
-				}
-				else//we dont have a perf model for this implementation but we may have one for an other
-					if(*is_no_model)
-						best_impls[i] = j;
-				unsigned memory_node = starpu_worker_get_memory_node(nodes[i]->workerids[0]);
-				transfer_lengths[i] = starpu_task_expected_data_transfer_time(memory_node, task);
-				finish_times[i] = nodes[i]->estimated_finish_time(nodes[i]);
-			}
-		}
-	}
-}
-
-static double compute_total_finish_time(double exp_end, double exp_len, double exp_trans)
-{
-	if(exp_trans < exp_end)
-		return exp_end + exp_len;
-	else
-		return exp_end + exp_trans;
-}
-
-static double fitness(double alpha, double beta, double gamma,
-		      double execution_length, double transfer_length, double finish_time, double now)
+/* Fitness of child when it is being calibrated: a child whose prediction is
+ * in the CALIBRATING state is scored by its estimated load, any other child
+ * gets DBL_MAX. data and task are not used. */
+static double compute_fitness_calibration(struct _starpu_sched_node * child,
+					  struct _starpu_dmda_data * data STARPU_ATTRIBUTE_UNUSED,
+					  struct starpu_task * task STARPU_ATTRIBUTE_UNUSED,
+					  struct _starpu_execute_pred *pred)
 {
-	(void) gamma;
-	double total_execution_time = compute_total_finish_time(finish_time - now, execution_length, transfer_length);
-	return alpha * total_execution_time + transfer_length * beta;
+	if(pred->state == CALIBRATING)
+		return child->estimated_load(child);
+	return DBL_MAX;
 }
-
-static double fitness_no_model(double alpha, double beta, double transfer_length, double finish_time, double now)
+/* Fitness of child when no performance model is used: DBL_MAX if the child
+ * cannot execute the task, otherwise the child's estimated load.
+ * data and task are not used. */
+static double compute_fitness_no_perf_model(struct _starpu_sched_node * child,
+					    struct _starpu_dmda_data * data STARPU_ATTRIBUTE_UNUSED,
+					    struct starpu_task * task STARPU_ATTRIBUTE_UNUSED,
+					    struct _starpu_execute_pred *pred)
 {
-	(void) gamma;
-	double exp_end = finish_time - now;
-	return alpha * exp_end + beta * transfer_length;
+	if(pred->state == CANNOT_EXECUTE)
+		return DBL_MAX;
+	return child->estimated_load(child);
 }
 
-static double estimated_transfert_time(struct _starpu_sched_node * node, struct starpu_task * task)
+/* Fitness of child based on performance models: DBL_MAX if the child cannot
+ * execute the task, otherwise the predicted execution length weighted by
+ * data->alpha plus the child's estimated transfer length weighted by
+ * data->beta. */
+static double compute_fitness_perf_model(struct _starpu_sched_node * child,
+					 struct _starpu_dmda_data * data,
+					 struct starpu_task * task,
+					 struct _starpu_execute_pred * pred)
 {
-	STARPU_ASSERT(node->nworkers);
-	unsigned memory_node = starpu_worker_get_memory_node(node->workerids[0]);
-	return starpu_task_expected_data_transfer_time(memory_node, task);
+	if(pred->state == CANNOT_EXECUTE)
+		return DBL_MAX;
+	return data->alpha * pred->expected_length
+		+ data->beta * child->estimated_transfer_length(child, task);
 }
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
-	struct _starpu_dmda_data * dt = node->data;
-	double execution_lengths[node->nchilds];
-	double finish_times[node->nchilds];
-	double transfer_lengths[node->nchilds];
-	int best_impls[node->nchilds];// -1 mean cant execute
-	//double power_consumptions[node->nchilds];
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	struct _starpu_execute_pred preds[node->nchilds];
 	int i;
-
-	int is_not_calibrated;
-	enum starpu_perf_archtype arch_not_calibrated;
-	int impl_not_calibrated;
-
-	int is_no_model;
-
-	compute_all_things(task,
-			   node->childs, node->nchilds,
-			   execution_lengths, best_impls,
-			   transfer_lengths,
-			   finish_times,
-			   &is_not_calibrated, &arch_not_calibrated, &impl_not_calibrated,
-			   &is_no_model);
-
-	double max_fitness = DBL_MAX;
-	int index_max = -1;
-	double now = starpu_timing_now();
-	if(is_not_calibrated)
+	int calibrating = 0;
+	int perf_model = 0;
+	int can_execute = 0;
+	for(i = 0; i < node->nchilds; i++)
 	{
-		for(i = 0; i < node->nchilds; i++)
+		preds[i] = node->childs[i]->estimated_execute_length(node->childs[i], task);
+		switch(preds[i].state)
 		{
-			if(best_impls[i] == -1)
-				continue;
-			enum starpu_perf_archtype archtype = starpu_worker_get_perf_archtype(node->childs[i]->workerids[0]);
-			if(archtype != arch_not_calibrated)
-				continue;
-			double f = fitness_no_model(dt->alpha, dt->beta, transfer_lengths[i], finish_times[i], now);
-			if(f < max_fitness)
-			{
-				max_fitness = f;
-				index_max = i;
-			}
+		case PERF_MODEL:
+			perf_model = 1;
+			can_execute = 1;
+			break;
+		case CALIBRATING:
+			calibrating = 1;
+			can_execute = 1;
+			break;
+		case NO_PERF_MODEL:
+			can_execute = 1;
+		case CANNOT_EXECUTE:
+			break;
 		}
 	}
-	else if(is_no_model)
+	if(!can_execute)
 	{
-		for(i = 0; i < node->nchilds; i++)
-		{
-			if(best_impls[i] == -1)
-				continue;
-			double f = fitness_no_model(dt->alpha, dt->beta, transfer_lengths[i], finish_times[i], now);
-			if(f < max_fitness)
-			{
-				max_fitness = f;
-				index_max = i;
-			}
-		}
+		STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+		return -ENODEV;
 	}
-	else
+	double (*fitness_fun)(struct _starpu_sched_node *,
+			      struct _starpu_dmda_data *,
+			      struct starpu_task *,
+			      struct _starpu_execute_pred*) = compute_fitness_no_perf_model;
+	if(perf_model)
+		fitness_fun = compute_fitness_perf_model;
+	if(calibrating)
+		fitness_fun = compute_fitness_calibration;
+	double best_fitness = DBL_MAX;
+	int index_best_fitness;
+	for(i = 0; i < node->nchilds; i++)
 	{
-		for(i = 0; i < node->nchilds; i++)
+		double tmp = fitness_fun(node->childs[i],
+					 node->data,
+					 task,
+					 preds + i);
+		if(tmp < best_fitness)
 		{
-			if(best_impls[i] == -1)
-				continue;
-			double f =  fitness(dt->alpha, dt->beta, dt->gamma,
-					    execution_lengths[i], transfer_lengths[i] , finish_times[i], now);
-
-			if(f < max_fitness)
-			{
-				max_fitness = f;
-				index_max = i;
-			}
+			best_fitness = tmp;
+			index_best_fitness = i;
 		}
 	}
+	struct _starpu_sched_node * c = node->childs[index_best_fitness];
 
-	STARPU_ASSERT(index_max != -1);
-	task->predicted = execution_lengths[index_max];
-	task->predicted_transfer = transfer_lengths[index_max];
-	starpu_task_set_implementation(task, best_impls[index_max]);
-	struct _starpu_sched_node * child = node->childs[index_max];
-	return child->push_task(child, task);
+	starpu_task_set_implementation(task, preds[index_best_fitness].impl);
+	task->predicted = preds[index_best_fitness].expected_length;
+	task->predicted_transfer = c->estimated_transfer_length(c,task);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	return c->push_task(c, task);
 }
 
 
-
 struct _starpu_sched_node * _starpu_sched_node_heft_create(double alpha, double beta, double gamma, double idle_power)
 {
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
@@ -195,3 +127,4 @@ struct _starpu_sched_node * _starpu_sched_node_heft_create(double alpha, double
 }
 
 
+

+ 104 - 27
src/sched_policies/node_sched.c

@@ -16,33 +16,6 @@ static struct starpu_task * pop_task_node(struct _starpu_sched_node * node, unsi
 		return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id], sched_ctx_id);
 }
 
-struct _starpu_sched_node * _starpu_sched_node_create(void)
-{
-	struct _starpu_sched_node * node = malloc(sizeof(*node));
-	memset(node,0,sizeof(*node));
-	STARPU_PTHREAD_MUTEX_INIT(&node->mutex,NULL);
-	node->available = available;
-	node->pop_task = pop_task_node;
-	node->destroy_node = _starpu_sched_node_destroy;
-	node->add_child = _starpu_sched_node_add_child;
-	node->remove_child = _starpu_sched_node_remove_child;
-	
-	return node;
-}
-void _starpu_sched_node_destroy(struct _starpu_sched_node *node)
-{
-	int i,j;
-	for(i = 0; i < node->nchilds; i++)
-	{
-		struct _starpu_sched_node * child = node->childs[i];
-		for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
-			if(child->fathers[i] == node)
-				child->fathers[i] = NULL;
-		
-	}
-	free(node->childs);
-	free(node);
-}
 
 void _starpu_sched_node_set_father(struct _starpu_sched_node *node,
 				   struct _starpu_sched_node *father_node,
@@ -159,7 +132,78 @@ struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
 	return node->pop_task(node, sched_ctx_id);
 }
 
+/* Default estimated_finish_time: the largest finish time reported by the
+ * children of node, or 0.0 when no child reports a larger value. */
+static double estimated_finish_time(struct _starpu_sched_node * node)
+{
+	double latest = 0.0;
+	int k = 0;
+	while(k < node->nchilds)
+	{
+		struct _starpu_sched_node * child = node->childs[k];
+		double child_end = child->estimated_finish_time(child);
+		latest = (child_end > latest) ? child_end : latest;
+		k++;
+	}
+	return latest;
+}
 
+/* Default estimated_load: the sum of the loads reported by the children of
+ * node (0.0 for a node without child). */
+static double estimated_load(struct _starpu_sched_node * node)
+{
+	double total = 0.0;
+	int k = 0;
+	while(k < node->nchilds)
+	{
+		struct _starpu_sched_node * child = node->childs[k];
+		total += child->estimated_load(child);
+		k++;
+	}
+	return total;
+}
+
+/* Default estimated_execute_length: prediction for task on the subtree.
+ * - a node without child cannot execute anything;
+ * - a homogeneous node forwards the prediction of its first child;
+ * - otherwise a CALIBRATING child's prediction is returned as is, and the
+ *   expected length is the average over the children that have a perf
+ *   model (state PERF_MODEL); if no child has one, the state is
+ *   NO_PERF_MODEL when at least one child can execute the task without a
+ *   model, CANNOT_EXECUTE otherwise.
+ * NOTE(review): for an averaged prediction impl and archtype are left
+ * zero-initialized -- confirm callers do not rely on them. */
+static struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	struct _starpu_execute_pred pred = { .state = CANNOT_EXECUTE, .expected_length = 0.0 };
+	if(node->nchilds == 0)
+		return pred;
+	if(node->is_homogeneous)
+		return node->childs[0]->estimated_execute_length(node->childs[0], task);
+	int i, nb = 0;
+	for(i = 0; i < node->nchilds; i++)
+	{
+		struct _starpu_execute_pred tmp = node->childs[i]->estimated_execute_length(node->childs[i], task);
+		switch(tmp.state)
+		{
+		case CALIBRATING:
+			return tmp;
+		case NO_PERF_MODEL:
+			if(pred.state == CANNOT_EXECUTE)
+				pred.state = NO_PERF_MODEL;
+			break;
+		case PERF_MODEL:
+			nb++;
+			pred.expected_length += tmp.expected_length;
+			break;
+		case CANNOT_EXECUTE:
+			break;
+		}
+	}
+	/* only average (and advertise a perf model) when at least one child
+	 * provided a length: avoids 0.0 / 0 and a stale state */
+	if(nb > 0)
+	{
+		pred.state = PERF_MODEL;
+		pred.expected_length /= nb;
+	}
+	return pred;
+}
+
+/* Default estimated_transfer_length: average of the transfer lengths
+ * reported by the children that can execute task.
+ * Returns 0.0 when no child can execute the task. */
+static double estimated_transfer_length(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	double sum = 0.0;
+	int nb = 0, i = 0;
+	for(i = 0; i < node->nchilds; i++)
+	{
+		struct _starpu_sched_node * c = node->childs[i];
+		if(_starpu_sched_node_can_execute_task(c, task))
+		{
+			sum += c->estimated_transfer_length(c, task);
+			nb++;
+		}
+	}
+	/* nothing to average: do not compute 0.0 / 0 (NaN) */
+	if(nb == 0)
+		return 0.0;
+	return sum / nb;
+}
 
 int _starpu_sched_node_can_execute_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
@@ -187,6 +231,39 @@ int _starpu_sched_node_can_execute_task_with_impl(struct _starpu_sched_node * no
 
 }
 
+/* Allocate a zero-filled scheduling node, initialise its mutex and install
+ * the default methods defined in this file. */
+struct _starpu_sched_node * _starpu_sched_node_create(void)
+{
+	/* calloc hands back zeroed storage, same as malloc followed by memset */
+	struct _starpu_sched_node * created = calloc(1, sizeof(*created));
+	STARPU_PTHREAD_MUTEX_INIT(&created->mutex,NULL);
+
+	/* task flow */
+	created->available = available;
+	created->pop_task = pop_task_node;
+
+	/* estimations */
+	created->estimated_finish_time = estimated_finish_time;
+	created->estimated_load = estimated_load;
+	created->estimated_transfer_length = estimated_transfer_length;
+	created->estimated_execute_length = estimated_execute_length;
+
+	/* tree management */
+	created->destroy_node = _starpu_sched_node_destroy;
+	created->add_child = _starpu_sched_node_add_child;
+	created->remove_child = _starpu_sched_node_remove_child;
+
+	return created;
+}
+/* Free node and its array of children.
+ * Before freeing, every child's fathers[] slot that still points to node is
+ * reset to NULL. The children themselves are not destroyed. */
+void _starpu_sched_node_destroy(struct _starpu_sched_node *node)
+{
+	int i,j;
+	for(i = 0; i < node->nchilds; i++)
+	{
+		struct _starpu_sched_node * child = node->childs[i];
+		/* fathers[] has one slot per scheduling context: index it with
+		 * j, not with the child index i */
+		for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
+			if(child->fathers[j] == node)
+				child->fathers[j] = NULL;
+		
+	}
+	free(node->childs);
+	free(node);
+}
+
+
 static int is_homogeneous(int * workerids, int nworkers)
 {
 	if(nworkers == 0)

+ 30 - 1
src/sched_policies/node_sched.h

@@ -8,8 +8,25 @@ struct _starpu_sched_node
 	struct starpu_task * (*pop_task)(struct _starpu_sched_node *,
 					 unsigned sched_ctx_id);
 	void (*available)(struct _starpu_sched_node *);
-	double (*estimated_finish_time)(struct _starpu_sched_node * node);
 	
+	/* this function only considers tasks that have a perf model, others do not count.
+	 * note that pushing a task does not necessarily increase the estimated finish time
+	 */
+	double (*estimated_finish_time)(struct _starpu_sched_node * node);
+	/* this function is a heuristic that computes the subtree's load.
+	 * the computation is based on the number of tasks and the relative speedup of processing units;
+	 * it is more relevant than estimated_finish_time() when no perf model is available
+	 */
+	double (*estimated_load)(struct _starpu_sched_node * node);
+
+	//return the average transfer length over all workers of the subtree
+	double (*estimated_transfer_length)(struct _starpu_sched_node * node,
+					    struct starpu_task * task);
+	/* return data on the expected length of computation; if the node is heterogeneous, it is an average.
+	 * if a calibration is not done, the arch and implementation are returned
+	 */
+	struct _starpu_execute_pred (*estimated_execute_length)(struct _starpu_sched_node * node,
+					   struct starpu_task * task);
 
 	int nchilds;
 	struct _starpu_sched_node ** childs;
@@ -42,6 +59,13 @@ struct _starpu_sched_node
 	void (*destroy_node)(struct _starpu_sched_node *);
 };
 
+/* Result of a node's estimated_execute_length() method for a given task. */
+struct _starpu_execute_pred {
+	/* kind of prediction; as filled by the worker node implementation:
+	 * CANNOT_EXECUTE is the initial value when no implementation was
+	 * accepted, CALIBRATING is set when an expected length is NaN,
+	 * NO_PERF_MODEL when it is zero, PERF_MODEL when a length is kept */
+	enum {CANNOT_EXECUTE = 0, CALIBRATING , NO_PERF_MODEL, PERF_MODEL} state;
+	/* performance architecture the prediction was computed for */
+	enum starpu_perf_archtype archtype;
+	/* implementation index the prediction refers to */
+	int impl;
+	/* predicted execution length (meaningful with a perf model) */
+	double expected_length;
+};
+
 
 struct _starpu_sched_tree
 {
@@ -52,6 +76,10 @@ struct _starpu_sched_tree
 
 /* allocate and initalise node field with defaults values :
  *  .pop_task make recursive call on father
+ *  .estimated_finish_time  max of the recursive calls on children
+ *  .estimated_load  compute relative speedup and tasks in subtree
+ *  .estimated_transfer_length  average transfer cost for all workers in the subtree
+ *  .estimated_execute_length  average execution cost for all workers in the subtree
  *  .available make a recursive call on childrens
  *  .destroy_node  call _starpu_sched_node_destroy
  *  .update_nchilds a function that does nothing
@@ -81,6 +109,7 @@ int _starpu_sched_node_is_worker(struct _starpu_sched_node * node);
 int _starpu_sched_node_worker_get_workerid(struct _starpu_sched_node * worker_node);
 
 struct _starpu_sched_node * _starpu_sched_node_fifo_create(void);
+int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node);
 struct _starpu_fifo_taskq *  _starpu_sched_node_fifo_get_fifo(struct _starpu_sched_node *);
 
 /* struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void); */

+ 85 - 15
src/sched_policies/node_worker.c

@@ -3,7 +3,6 @@
 #include <float.h>
 
 static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
-
 static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workerid);
 struct _starpu_sched_node * _starpu_sched_node_worker_get(int workerid)
 {
@@ -21,8 +20,6 @@ int _starpu_sched_node_worker_push_task(struct _starpu_sched_node * node, struct
 	/*this function take the worker's mutex */
 	
 	int ret = _starpu_push_local_task(node->data, task, task->priority);
-
-
 	return ret;
 /*	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
 	int ret_val = _starpu_fifo_push_sorted_task(node->fifo, task);
@@ -49,7 +46,6 @@ void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS ; i++)
 		if(node->fathers[i] != NULL)
 			return;//this node is shared between several contexts
-//	_starpu_destroy_fifo(node->fifo);
 	_starpu_sched_node_destroy(node);
 	_worker_nodes[id] = NULL;
 }
@@ -65,23 +61,89 @@ static void available(struct _starpu_sched_node * worker_node)
 	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
+/* Expected data transfer time of task towards the memory node of the
+ * worker behind this worker node (asserts that node is a worker node). */
+static double estimated_transfer_length(struct _starpu_sched_node * node,
+				 struct starpu_task * task)
+{
+	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
+	int workerid = node->workerids[0];
+	unsigned target_memory_node = starpu_worker_get_memory_node(workerid);
+	return starpu_task_expected_data_transfer_time(target_memory_node, task);
+}
 
-static double estimated_finish_time(struct _starpu_sched_node * node, struct starpu_task * task)
+/* Prediction of the execution of task on the worker behind this node.
+ * Every implementation the worker can execute is examined:
+ * - a NaN expected length means calibration is needed: the prediction is
+ *   returned at once with state CALIBRATING and that implementation;
+ * - a zero expected length means there is no perf model for that
+ *   implementation: it is only remembered (state NO_PERF_MODEL) when nothing
+ *   better was found before, and never counts as a length;
+ * - otherwise the implementation with the smallest expected length is kept
+ *   with state PERF_MODEL.
+ * The state stays CANNOT_EXECUTE when no implementation can be executed. */
+struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
-	double d = DBL_MAX;
+	struct _starpu_worker * worker = node->data;
+	struct _starpu_execute_pred pred =
+		{
+			.state = CANNOT_EXECUTE,
+			.archtype = worker->perf_arch,
+			.expected_length = DBL_MAX,
+		};
+
 	int nimpl;
 	for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 	{
-		double tmp = starpu_task_expected_length(task,
-							 ((struct _starpu_worker *)node->data)->arch,
-							 nimpl);
-		if(!isnan(tmp) && tmp < d)
-			tmp = d;
-			
+		if(starpu_worker_can_execute_task(worker->workerid,task,nimpl))
+		{
+			double d = starpu_task_expected_length(task,
+							       worker->perf_arch,
+							       nimpl);
+			if(isnan(d))
+			{
+				pred.state = CALIBRATING;
+				pred.impl = nimpl;
+				return pred;
+			}
+			if(_STARPU_IS_ZERO(d))
+			{
+				/* no perf model for this implementation: never let
+				 * the zero length reach the comparison below, it
+				 * would be recorded as a PERF_MODEL of length 0 */
+				if(pred.state == CANNOT_EXECUTE)
+				{
+					pred.state = NO_PERF_MODEL;
+					pred.impl = nimpl;
+				}
+				continue;
+			}
+			if(d < pred.expected_length)
+			{
+				pred.state = PERF_MODEL;
+				pred.expected_length = d;
+				pred.impl = nimpl;
+			}
+		}	
 	}
-	STARPU_ASSERT(d != DBL_MAX);
-	return d;
+	return pred;
+}
+
+/* Heuristic load of a worker node: the number of tasks in the worker's
+ * local task list (counted while holding the worker's mutex) divided by the
+ * relative speedup of the worker's performance architecture. */
+static double estimated_load(struct _starpu_sched_node * node)
+{
+	struct _starpu_worker * worker = node->data;
+	int queued = 0;
+	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
+	struct starpu_task_list snapshot = worker->local_tasks;
+	struct starpu_task * cur = starpu_task_list_front(&snapshot);
+	while(cur != starpu_task_list_end(&snapshot))
+	{
+		queued++;
+		cur = starpu_task_list_next(cur);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
+	return (double) queued
+		/ starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(node->workerids[0]));
+}
+
+
+/* Estimated finish time of a worker node: the current time plus the
+ * predicted lengths of the tasks in the worker's local list, plus half of
+ * the predicted length of the task currently attached to the worker, if
+ * any. The worker's mutex is held while its tasks are read. */
+static double estimated_finish_time(struct _starpu_sched_node * node)
+{
+	struct _starpu_worker * worker = node->data;
+	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
+	double pending = 0.0;
+	struct starpu_task_list snapshot = worker->local_tasks;
+	struct starpu_task * cur = starpu_task_list_front(&snapshot);
+	while(cur != starpu_task_list_end(&snapshot))
+	{
+		pending += cur->predicted;
+		cur = starpu_task_list_next(cur);
+	}
+	if(worker->current_task)
+		pending += worker->current_task->predicted / 2;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
+	return pending + starpu_timing_now();
+}
 
 static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workerid)
@@ -98,6 +160,9 @@ static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workeri
 	node->push_task = _starpu_sched_node_worker_push_task;
 	node->pop_task = _starpu_sched_node_worker_pop_task;
 	node->estimated_finish_time = estimated_finish_time;
+	node->estimated_load = estimated_load;
+	node->estimated_execute_length = estimated_execute_length;
+	node->estimated_transfer_length = estimated_transfer_length;
 	node->destroy_node = _starpu_sched_node_worker_destroy;
 	node->available = available;
 	node->workerids[0] = workerid;
@@ -108,7 +173,12 @@ static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workeri
 
+/* Tell whether node is a worker node: true as soon as one of the compared
+ * methods of node is the worker implementation defined in this file. */
 int _starpu_sched_node_is_worker(struct _starpu_sched_node * node)
 {
-	return node->available == available;
+	return node->available == available
+		|| node->push_task == _starpu_sched_node_worker_push_task
+		|| node->pop_task == _starpu_sched_node_worker_pop_task
+		|| node->estimated_finish_time == estimated_finish_time
+		|| node->estimated_execute_length == estimated_execute_length;
+		
 }
 
 #ifndef STARPU_NO_ASSERT