Browse Source

work_stealing rewritten to carry its own fifos
a global rwlock protects the scheduler during add/remove worker

Simon Archipoff 12 years ago
parent
commit
ee5f2cf413

+ 2 - 2
src/sched_policies/fifo_queues.c

@@ -23,7 +23,7 @@
 #include <sched_policies/fifo_queues.h>
 #include <common/fxt.h>
 
-int is_sorted_task_list(struct starpu_task * task)
+static int is_sorted_task_list(struct starpu_task * task)
 {
 	if(!task)
 		return 1;
@@ -184,7 +184,7 @@ struct starpu_task *_starpu_fifo_pop_local_task(struct _starpu_fifo_taskq *fifo_
 
 	if (!starpu_task_list_empty(&fifo_queue->taskq))
 	{
-		task = starpu_task_list_pop_back(&fifo_queue->taskq);
+		task = starpu_task_list_pop_front(&fifo_queue->taskq);
 		fifo_queue->ntasks--;
 		_STARPU_TRACE_JOB_POP(task, 0);
 	}

+ 8 - 6
src/sched_policies/node_eager.c

@@ -27,9 +27,10 @@ static void add_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nw
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
-		_starpu_sched_node_add_child(t->root,
-					     _starpu_sched_node_worker_get(workerids[i]),
-					     sched_ctx_id);
+	{
+		t->root->add_child(t->root, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
+		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = t->root;
+	}
 }
 
 static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
@@ -37,9 +38,10 @@ static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
-		_starpu_sched_node_remove_child(t->root,
-						_starpu_sched_node_worker_get(workerids[i]),
-						sched_ctx_id);
+	{
+		t->root->remove_child(t->root, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
+		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = NULL;
+	}
 }
 
 

+ 55 - 23
src/sched_policies/node_heft.c

@@ -11,6 +11,8 @@ struct _starpu_dmda_data
 	double beta;
 	double gamma;
 	double idle_power;
+	
+	struct _starpu_sched_node * no_model_node;
 };
 
 static double compute_fitness_calibration(struct _starpu_sched_node * child,
@@ -22,15 +24,6 @@ static double compute_fitness_calibration(struct _starpu_sched_node * child,
 		return child->estimated_load(child);
 	return DBL_MAX;
 }
-static double compute_fitness_no_perf_model(struct _starpu_sched_node * child,
-					    struct _starpu_dmda_data * data STARPU_ATTRIBUTE_UNUSED,
-					    struct starpu_task * task STARPU_ATTRIBUTE_UNUSED,
-					    struct _starpu_execute_pred *pred)
-{
-	if(pred->state == CANNOT_EXECUTE)
-		return DBL_MAX;
-	return child->estimated_load(child);
-}
 
 static double compute_fitness_perf_model(struct _starpu_sched_node * child,
 					 struct _starpu_dmda_data * data,
@@ -75,14 +68,26 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 		STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 		return -ENODEV;
 	}
+	
+	struct _starpu_dmda_data * data = node->data;
+	
+	if(!calibrating && !perf_model)
+	{
+		int ret = data->no_model_node->push_task(data->no_model_node, task);
+		STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+		return ret;
+	}
+
 	double (*fitness_fun)(struct _starpu_sched_node *,
 			      struct _starpu_dmda_data *,
 			      struct starpu_task *,
-			      struct _starpu_execute_pred*) = compute_fitness_no_perf_model;
-	if(perf_model)
-		fitness_fun = compute_fitness_perf_model;
+			      struct _starpu_execute_pred*) = compute_fitness_perf_model;
+
 	if(calibrating)
 		fitness_fun = compute_fitness_calibration;
+
+
+
 	double best_fitness = DBL_MAX;
 	int index_best_fitness;
 	for(i = 0; i < node->nchilds; i++)
@@ -105,8 +110,18 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	return c->push_task(c, task);
 }
