Browse Source

bug fixes, change mutexes to rwlocks

Simon Archipoff 12 years ago
parent
commit
7adf474545

+ 0 - 1
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -278,7 +278,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	/* make sure someone could execute that task! */
 	STARPU_ASSERT(best_workerid != -1);
-
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
 
 	starpu_pthread_mutex_t *sched_mutex;

+ 1 - 1
src/sched_policies/node_eager.c

@@ -9,7 +9,7 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
  	data->root = _starpu_sched_node_fifo_create();
 	
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);

+ 4 - 4
src/sched_policies/node_fifo.c

@@ -28,11 +28,11 @@ static double estimated_load(struct _starpu_sched_node * node)
 }
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	struct _starpu_fifo_taskq * fifo = node->data;
 	int ret = _starpu_fifo_push_sorted_task(fifo, task);
 	fifo->exp_end += task->predicted;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	node->available(node);
 	return ret;
 }
@@ -40,11 +40,11 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
 {
 	struct _starpu_fifo_taskq * fifo = node->data;
-STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	struct starpu_task * task  = _starpu_fifo_pop_task(fifo, starpu_worker_get_id());
 	if(task)
 		fifo->exp_start = starpu_timing_now() + task->predicted;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	if(task)
 		return task;
 	struct _starpu_sched_node * father = node->fathers[sched_ctx_id];

+ 9 - 8
src/sched_policies/node_heft.c

@@ -1,4 +1,5 @@
 #include "node_sched.h"
+#include "fifo_queues.h"
 #include <starpu_perfmodel.h>
 #include <starpu_scheduler.h>
 #include <float.h>
@@ -44,7 +45,7 @@ static double compute_fitness_perf_model(struct _starpu_sched_node * child,
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
 	struct _starpu_execute_pred preds[node->nchilds];
 	int i;
 	int calibrating = 0;
@@ -71,7 +72,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	}
 	if(!can_execute)
 	{
-		STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+		STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 		return -ENODEV;
 	}
 	double (*fitness_fun)(struct _starpu_sched_node *,
@@ -101,7 +102,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	starpu_task_set_implementation(task, preds[index_best_fitness].impl);
 	task->predicted = preds[index_best_fitness].expected_length;
 	task->predicted_transfer = c->estimated_transfer_length(c,task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	return c->push_task(c, task);
 }
 
@@ -111,7 +112,7 @@ static void add_child(struct _starpu_sched_node *node,
 		      struct _starpu_sched_node *child,
 		      unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	int i;
 	for(i = 0; i < node->nchilds; i++){
 		STARPU_ASSERT(node->childs[i] != node);
@@ -128,14 +129,14 @@ static void add_child(struct _starpu_sched_node *node,
 	node->childs[node->nchilds] = fifo_node;
 	node->nchilds++;
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 
 }
 static void remove_child(struct _starpu_sched_node *node,
 			 struct _starpu_sched_node *child,
 			 unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	int pos;
 	for(pos = 0; pos < node->nchilds; pos++)
 		if(*node->childs[pos]->childs == child)
@@ -145,7 +146,7 @@ static void remove_child(struct _starpu_sched_node *node,
 	node->childs[pos] = node->childs[--node->nchilds];
 	STARPU_ASSERT(fifo_node->fathers[sched_ctx_id] == node);
 	fifo_node->fathers[sched_ctx_id] = NULL;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 
 
@@ -156,7 +157,7 @@ static void initialize_heft_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
 	data->root = _starpu_sched_node_heft_create(1,1,1,1);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }

+ 7 - 2
src/sched_policies/node_random.c

@@ -35,21 +35,26 @@ static void add_child(struct _starpu_sched_node *node,
 		  struct _starpu_sched_node *child,
 		  unsigned sched_ctx_id)
 {
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	_starpu_sched_node_add_child(node, child, sched_ctx_id);
 	update_relative_childs_speedup(node);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 static void remove_child(struct _starpu_sched_node *node,
 		     struct _starpu_sched_node *child,
 		     unsigned sched_ctx_id)
 {
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	_starpu_sched_node_remove_child(node, child, sched_ctx_id);
 	update_relative_childs_speedup(node);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
 	struct _starpu_random_data * rd = node->data;
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
 	int indexes_nodes[node->nchilds];
 	int size=0,i;
 	double alpha_sum = 0.0;
@@ -78,8 +83,8 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	}
 	STARPU_ASSERT(select != NULL);
 	int ret_val = select->push_task(select,task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
 	node->available(node);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 	return ret_val;
 }
 
@@ -110,7 +115,7 @@ static void initialize_random_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
  	data->root = _starpu_sched_node_random_create();
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }

+ 9 - 8
src/sched_policies/node_sched.c

@@ -88,22 +88,22 @@ void _starpu_node_destroy_rec(struct _starpu_sched_node * node, unsigned sched_c
 void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_id)
 {
 	_starpu_node_destroy_rec(tree->root, sched_ctx_id);
-	STARPU_PTHREAD_MUTEX_DESTROY(&tree->mutex);
+	STARPU_PTHREAD_RWLOCK_DESTROY(&tree->mutex);
 	free(tree);
 }
 void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starpu_sched_node * child,unsigned sched_ctx_id)
 {
 	STARPU_ASSERT(!_starpu_sched_node_is_worker(node));
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	node->childs = realloc(node->childs, sizeof(struct _starpu_sched_node *) * (node->nchilds + 1));
 	node->childs[node->nchilds] = child;
 	child->fathers[sched_ctx_id] = node;
 	node->nchilds++;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child,unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	int pos;
 	for(pos = 0; pos < node->nchilds; pos++)
 		if(node->childs[pos] == child)
@@ -111,7 +111,7 @@ void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _s
 	node->childs[pos] = node->childs[--node->nchilds];
 	STARPU_ASSERT(child->fathers[sched_ctx_id] == node);
 	child->fathers[sched_ctx_id] = NULL;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 
 
@@ -119,10 +119,10 @@ int _starpu_tree_push_task(struct starpu_task * task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	STARPU_PTHREAD_MUTEX_LOCK(&tree->mutex);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->mutex);
 	int ret_val = tree->root->push_task(tree->root,task); 
 //	starpu_push_task_end(task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&tree->mutex);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->mutex);
 	return ret_val;
 }
 struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
@@ -179,6 +179,7 @@ static struct _starpu_execute_pred estimated_execute_length(struct _starpu_sched
 		case PERF_MODEL:
 			nb++;
 			pred.expected_length += tmp.expected_length;
+			pred.state = PERF_MODEL;
 			break;
 		case CANNOT_EXECUTE:
 			break;
@@ -235,7 +236,7 @@ struct _starpu_sched_node * _starpu_sched_node_create(void)
 {
 	struct _starpu_sched_node * node = malloc(sizeof(*node));
 	memset(node,0,sizeof(*node));
-	STARPU_PTHREAD_MUTEX_INIT(&node->mutex,NULL);
+	STARPU_PTHREAD_RWLOCK_INIT(&node->mutex,NULL);
 	node->available = available;
 	node->pop_task = pop_task_node;
 	node->estimated_finish_time = estimated_finish_time;

+ 6 - 6
src/sched_policies/node_sched.h

@@ -8,7 +8,7 @@ struct _starpu_sched_node
 	struct starpu_task * (*pop_task)(struct _starpu_sched_node *,
 					 unsigned sched_ctx_id);
 	void (*available)(struct _starpu_sched_node *);
-	
+
 	/* This function only considers tasks that have a perf model; others do not count.
 	 * Note that pushing a task does not necessarily increase the estimated finish time.
 	 */
@@ -31,7 +31,7 @@ struct _starpu_sched_node
 	int nchilds;
 	struct _starpu_sched_node ** childs;
 
-	starpu_pthread_mutex_t mutex;
+	starpu_pthread_rwlock_t mutex;
 
 	//the list of workers in the node's subtree
 	int workerids[STARPU_NMAXWORKERS];
@@ -46,8 +46,8 @@ struct _starpu_sched_node
 	 * so we need several fathers
 	 */
 	struct _starpu_sched_node * fathers[STARPU_NMAX_SCHED_CTXS];
-	
-	
+
+
 	void (*add_child)(struct _starpu_sched_node *node,
 			  struct _starpu_sched_node *child,
 			  unsigned sched_ctx_id);
@@ -70,7 +70,7 @@ struct _starpu_execute_pred {
 struct _starpu_sched_tree
 {
 	struct _starpu_sched_node * root;
-	starpu_pthread_mutex_t mutex;
+	starpu_pthread_rwlock_t mutex;
 };
 
 
@@ -134,7 +134,7 @@ struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id);
 
 //this function must be called after every modification of the tree
 void _starpu_tree_update_after_modification(struct _starpu_sched_tree * tree);
-;
+
 
 
 #endif

+ 9 - 18
src/sched_policies/node_work_stealing.c

@@ -39,7 +39,7 @@ static int select_victim_round_robin(struct _starpu_sched_node *node)
 {
 	struct _starpu_work_stealing_data *ws = node->data;
 	unsigned i = ws->last_pop_child;
-	
+
 	
 /* If the worker's queue is empty, let's try
  * the next ones */
@@ -48,13 +48,11 @@ static int select_victim_round_robin(struct _starpu_sched_node *node)
 		unsigned ntasks;
 		struct _starpu_sched_node * child = node->childs[i];
 		struct _starpu_fifo_taskq * fifo = _starpu_sched_node_fifo_get_fifo(child);
-		//STARPU_PTHREAD_MUTEX_LOCK(&child->mutex);//do we need to wait ?
-		if(starpu_pthread_mutex_trylock(&child->mutex))//or not
-			continue;
+		STARPU_PTHREAD_RWLOCK_WRLOCK(&child->mutex);
 		ntasks = fifo->ntasks;
 		if (ntasks)
 			break;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&child->mutex);
+		STARPU_PTHREAD_RWLOCK_UNLOCK(&child->mutex);
 		i = (i + 1) % node->nchilds;
 		if (i == ws->last_pop_child)
 		{
@@ -230,9 +228,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 	struct starpu_task * task = _starpu_fifo_pop_task(fifo,
 							  starpu_worker_get_id());
 	fifo->nprocessed--;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&child->mutex);
-	if(task)
-		starpu_push_task_end(task);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&child->mutex);
 	return task;
 }
 
@@ -240,6 +236,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
 	struct _starpu_work_stealing_data * wsd = node->data;
 	int ret = -1;
 	int start = wsd->last_push_child;
@@ -285,15 +282,15 @@ int _starpu_ws_push_task(struct starpu_task *task)
 		node = node->fathers[sched_ctx_id];
 		if(is_my_fifo_node(node,sched_ctx_id))
 		{
-			STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
+			STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 			struct _starpu_fifo_taskq * fifo = node->data;
 			int ret_val =  _starpu_fifo_push_sorted_task(fifo, task);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
+			STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 			return ret_val;
 		}
 	}
 	//there was a problem here, don't know what to do
-	STARPU_ASSERT(1);
+	STARPU_ASSERT(0);
 	return _starpu_tree_push_task(task);
 }
 
@@ -302,7 +299,6 @@ static void add_child(struct _starpu_sched_node *node,
 		      struct _starpu_sched_node *child,
 		      unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
 	int i;
 	for(i = 0; i < node->nchilds; i++){
 		STARPU_ASSERT(node->childs[i] != node);
@@ -318,15 +314,12 @@ static void add_child(struct _starpu_sched_node *node,
 	_starpu_sched_node_set_father(fifo_node, node, sched_ctx_id);
 	node->childs[node->nchilds] = fifo_node;
 	node->nchilds++;
-	
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
 
 }
 static void remove_child(struct _starpu_sched_node *node,
 			 struct _starpu_sched_node *child,
 			 unsigned sched_ctx_id)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
 	int pos;
 	for(pos = 0; pos < node->nchilds; pos++)
 		if(*node->childs[pos]->childs == child)
@@ -336,7 +329,6 @@ static void remove_child(struct _starpu_sched_node *node,
 	node->childs[pos] = node->childs[--node->nchilds];
 	STARPU_ASSERT(fifo_node->fathers[sched_ctx_id] == node);
 	fifo_node->fathers[sched_ctx_id] = NULL;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
 }
 
 
@@ -362,9 +354,8 @@ static void initialize_ws_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
  	data->root = _starpu_sched_node_work_stealing_create();
-	
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }