Selaa lähdekoodia

firsts modules for schedulers

Simon Archipoff 12 vuotta sitten
vanhempi
commit
19d3462d3b

+ 145 - 0
src/sched_policies/node_eager.c

@@ -0,0 +1,145 @@
+#include "node_sched.h"
+#include <common/thread.h>
+#include <core/sched_policy.h>
+/*
+static void _starpu_wake_all_interested_workers(struct starpu_task * task){
+	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(task->sched_ctx);
+	struct starpu_sched_ctx_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+	while(workers->has_next(workers, &it))
+	{
+		unsigned worker = workers->get_next(workers, &it);
+		int nimpl;
+		for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+			if(starpu_worker_can_execute_task(worker, task, nimpl))
+			{
+				starpu_pthread_mutex_t *sched_mutex;
+				starpu_pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+				_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+				_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+				_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+				break;
+			}
+	}
+
+
+}
+*/
+
+static int _starpu_eager_push_task(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	STARPU_ASSERT(node->push_task == _starpu_eager_push_task);
+	int ret_val = -1;
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	ret_val = _starpu_fifo_push_task(node->data, task);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	node->available(node);
+	return ret_val;
+}
+
+static struct starpu_task * _starpu_eager_pop_task(struct _starpu_sched_node *node,
+						   unsigned sched_ctx_id)
+{
+	STARPU_ASSERT(node->pop_task == _starpu_eager_pop_task);
+
+	int workerid = starpu_worker_get_id();
+	
+	/* Tell helgrind that it's fine to check for empty fifo without actual
+	 * mutex (it's just a pointer) */
+	/* block until some event happens */
+	if (_starpu_fifo_empty(node->data))
+	{
+		VALGRIND_HG_MUTEX_UNLOCK_PRE(&node->mutex);
+		VALGRIND_HG_MUTEX_UNLOCK_POST(&node->mutex);
+		return NULL;
+	}
+	VALGRIND_HG_MUTEX_UNLOCK_PRE(&node->mutex);
+	VALGRIND_HG_MUTEX_UNLOCK_POST(&node->mutex);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	struct starpu_task* task = _starpu_fifo_pop_task(node->data, workerid);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	if(!task )
+	{
+		struct _starpu_sched_node * father = node->fathers[sched_ctx_id];
+		if(father)
+			task = father->pop_task(father, sched_ctx_id);
+	}
+	return task;
+}
+
+void _starpu_destroy_eager(struct _starpu_sched_node * node)
+{
+	_starpu_destroy_fifo(node->data);
+	_starpu_sched_node_destroy(node);
+}
+
+
+struct _starpu_sched_node * _starpu_sched_node_eager_create(void)
+{
+	struct _starpu_sched_node * node = _starpu_sched_node_create();
+	node->data = _starpu_create_fifo();
+	node->push_task = _starpu_eager_push_task;
+	node->pop_task = _starpu_eager_pop_task;
+	node->childs = NULL;
+	node->destroy_node = _starpu_destroy_eager;
+	return node;
+}
+
+
+
+static void initialize_eager_center_policy(unsigned sched_ctx_id)
+{
+	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
+	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
+	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+ 	data->root = _starpu_sched_node_eager_create();
+	
+	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
+}
+
+static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_tree *tree = (struct _starpu_sched_tree*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	_starpu_tree_destroy(tree, sched_ctx_id);
+	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
+}
+
+static void add_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
+{
+	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+		_starpu_sched_node_add_child(t->root,
+					     _starpu_sched_node_worker_get(workerids[i]),
+					     sched_ctx_id);
+}
+
+static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
+{
+	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+		_starpu_sched_node_remove_child(t->root,
+						_starpu_sched_node_worker_get(workerids[i]),
+						sched_ctx_id);
+}
+
+
+
+struct starpu_sched_policy _starpu_sched_tree_eager_policy =
+{
+	.init_sched = initialize_eager_center_policy,
+	.deinit_sched = deinitialize_eager_center_policy,
+	.add_workers = add_worker_eager,
+	.remove_workers = remove_worker_eager,
+	.push_task = _starpu_tree_push_task,
+	.pop_task = _starpu_tree_pop_task,
+	.pre_exec_hook = NULL,
+	.post_exec_hook = NULL,
+	.pop_every_task = NULL,//pop_every_task_eager_policy,
+	.policy_name = "tree",
+	.policy_description = "test tree policy"
+};