-
-
+/*
+static void update_helper_node(struct _starpu_sched_node * heft_node)
+{
+	struct _starpu_dmda_data * data = heft_node->data;
+	struct _starpu_sched_node * node = data->no_model_node;
+	node->nchilds = heft_node->nchilds;
+	node->childs = realloc(node->childs, sizeof(struct _starpu_sched_node *) * node->nchilds);
+	memcpy(node->childs, heft_node->childs, sizeof(struct _starpu_sched_node*) * node->nchilds);
+	node->nworkers = heft_node->nworkers;
+	memcpy(node->workerids, heft_node->workerids, sizeof(int) * node->nworkers);
+}
+*/
 
 static void add_child(struct _starpu_sched_node *node,
 		      struct _starpu_sched_node *child,
@@ -123,11 +138,12 @@ static void add_child(struct _starpu_sched_node *node,
 			       * (node->nchilds + 1));
 	struct _starpu_sched_node * fifo_node = _starpu_sched_node_fifo_create();
 	_starpu_sched_node_add_child(fifo_node, child, sched_ctx_id);
-
-
 	_starpu_sched_node_set_father(fifo_node, node, sched_ctx_id);
 	node->childs[node->nchilds] = fifo_node;
 	node->nchilds++;
+	struct _starpu_dmda_data * data = node->data;
+	data->no_model_node->add_child(data->no_model_node, child, sched_ctx_id);
+	
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 
@@ -135,6 +151,7 @@ static void add_child(struct _starpu_sched_node *node,
 static void remove_child(struct _starpu_sched_node *node,
 			 struct _starpu_sched_node *child,
 			 unsigned sched_ctx_id)
+
 {
 	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	int pos;
@@ -145,7 +162,10 @@ static void remove_child(struct _starpu_sched_node *node,
 	struct _starpu_sched_node * fifo_node = node->childs[pos];
 	node->childs[pos] = node->childs[--node->nchilds];
 	STARPU_ASSERT(fifo_node->fathers[sched_ctx_id] == node);
-	fifo_node->fathers[sched_ctx_id] = NULL;
+
+	struct _starpu_dmda_data * data = node->data;
+	data->no_model_node->remove_child(data->no_model_node, child,sched_ctx_id);
+
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 
@@ -175,9 +195,10 @@ static void add_worker_heft(unsigned sched_ctx_id, int * workerids, unsigned nwo
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
-		t->root->add_child(t->root,
-				   _starpu_sched_node_worker_get(workerids[i]),
-				   sched_ctx_id);
+	{
+		t->root->add_child(t->root, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
+		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = t->root;
+	}
 	_starpu_tree_update_after_modification(t);
 }
 
@@ -186,10 +207,18 @@ static void remove_worker_heft(unsigned sched_ctx_id, int * workerids, unsigned
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
-		t->root->remove_child(t->root,
-				   _starpu_sched_node_worker_get(workerids[i]),
-				   sched_ctx_id);
+	{
+		t->root->remove_child(t->root, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
+		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = NULL;
+	}
+}
 
+static void destroy_heft_node(struct _starpu_sched_node * node)
+{
+	struct _starpu_dmda_data * data = node->data;
+	data->no_model_node->destroy_node(data->no_model_node);
+	_starpu_sched_node_destroy(node);
+	free(data);
 }
 
 struct _starpu_sched_node * _starpu_sched_node_heft_create(double alpha, double beta, double gamma, double idle_power)
@@ -208,6 +237,9 @@ struct _starpu_sched_node * _starpu_sched_node_heft_create(double alpha, double
 	//data->total_task_cnt = data->ready_task_cnt = 0;
 	node->add_child = add_child;
 	node->remove_child = remove_child;
+	node->destroy_node = destroy_heft_node;
+
+	data->no_model_node = _starpu_sched_node_random_create();
 
 	return node;
 }

+ 31 - 19
src/sched_policies/node_random.c

@@ -32,29 +32,29 @@ static void update_relative_childs_speedup(struct _starpu_sched_node * node)
 }
 
 static void add_child(struct _starpu_sched_node *node,
-		  struct _starpu_sched_node *child,
-		  unsigned sched_ctx_id)
+		      struct _starpu_sched_node *child,
+		      unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	_starpu_sched_node_add_child(node, child, sched_ctx_id);
 	update_relative_childs_speedup(node);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 static void remove_child(struct _starpu_sched_node *node,
-		     struct _starpu_sched_node *child,
-		     unsigned sched_ctx_id)
+			 struct _starpu_sched_node *child,
+			 unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	_starpu_sched_node_remove_child(node, child, sched_ctx_id);
 	update_relative_childs_speedup(node);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	struct _starpu_random_data * rd = node->data;
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
 	int indexes_nodes[node->nchilds];
 	int size=0,i;
 	double alpha_sum = 0.0;
@@ -84,7 +84,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	STARPU_ASSERT(select != NULL);
 	int ret_val = select->push_task(select,task);
 	node->available(node);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	return ret_val;
 }
 
@@ -101,6 +101,7 @@ struct _starpu_sched_node * _starpu_sched_node_random_create(void)
 {
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
 	struct _starpu_random_data * rd = malloc(sizeof(struct _starpu_random_data));
+
 	rd->relative_speedup = NULL;
 	node->data = rd;
 	node->destroy_node = destroy_random_node;
@@ -130,25 +131,36 @@ static void deinitialize_random_center_policy(unsigned sched_ctx_id)
  static void add_worker_random(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+//	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->mutex);
+		struct _starpu_sched_node * random_node = t->root;
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
-		_starpu_sched_node_add_child(t->root,
-					     _starpu_sched_node_worker_get(workerids[i]),
-					     sched_ctx_id);
-	update_relative_childs_speedup(t->root);
+	{
+		struct _starpu_sched_node * worker = _starpu_sched_node_worker_get(workerids[i]);
+		t->root->add_child(random_node, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
+		_starpu_sched_node_set_father(worker, random_node, sched_ctx_id);
+	}
 	_starpu_tree_update_after_modification(t);
+	update_relative_childs_speedup(random_node);
+//	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->mutex);
 }
 
 static void remove_worker_random(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
-	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->mutex);
+	struct _starpu_sched_node * random_node = t->root;
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
-		_starpu_sched_node_remove_child(t->root,
-						_starpu_sched_node_worker_get(workerids[i]),
-						sched_ctx_id);
-	update_relative_childs_speedup(t->root);
+	{
+		struct _starpu_sched_node * worker = _starpu_sched_node_worker_get(workerids[i]);
+		random_node->remove_child(random_node, worker, sched_ctx_id);
+		_starpu_sched_node_set_father(worker, NULL, sched_ctx_id);
+	}
 	_starpu_tree_update_after_modification(t);
+	update_relative_childs_speedup(t->root);
+//	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->mutex);
 }
 
 struct starpu_sched_policy _starpu_sched_tree_random_policy =

+ 11 - 10
src/sched_policies/node_sched.c

@@ -91,27 +91,24 @@ void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_i
 	STARPU_PTHREAD_RWLOCK_DESTROY(&tree->mutex);
 	free(tree);
 }
-void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starpu_sched_node * child,unsigned sched_ctx_id)
+void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starpu_sched_node * child,unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
 {
 	STARPU_ASSERT(!_starpu_sched_node_is_worker(node));
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	node->childs = realloc(node->childs, sizeof(struct _starpu_sched_node *) * (node->nchilds + 1));
 	node->childs[node->nchilds] = child;
-	child->fathers[sched_ctx_id] = node;
 	node->nchilds++;
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
-void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child,unsigned sched_ctx_id)
+void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child,unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
 {
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	int pos;
 	for(pos = 0; pos < node->nchilds; pos++)
 		if(node->childs[pos] == child)
 			break;
 	node->childs[pos] = node->childs[--node->nchilds];
-	STARPU_ASSERT(child->fathers[sched_ctx_id] == node);
-	child->fathers[sched_ctx_id] = NULL;
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+//	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 
 
@@ -127,9 +124,13 @@ int _starpu_tree_push_task(struct starpu_task * task)
 }
 struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
 {
+	struct _starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int workerid = starpu_worker_get_id();
 	struct _starpu_sched_node * node = _starpu_sched_node_worker_get(workerid);
-	return node->pop_task(node, sched_ctx_id);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->mutex);
+	struct starpu_task * task = node->pop_task(node, sched_ctx_id);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->mutex);
+	return task;
 }
 
 static double estimated_finish_time(struct _starpu_sched_node * node)

+ 4 - 0
src/sched_policies/node_sched.h

@@ -51,6 +51,7 @@ struct _starpu_sched_node
 	void (*add_child)(struct _starpu_sched_node *node,
 			  struct _starpu_sched_node *child,
 			  unsigned sched_ctx_id);
+
 	void (*remove_child)(struct _starpu_sched_node *node,
 			     struct _starpu_sched_node *child,
 			     unsigned sched_ctx_id);
@@ -70,6 +71,7 @@ struct _starpu_execute_pred {
 struct _starpu_sched_tree
 {
 	struct _starpu_sched_node * root;
+	//this lock is used to protect the scheduler during modifications of its structure
 	starpu_pthread_rwlock_t mutex;
 };
 
@@ -113,6 +115,8 @@ int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node);
 struct _starpu_fifo_taskq *  _starpu_sched_node_fifo_get_fifo(struct _starpu_sched_node *);
 
 /* struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void); */
+int _starpu_sched_node_is_work_stealing(struct _starpu_sched_node * node);
+
 struct _starpu_sched_node * _starpu_sched_node_random_create(void);
 
 struct _starpu_sched_node * _starpu_sched_node_eager_create(void);

+ 129 - 81
src/sched_policies/node_work_stealing.c

@@ -24,47 +24,45 @@ struct _starpu_work_stealing_data
 	unsigned performed_total;
 	unsigned last_pop_child;
 	unsigned last_push_child;
+	
+	struct _starpu_fifo_taskq ** fifos;
+	starpu_pthread_mutex_t * mutexes;
 };
 
 
 /**
- * Return a child from which a task can be stolen.
- * Selecting a worker is done in a round-robin fashion, unless
- * the child previously selected doesn't own any task,
- * then we return the first non-empty worker.
- * and take his mutex
- * if no child have tasks return -1 
+ * Steal a task in a round-robin fashion.
+ * Return NULL if none is available.
  */
-static int select_victim_round_robin(struct _starpu_sched_node *node)
+static struct starpu_task *  steal_task_round_robin(struct _starpu_sched_node *node, int workerid)
 {
-	struct _starpu_work_stealing_data *ws = node->data;
-	unsigned i = ws->last_pop_child;
-
-	
-/* If the worker's queue is empty, let's try
- * the next ones */
+	struct _starpu_work_stealing_data *wsd = node->data;
+	unsigned i = wsd->last_pop_child;
+	/* If the worker's queue has no suitable task, let's try
+	 * the next ones */
+	struct starpu_task * task = NULL;
 	while (1)
 	{
-		unsigned ntasks;
-		struct _starpu_sched_node * child = node->childs[i];
-		struct _starpu_fifo_taskq * fifo = _starpu_sched_node_fifo_get_fifo(child);
-		STARPU_PTHREAD_RWLOCK_WRLOCK(&child->mutex);
-		ntasks = fifo->ntasks;
-		if (ntasks)
+		struct _starpu_fifo_taskq * fifo = wsd->fifos[i];
+		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
+		task = _starpu_fifo_pop_task(fifo, workerid);
+		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
+		if(task)
+		{
+			fifo->nprocessed--;
 			break;
-		STARPU_PTHREAD_RWLOCK_UNLOCK(&child->mutex);
+		}
 		i = (i + 1) % node->nchilds;
-		if (i == ws->last_pop_child)
+		if (i == wsd->last_pop_child)
 		{
 			/* We got back to the first worker,
 			 * don't go in infinite loop */
-			return -1;
+			return NULL;
 		}
 	}
 
-	ws->last_pop_child = (i+1)%node->nchilds;
-
-	return i;
+	wsd->last_pop_child = i;
+	return task;
 }
 
 /**
@@ -189,12 +187,12 @@ static unsigned select_worker_overload(struct _starpu_sched_node * node)
  * This is a phony function used to call the right
  * function depending on the value of USE_OVERLOAD.
  */