+ 33 - 0
src/sched_policies/node_fifo.c

@@ -0,0 +1,33 @@
+#include <node_sched.h>
+#include "fifo_queues.h"
+
+static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	int ret = _starpu_push_sorted_task(node->data, task);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	return ret;
+}
+
+static struct starpu_task *  pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	struct starpu_task * task  = _starpu_pop(node->data, starpu_get_worker_id());
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	if(task)
+		return task;
+	struct _starpu_sched_node * father = node->father[sched_ctx_id];
+	if(father)
+		return father->pop_task(father,sched_ctx_id);
+	return NULL;
+}
+
+
+struct _starpu_sched_node * _starpu_sched_node_fifo_create(void)
+{
+	struct _starpu_sched_node * node = _starpu_sched_node_create();
+	node->data = _starpu_create_fifo();
+	node->push_task = push_task;
+	node->pop_task = pop_task;
+	return node;
+}

+ 163 - 0
src/sched_policies/node_random.c

@@ -0,0 +1,163 @@
+#include <core/workers.h>
+#include "node_sched.h"
+
+struct _starpu_random_data
+{
+	double * relative_speedup;
+};
+
+
+static double compute_relative_speedup(struct _starpu_sched_node * node)
+{
+	if(_starpu_sched_node_is_worker(node))
+	{
+		int id = _starpu_sched_node_worker_get_workerid(node);
+		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(id);
+		return starpu_worker_get_relative_speedup(perf_arch);
+	}
+	double sum = 0.0;
+	int i;
+	for(i = 0; i < node->nchilds; i++)
+		sum += compute_relative_speedup(node->childs[i]);
+	return sum;
+}
+
+static void update_relative_childs_speedup(struct _starpu_sched_node * node,int former_nchilds STARPU_ATTRIBUTE_UNUSED)
+{
+	struct _starpu_random_data * rd = node->data;
+	rd->relative_speedup = realloc(rd->relative_speedup,sizeof(double) * node->nchilds);
+	int i;
+	for(i = 0; i < node->nchilds; i++)
+		rd->relative_speedup[i] = compute_relative_speedup(node->childs[i]);
+}
+
+static void add_child(struct _starpu_sched_node *node,
+		  struct _starpu_sched_node *child,
+		  unsigned sched_ctx_id)
+{
+	_starpu_sched_node_add_child(node, child, sched_ctx_id);
+	update_relative_childs_speedup(node);
+}
+static void remove_child(struct _starpu_sched_node *node,
+		     struct _starpu_sched_node *child,
+		     unsigned sched_ctx_id)
+{
+	_starpu_sched_node_remove_child(node, child, sched_ctx_id);
+	update_relative_childs_speedup(node);
+}
+
+
+static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	struct _starpu_random_data * rd = node->data;
+	int indexes_nodes[node->nchilds];
+	int size=0,i;
+	double alpha_sum = 0.0;
+	for(i = 0; i < node->nchilds ; i++)
+	{
+		if(_starpu_sched_node_can_execute_task(node->childs[i],task))
+		{
+			indexes_nodes[size++] = i;
+			alpha_sum += rd->relative_speedup[i];
+		}
+	}
+
+	double random = starpu_drand48()*alpha_sum;
+	double alpha = 0.0;
+	struct _starpu_sched_node * select  = NULL;
+	
+	for(i = 0; i < size ; i++)
+	{
+		int index = indexes_nodes[i];
+		if(alpha + rd->relative_speedup[index] >= random)
+		{	
+			select = node->childs[index];
+			break;
+		}
+		alpha += rd->relative_speedup[index];
+	}
+	STARPU_ASSERT(select != NULL);
+	int ret_val = select->push_task(select,task);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	node->available(node);
+	return ret_val;
+}
+
+static void destroy_random_node(struct _starpu_sched_node * node)
+{
+	struct _starpu_random_data * rd = node->data;
+	free(rd->relative_speedup);
+	free(rd);
+	_starpu_sched_node_destroy(node);
+}
+
+
+struct _starpu_sched_node * _starpu_sched_node_random_create(void)
+{
+	struct _starpu_sched_node * node = _starpu_sched_node_create();
+	struct _starpu_random_data * rd = malloc(sizeof(struct _starpu_random_data));
+	rd->relative_speedup = NULL;
+	node->data = rd;
+	node->update_nchilds = update_relative_childs_speedup;
+	node->destroy_node = destroy_random_node;
+	node->push_task = push_task;
+	node->add_child = add_child;
+	node->remove_child = remove_child;
+	starpu_srand48(time(NULL));
+	return node;
+}
+
+static void initialize_random_center_policy(unsigned sched_ctx_id)
+{
+	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
+	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
+	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+ 	data->root = _starpu_sched_node_random_create();
+	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
+}
+static void deinitialize_random_center_policy(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_tree *tree = (struct _starpu_sched_tree*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	_starpu_tree_destroy(tree, sched_ctx_id);
+	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
+}
+
+
+static void add_worker_random(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
+{
+	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+		_starpu_sched_node_add_child(t->root,
+					     _starpu_sched_node_worker_get(workerids[i]),
+					     sched_ctx_id);
+	update_relative_childs_speedup(t->root,42 /*unused*/);
+	_starpu_tree_update_after_modification(t);
+}
+
+static void remove_worker_random(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
+{
+	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	unsigned i;
+	for(i = 0; i < nworkers; i++)
+		_starpu_sched_node_remove_child(t->root,
+						_starpu_sched_node_worker_get(workerids[i]),
+						sched_ctx_id);
+	update_relative_childs_speedup(t->root,42 /*unused*/);
+	_starpu_tree_update_after_modification(t);
+}
+
+struct starpu_sched_policy _starpu_sched_tree_random_policy =
+{
+	.init_sched = initialize_random_center_policy,
+	.deinit_sched = deinitialize_random_center_policy,
+	.add_workers = add_worker_random,
+	.remove_workers = remove_worker_random,
+	.push_task = _starpu_tree_push_task,
+	.pop_task = _starpu_tree_pop_task,
+	.pre_exec_hook = NULL,
+	.post_exec_hook = NULL,
+	.pop_every_task = NULL,
+	.policy_name = "tree-random",
+	.policy_description = "random tree policy"
+};

+ 216 - 0
src/sched_policies/node_sched.c

@@ -0,0 +1,216 @@
+#include "node_sched.h"
+#include <core/jobs.h>
+#include <core/workers.h>
+
+static void available(struct _starpu_sched_node * node)
+{
+	int i;
+	for(i = 0; i < node->nchilds; i++)
+		node->available(node->childs[i]);
+}
+static struct starpu_task * pop_task_null(struct _starpu_sched_node * node STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
+{
+	return NULL;
+}
+
+struct _starpu_sched_node * _starpu_sched_node_create(void)
+{
+	struct _starpu_sched_node * node = malloc(sizeof(*node));
+	memset(node,0,sizeof(*node));
+	STARPU_PTHREAD_MUTEX_INIT(&node->mutex,NULL);
+	node->available = available;
+	node->pop_task = pop_task_null;
+	node->destroy_node = _starpu_sched_node_destroy;
+	node->add_child = _starpu_sched_node_add_child;
+node->remove_child = _starpu_sched_node_remove_child;
+	
+	return node;
+}
+void _starpu_sched_node_destroy(struct _starpu_sched_node *node)
+{
+	int i,j;
+	for(i = 0; i < node->nchilds; i++)
+	{
+		struct _starpu_sched_node * child = node->childs[i];
+		for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
+			if(child->fathers[i] == node)
+				child->fathers[i] = NULL;
+		
+	}
+	free(node->childs);
+	free(node);
+}
+
+void _starpu_sched_node_set_father(struct _starpu_sched_node *node,
+				   struct _starpu_sched_node *father_node,
+				   unsigned sched_ctx_id)
+{
+	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
+	node->fathers[sched_ctx_id] = father_node;
+}
+
+struct starpu_task * pop_task(unsigned sched_ctx_id)
+{
+	//struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	int workerid = starpu_worker_get_id();
+	struct _starpu_sched_node * wn = _starpu_sched_node_worker_get(workerid);
+	return wn->pop_task(wn, sched_ctx_id);
+}
+
+int push_task(struct starpu_task * task)
+{
+	unsigned sched_ctx_id = task->sched_ctx;
+	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	return t->root->push_task(t->root, task);
+}
+
+void _starpu_node_destroy_rec(struct _starpu_sched_node * node, unsigned sched_ctx_id)
+{
+	struct _starpu_sched_node ** stack = NULL;
+	int top = -1;
+#define PUSH(n) do{							\
+		stack = realloc(stack, sizeof(*stack) * (top + 2));	\
+		stack[++top] = n;}while(0)
+#define POP() stack[top--]
+#define EMPTY() (top == -1)
+//we want to delete all subtrees exept if a pointer in fathers point in an other tree
+//ie an other context
+
+	node->fathers[sched_ctx_id] = NULL;
+	int shared = 0;
+	{
+		int i;
+		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
+			if(node->fathers[i] != NULL)
+				shared = 1;
+	}
+	if(!shared)
+		PUSH(node);
+	while(!EMPTY())
+	{
+		struct _starpu_sched_node * n = POP();
+		int i;
+		for(i = 0; i < n->nchilds; i++)
+		{
+			struct _starpu_sched_node * child = n->childs[i];
+			int j;
+			shared = 0;
+			STARPU_ASSERT(child->fathers[sched_ctx_id] == n);
+			child->fathers[sched_ctx_id] = NULL;
+			for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
+			{
+				if(child->fathers[j] != NULL)//child is shared
+					shared = 1;
+			}
+			if(!shared)//if not shared we want to destroy it and his childs
+				PUSH(child);
+		}
+		n->destroy_node(n);
+	}
+	free(stack);
+}
+void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_id)
+{
+	_starpu_node_destroy_rec(tree->root, sched_ctx_id);
+	STARPU_PTHREAD_MUTEX_DESTROY(&tree->mutex);
+	free(tree);
+}
+void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starpu_sched_node * child,unsigned sched_ctx_id)
+{
+	STARPU_ASSERT(!_starpu_sched_node_is_worker(node));
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	node->childs = realloc(node->childs, sizeof(struct _starpu_sched_node *) * (node->nchilds + 1));
+	node->childs[node->nchilds] = child;
+	child->fathers[sched_ctx_id] = node;
+	node->nchilds++;
+	if(node->update_nchilds)
+		node->update_nchilds(node, node->nchilds - 1);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+}
+void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child,unsigned sched_ctx_id)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	int pos;
+	for(pos = 0; pos < node->nchilds; pos++)
+		if(node->childs[pos] == child)
+			break;
+	node->childs[pos] = node->childs[--node->nchilds];
+	STARPU_ASSERT(child->fathers[sched_ctx_id] == node);
+	child->fathers[sched_ctx_id] = NULL;
+	if(node->update_nchilds)
+		node->update_nchilds(node, node->nchilds + 1);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+}
+
+
+int _starpu_tree_push_task(struct starpu_task * task)
+{
+	unsigned sched_ctx_id = task->sched_ctx;
+	struct _starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	STARPU_PTHREAD_MUTEX_LOCK(&tree->mutex);
+	int ret_val = tree->root->push_task(tree->root,task); 
+	starpu_push_task_end(task);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tree->mutex);
+	return ret_val;
+}
+struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
+{
+	int workerid = starpu_worker_get_id();
+	struct _starpu_sched_node * node = _starpu_sched_node_worker_get(workerid);
+	return node->pop_task(node, sched_ctx_id);
+}
+
+
+
+int _starpu_sched_node_can_execute_task(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	unsigned nimpl;
+	int worker;
+	STARPU_ASSERT(task);
+
+	for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+		for(worker = 0; worker < node->nworkers; worker++)
+			if (starpu_worker_can_execute_task(worker, task, nimpl))
+				return 1;
+	return 0;
+}
+
+
+
+static int in_tab(int elem, int * tab, int size)
+{
+	for(size--;size >= 0; size--)
+		if(tab[size] == elem)
+			return 1;
+	return 0;
+}
+
+static void _update_workerids_after_tree_modification(struct _starpu_sched_node * node)
+{
+	if(_starpu_sched_node_is_worker(node))
+	{
+		node->nworkers = 1;
+		node->workerids[0] =  _starpu_sched_node_worker_get_workerid(node);
+		return;
+	}
+	int i;
+	node->nworkers = 0;
+	for(i = 0; i < node->nchilds; i++)
+	{
+		struct _starpu_sched_node * child = node->childs[i];
+		_update_workerids_after_tree_modification(child);
+		int j;
+		for(j = 0; j < child->nworkers; j++)
+		{
+			int id = child->workerids[j];
+			if(!in_tab(id, node->workerids, node->nworkers))
+				node->workerids[node->nworkers++] = id;
+		}
+	}
+}
+
+
+void _starpu_tree_update_after_modification(struct _starpu_sched_tree * tree)
+{
+	_update_workerids_after_tree_modification(tree->root);
+}