-static inline int select_victim(struct _starpu_sched_node * node)
+static inline struct starpu_task * steal_task(struct _starpu_sched_node * node, int workerid)
 {
 #ifdef USE_OVERLOAD
-	return select_victim_overload(node);
+	return select_victim_overload(node, workerid);
 #else
-	return select_victim_round_robin(node);
+	return steal_task_round_robin(node, workerid);
 #endif /* USE_OVERLOAD */
 }
 
@@ -213,23 +211,40 @@ static inline unsigned select_worker(struct _starpu_sched_node * node)
 }
 
 
+static int is_worker_of_node(struct _starpu_sched_node * node, int workerid)
+{
+	int j;
+	for(j = 0; j < node->nworkers; j++)
+	{
+		if(node->workerids[j] == workerid)
+			return 1;
+	}
+	return 0;
+}
+
 static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
 {
-	int victim = select_victim(node);
-	if(victim < 0)
+	int workerid = starpu_worker_get_id();
+	int i;
+	for(i = 0; i < node->nchilds; i++)
 	{
-		if(node->fathers[sched_ctx_id])
-			return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id],sched_ctx_id);
-		else
-			return NULL;
+		if(is_worker_of_node(node->childs[i], workerid))
+			break;
 	}
-	struct _starpu_sched_node * child = node->childs[victim];
-	struct _starpu_fifo_taskq * fifo = _starpu_sched_node_fifo_get_fifo(child);
-	struct starpu_task * task = _starpu_fifo_pop_task(fifo,
-							  starpu_worker_get_id());
-	fifo->nprocessed--;
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&child->mutex);
-	return task;
+	STARPU_ASSERT(i < node->nchilds);
+	struct _starpu_work_stealing_data * wsd = node->data;
+	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
+	struct starpu_task * task = _starpu_fifo_pop_local_task(wsd->fifos[i]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
+	if(task)
+		return task;
+	task  = steal_task(node, workerid);
+	if(task)
+		return task;
+	if(node->fathers[sched_ctx_id])
+		return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id],sched_ctx_id);
+	else
+		return NULL;
 }
 
 