+ 97 - 0
src/sched_policies/node_sched.h

@@ -0,0 +1,97 @@
+#ifndef __SCHED_NODE_H__
+#define __SCHED_NODE_H__
+#include <starpu.h>
+
+struct _starpu_sched_node
+{
+	int (*push_task)(struct _starpu_sched_node *, struct starpu_task *);
+	struct starpu_task * (*pop_task)(struct _starpu_sched_node *, unsigned sched_ctx_id);
+	void (*available)(struct _starpu_sched_node *);
+
+	void * data;
+
+	int nchilds;
+	struct _starpu_sched_node ** childs;
+
+	starpu_pthread_mutex_t mutex;
+
+	//the list of workers in the node's subtree
+	int workerids[STARPU_NMAXWORKERS];
+	int nworkers;
+
+	/* may be shared by several contexts
+	 * so we need several fathers
+	 */
+	struct _starpu_sched_node * fathers[STARPU_NMAX_SCHED_CTXS];
+
+
+	void (*add_child)(struct _starpu_sched_node *node,
+			  struct _starpu_sched_node *child,
+			  unsigned sched_ctx_id);
+	void (*remove_child)(struct _starpu_sched_node *node,
+			     struct _starpu_sched_node *child,
+			     unsigned sched_ctx_id);
+	/* this function is called to free node (it must call _starpu_sched_node_destroy(node));
+	*/
+	void (*destroy_node)(struct _starpu_sched_node *);
+};
+
+
+struct _starpu_sched_tree
+{
+	struct _starpu_sched_node * root;
+	starpu_pthread_mutex_t mutex;
+};
+
+
+/* allocate and initalise node field with defaults values :
+ *  .pop_task return NULL
+ *  .available make a recursive call on childrens
+ *  .destroy_node  call _starpu_sched_node_destroy
+ *  .update_nchilds a function that does nothing
+ *  .{add,remove}_child functions that simply add/remove the child and update the .fathers field of child
+ */
+struct _starpu_sched_node * _starpu_sched_node_create(void);
+
+/* free memory allocated by _starpu_sched_node_create, it does not call node->destroy_node(node)*/
+void _starpu_sched_node_destroy(struct _starpu_sched_node * node);
+
+void _starpu_sched_node_set_father(struct _starpu_sched_node *node, struct _starpu_sched_node *father_node, unsigned sched_ctx_id);
+
+/* those two function call node->update_nchilds after the child was added or removed */
+void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starpu_sched_node * child, unsigned sched_ctx_id);
+void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child, unsigned sched_ctx_id);
+
+
+int _starpu_sched_node_can_execute_task(struct _starpu_sched_node * node, struct starpu_task * task);
+
+
+//no public create function for workers because we dont want to have several node_worker for a single workerid
+struct _starpu_sched_node * _starpu_sched_node_worker_get(int workerid);
+void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *);
+
+/*this function assume that workers are the only leafs */
+int _starpu_sched_node_is_worker(struct _starpu_sched_node * node);
+int _starpu_sched_node_worker_get_workerid(struct _starpu_sched_node * worker_node);
+
+//struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void);
+struct _starpu_sched_node * _starpu_sched_node_random_create(void);
+
+struct _starpu_sched_node * _starpu_sched_node_eager_create(void)
+
+
+void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_id);
+
+/* destroy node and all his child
+ * except if they are shared between several contexts
+ */
+void _starpu_node_destroy_rec(struct _starpu_sched_node * node, unsigned sched_ctx_id);
+
+int _starpu_tree_push_task(struct starpu_task * task);
+struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id);
+void _starpu_tree_update_after_modification(struct _starpu_sched_tree * tree);
+
+
+extern struct starpu_sched_policy _starpu_sched_tree_eager_policy;
+extern struct starpu_sched_policy _starpu_sched_tree_random_policy;
+#endif

+ 148 - 0
src/sched_policies/node_work_stealing.c

@@ -0,0 +1,148 @@
+#include "node_sched.h"
+
+struct _starpu_work_stealing_data
+{
+	/* keep track of the work performed from the beginning of the algorithm to make
+	 * better decisions about which queue to select when stealing or deferring work
+	 */
+	
+	unsigned performed_total;
+	unsigned last_pop_worker;
+	unsigned last_push_worker;
+};
+//
+///* little dirty hack here, fifo_nodes under the workstealing node need to know wich child they are in order to push task on themselfs
+// */
+//struct _starpu_fifo_ws_data {
+//	struct _starpu_fifo_taskq *fifo;
+//	int rank;
+//};
+//
+//static void destroy_fifo_ws(struct _starpu_sched_node * node)
+//{
+//	struct _starpu_fifo_ws_data * fwsd = node->data;
+//	_starpu_destroy_fifo(fwsd->fifo);
+//	free(fwsd);
+//	_starpu_sched_node_destroy(node);
+//}
+//
+//static int fifo_ws_push_task(struct _starpu_sched_node * node,
+//			     struct starpu_task * task)
+//{
+//	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+//	struct _starpu_fifo_ws_data * fwsd = node->data;
+//	int ret_val =  _starpu_push_sorted_task(node->data, task);
+//	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+//	return ret_val;
+//}
+//
+//static starpu_task * fifo_ws_pop_task(struct _starpu_sched_node * node,
+//				      unsigned sched_ctx_id)
+//{
+//	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+//	int ret_val =  _starpu_push_sorted_task(node->data, task);
+//	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+//	return ret_val;
+//}
+//
+
+
+//this function is special, when a worker call it, we want to push the task in his fifo
+//because he deserve it.
+int _starpu_ws_push_task(struct starpu_task *task)
+{
+	int workerid = starpu_get_worker_id();
+	if(workerid == -1)
+		return _starpu_tree_push_task(task);
+	unsigned sched_ctx_id = task->sched_ctx;
+	struct _starpu_sched_node * node =_starpu_sched_node_worker_get(workerid);
+	while(node->fathers[sched_ctx_id] != NULL)
+	{
+		node = node->fathers[sched_ctx_id];
+		if(is_my_fifo_node(node))
+		{
+			STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+			int ret_val =  _starpu_push_sorted_task(node->data, task);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+			return ret_val;
+		}
+	}
+	//there were a problem here, dont know what to do
+	return _starpu_tree_push_task(task);
+
+}
+static struct _starpu_sched_node * fifo_ws_create(void)
+{
+	struct _starpu_sched_node * node = _starpu_sched_node_create();
+	struct _starpu_fifo_ws_data * fwsd = malloc(sizeof(struct _starpu_fifo_ws_data));
+	fwsd->fifo = _starpu_create_fifo();
+	node->data = fwsd;
+	return node;
+}
+
+
+static void add_child(struct _starpu_sched_node *node,
+		      struct _starpu_sched_node *child,
+		      unsigned sched_ctx_id)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	int i;
+	for(i = 0; i < node->nchilds; i++){
+		STARPU_ASSERT(node->childs[i] != node);
+		STARPU_ASSERT(node->childs[i] != NULL);
+	}
+	node->childs = realloc(node->childs,
+			       sizeof(struct _starpu_sched_node*)
+			       * (node->nchilds + 1));
+	struct _starpu_sched_node * fifo_node = _starpu_sched_node_fifo_create();
+	_starpu_sched_node_set_father(fifo_node, node, sched_ctx_id);
+	node->childs[node->nchilds] = fifo_node;
+	node->nchilds++;
+	
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+
+}
+static void remove_child(struct _starpu_sched_node *node,
+			 struct _starpu_sched_node *child,
+			 unsigned sched_ctx_id)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	int pos;
+	for(pos = 0; pos < node->nchilds; pos++)
+		if(*node->childs[pos]->childs == child)
+			break;
+	STARPU_ASSERT(pos != node->nchilds);
+	struct _starpu_sched_node * fifo_node = node->childs[pos];
+	node->childs[pos] = node->childs[--node->nchilds];
+	STARPU_ASSERT(fifo_node->fathers[sched_ctx_id] == node;
+	fifo_node->fathers[sched_ctx_id] = NULL;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+}
+
+
+
+struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void)
+{
+	struct _starpu_sched_node * node = _starpu_sched_node_create();
+	struct _starpu_work_stealing_data * wsd = malloc(sizeof(*wsd));
+	wsd->performed_total = 0;
+	wsd->last_pop_worker = 0;
+	wsd->last_push_worker = 0;
+	return node;
+}
+
+static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
+{
+	struct _starpu_work_stealing_data * wsd = node->data;
+	int ret = -1;
+	int start = wsd->last_pop_worker;
+	
+	int i;
+	for(i = start + 1; i != start; (i+1)%node->nchilds)
+	{
+		if(!childs[i]->fifo)
+			continue;
+		ret = _starpu_fifo_push_sorted_task(childs[i]->fifo, task);
+	}
+	
+}