@@ -247,7 +262,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 		struct _starpu_sched_node * child = node->childs[i];
 		if(_starpu_sched_node_can_execute_task(child,task))
 		{
-			ret = child->push_task(child,task);
+			ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
 			break;
 		}
 	}
@@ -261,13 +276,8 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 static void add_child(struct _starpu_sched_node *node,
 		      struct _starpu_sched_node *child,
 		      unsigned sched_ctx_id);
-//compute if le father is a work_stealing node
-static int is_my_fifo_node(struct _starpu_sched_node * node, unsigned sched_ctx_id)
-{
-	if(node->fathers[sched_ctx_id] == NULL)
-		return 0;
-	return node->fathers[sched_ctx_id]->add_child == add_child;
-}
+
+
 
 //this function is special, when a worker call it, we want to push the task in his fifo
 int _starpu_ws_push_task(struct starpu_task *task)
@@ -280,55 +290,80 @@ int _starpu_ws_push_task(struct starpu_task *task)
 	while(node->fathers[sched_ctx_id] != NULL)
 	{
 		node = node->fathers[sched_ctx_id];
-		if(is_my_fifo_node(node,sched_ctx_id))
+		if(_starpu_sched_node_is_work_stealing(node))
 		{
-			STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
-			struct _starpu_fifo_taskq * fifo = node->data;
-			int ret_val =  _starpu_fifo_push_sorted_task(fifo, task);
-			STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
-			return ret_val;
+			int i;
+			for(i = 0; i < node->nchilds; i++)
+				if(is_worker_of_node(node->childs[i], workerid))
+					break;
+			STARPU_ASSERT(i < node->nchilds);
+			
+			struct _starpu_work_stealing_data * wsd = node->data;
+			STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
+			int ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
+			STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
+			return ret;
 		}
 	}
-	//there were a problem here, dont know what to do
-	STARPU_ASSERT(0);
+	STARPU_ASSERT_MSG(0, "there were a problem here, dont know what to do");
 	return _starpu_tree_push_task(task);
 }
 
 
 static void add_child(struct _starpu_sched_node *node,
 		      struct _starpu_sched_node *child,
-		      unsigned sched_ctx_id)
+		      unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
 {
 	int i;
 	for(i = 0; i < node->nchilds; i++){
 		STARPU_ASSERT(node->childs[i] != node);
 		STARPU_ASSERT(node->childs[i] != NULL);
 	}
+	struct _starpu_work_stealing_data * wsd = node->data;
+	int new_size = node->nchilds + 1;
 	node->childs = realloc(node->childs,
 			       sizeof(struct _starpu_sched_node*)
-			       * (node->nchilds + 1));
-	struct _starpu_sched_node * fifo_node = _starpu_sched_node_fifo_create();
-	_starpu_sched_node_add_child(fifo_node, child, sched_ctx_id);
-
+			       * new_size);
+	wsd->fifos = realloc(wsd->fifos,
+			     sizeof(struct _starpu_fifo_taskq*)
+			     * new_size);
+	wsd->mutexes = realloc(wsd->mutexes,
+			     sizeof(starpu_pthread_rwlock_t)
+			     * new_size);
+	node->childs[new_size - 1] = child;
+	wsd->fifos[new_size - 1] = _starpu_create_fifo();
+	STARPU_PTHREAD_MUTEX_INIT(wsd->mutexes + (new_size - 1), NULL);
+	node->nchilds = new_size;
+}
 