+ 134 - 0
src/sched_policies/node_worker.c

@@ -0,0 +1,134 @@
+#include "node_sched.h"
+#include <core/workers.h>
+
+static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
+
+static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workerid);
+struct _starpu_sched_node * _starpu_sched_node_worker_get(int workerid)
+{
+	STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
+	if(_worker_nodes[workerid])
+		return _worker_nodes[workerid];
+	else
+		return _worker_nodes[workerid] = _starpu_sched_node_worker_create(workerid);
+}
+
+
+
+int _starpu_sched_node_worker_push_task(struct _starpu_sched_node * node, struct starpu_task *task)
+{
+	
+	return _starpu_push_local_task(node->data, task, task->priority);
+
+/*	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	int ret_val = _starpu_fifo_push_sorted_task(node->fifo, task);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	node->available(node);
+	return ret_val;
+*/
+}
+
+struct starpu_task * _starpu_sched_node_worker_pop_task(struct _starpu_sched_node *node,unsigned sched_ctx_id)
+{
+/*	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	struct starpu_task * task = _starpu_fifo_pop_local_task(node->fifo);
+	if(task)
+	{      
+		STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+		return task;
+	}
+*/	struct _starpu_sched_node *father = node->fathers[sched_ctx_id];
+//	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	if(father == NULL)
+		return NULL;
+	else
+		return father->pop_task(father,sched_ctx_id);
+}
+void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
+{
+	struct _starpu_worker * worker = node->data;
+	unsigned id = worker->workerid;
+	assert(_worker_nodes[id] == node);
+	int i;
+	for(i = 0; i < STARPU_NMAX_SCHED_CTXS ; i++)
+		if(node->fathers[i] != NULL)
+			return;//this node is shared between several contexts
+//	_starpu_destroy_fifo(node->fifo);
+	_starpu_sched_node_destroy(node);
+	_worker_nodes[id] = NULL;
+}
+
+static void available(struct _starpu_sched_node * worker_node)
+{
+	struct _starpu_worker * w = worker_node->data;
+	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
+	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
+
+	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+}
+
+
+static struct _starpu_sched_node  * _starpu_sched_node_worker_create(int workerid)
+{
+	STARPU_ASSERT(workerid >= 0 && workerid <  (int) starpu_worker_get_count());
+
+	if(_worker_nodes[workerid])
+		return _worker_nodes[workerid];
+
+	struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_sched_node * node = _starpu_sched_node_create();
+	node->data = worker;
+	//node->fifo = _starpu_create_fifo(),
+	node->push_task = _starpu_sched_node_worker_push_task;
+	node->pop_task = _starpu_sched_node_worker_pop_task;
+	node->destroy_node = _starpu_sched_node_worker_destroy;
+	node->available = available;
+	node->workerids[0] = workerid;
+	node->nworkers = 1;
+	_worker_nodes[workerid] = node;
+	return node;
+}
+
+int _starpu_sched_node_is_worker(struct _starpu_sched_node * node)
+{
+	int i;
+	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+		if(_worker_nodes[i] == node)
+			return 1;
+	return 0;
+}
+
+#ifndef STARPU_NO_ASSERT
+static int _worker_consistant(struct _starpu_sched_node * node)
+{
+	int is_a_worker = 0;
+	int i;
+	for(i = 0; i<STARPU_NMAXWORKERS; i++)
+		if(_worker_nodes[i] == node)
+			is_a_worker = 1;
+	if(!is_a_worker)
+		return 0;
+	struct _starpu_worker * worker = node->data;
+	int id = worker->workerid;
+	int father = 1;
+	for(i = 0; i<STARPU_NMAX_SCHED_CTXS; i++)
+		if(node->fathers[i] != NULL)
+			return 1;
+		else
+			father = 0;
+	return  father
+		&& (_worker_nodes[id] == node)
+		&&  node->nchilds == 0;
+}
+#endif
+
+int _starpu_sched_node_worker_get_workerid(struct _starpu_sched_node * worker_node)
+{
+#ifndef STARPU_NO_ASSERT
+	assert(_worker_consistant(worker_node));
+#endif
+	struct _starpu_worker * worker = worker_node->data;
+	return worker->workerid;
+}