-	_starpu_sched_node_set_father(fifo_node, node, sched_ctx_id);
-	node->childs[node->nchilds] = fifo_node;
-	node->nchilds++;
 
-}
 static void remove_child(struct _starpu_sched_node *node,
 			 struct _starpu_sched_node *child,
-			 unsigned sched_ctx_id)
+			 unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
 {
 	int pos;
 	for(pos = 0; pos < node->nchilds; pos++)
-		if(*node->childs[pos]->childs == child)
+		if(node->childs[pos] == child)
 			break;
 	STARPU_ASSERT(pos != node->nchilds);
-	struct _starpu_sched_node * fifo_node = node->childs[pos];
-	node->childs[pos] = node->childs[--node->nchilds];
-	STARPU_ASSERT(fifo_node->fathers[sched_ctx_id] == node);
-	fifo_node->fathers[sched_ctx_id] = NULL;
+	struct _starpu_work_stealing_data * wsd = node->data;
+	struct _starpu_fifo_taskq * fifo = wsd->fifos[pos];
+	wsd->fifos[pos] = wsd->fifos[node->nchilds - 1];
+	node->childs[pos] = node->childs[node->nchilds - 1];
+	STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes + pos);
+	wsd->mutexes[pos] = wsd->mutexes[node->nchilds - 1];
+	node->nchilds--;
+	int i;
+	struct starpu_task * task = fifo->taskq.head;
+	_starpu_destroy_fifo(fifo);
+	for(i = 0; task; i = (i + 1)%node->nchilds)
+	{
+		struct starpu_task * next = task->next;
+		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
+		_starpu_fifo_push_sorted_task(wsd->fifos[i],task);
+		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
+		task = next;
+	}
 }
 
 
@@ -337,9 +372,7 @@ struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void)
 {
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
 	struct _starpu_work_stealing_data * wsd = malloc(sizeof(*wsd));
-	wsd->performed_total = 0;
-	wsd->last_pop_child = 0;
-	wsd->last_push_child = 0;
+	memset(wsd, 0, sizeof(*wsd));
 	node->data = wsd;
 	node->pop_task = pop_task;
 	node->push_task = push_task;
@@ -348,6 +381,14 @@ struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void)
 	return node;
 }
 
+int _starpu_sched_node_is_work_stealing(struct _starpu_sched_node * node)
+{
+	return node->add_child == add_child
+		|| node->remove_child == remove_child
+		|| node->pop_task == pop_task
+		|| node->push_task == push_task;//...
+}
+
 
 
 static void initialize_ws_center_policy(unsigned sched_ctx_id)
@@ -371,22 +412,29 @@ static void add_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nwork
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned i;
+	struct _starpu_sched_node * ws_node = t->root;
 	for(i = 0; i < nworkers; i++)
-		t->root->add_child(t->root,
-				   _starpu_sched_node_worker_get(workerids[i]),
+	{
+		struct _starpu_sched_node * worker =_starpu_sched_node_worker_get(workerids[i]);
+		ws_node->add_child(ws_node,
+				   worker,
 				   sched_ctx_id);
+		_starpu_sched_node_set_father(worker, ws_node, sched_ctx_id);
+	}
 	_starpu_tree_update_after_modification(t);
 }
 
 static void remove_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_sched_node * ws_node = t->root;
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
-		t->root->remove_child(t->root,
-				   _starpu_sched_node_worker_get(workerids[i]),
-				   sched_ctx_id);
-
+	{
+		struct _starpu_sched_node * worker =_starpu_sched_node_worker_get(workerids[i]);
+		ws_node->remove_child(ws_node, worker, sched_ctx_id);
+		_starpu_sched_node_set_father(worker,NULL,sched_ctx_id);
+	}
 }
 
 

+ 1 - 8
src/sched_policies/node_worker.c

@@ -193,14 +193,7 @@ static int _worker_consistant(struct _starpu_sched_node * node)
 		return 0;
 	struct _starpu_worker * worker = node->data;
 	int id = worker->workerid;
-	int father = 1;
-	for(i = 0; i<STARPU_NMAX_SCHED_CTXS; i++)
-		if(node->fathers[i] != NULL)
-			return 1;
-		else
-			father = 0;
-	return  father
-		&& (_worker_nodes[id] == node)
+	return  (_worker_nodes[id] == node)
 		&&  node->nchilds == 0;
 }
 #endif