Prechádzať zdrojové kódy

changes rwlocks to spin locks.
its slower, and its seems dead-lock friendly.

Simon Archipoff 12 rokov pred
rodič
commit
f0ac818f6f

+ 7 - 2
src/sched_policies/node_eager.c

@@ -9,7 +9,7 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
+	_starpu_spin_init(&data->lock);
  	data->root = _starpu_sched_node_fifo_create();
 	
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
@@ -25,6 +25,7 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 static void add_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+//	_starpu_spin_lock(&t->lock);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
 	{
@@ -32,18 +33,22 @@ static void add_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nw
 		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = t->root;
 	}
 	_starpu_tree_update_after_modification(t);
+//	_starpu_spin_unlock(&t->lock);
 }
 
 static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	unsigned i;
+
+//	_starpu_spin_lock(&t->lock);
+	unsigned i;	
 	for(i = 0; i < nworkers; i++)
 	{
 		t->root->remove_child(t->root, _starpu_sched_node_worker_get(workerids[i]), sched_ctx_id);
 		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = NULL;
 	}
 	_starpu_tree_update_after_modification(t);
+//	_starpu_spin_unlock(&t->lock);
 }
 
 

+ 6 - 6
src/sched_policies/node_fifo.c

@@ -18,13 +18,13 @@ static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_
 	struct _starpu_fifo_taskq * fifo = node->data;
 //	printf("%p %f %f %f\n",fifo ,fifo->exp_start,fifo->exp_len,fifo->exp_end);
 
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
+	_starpu_spin_lock(&node->lock);
 	if(preds.state == PERF_MODEL)
 		preds.expected_finish_time = _starpu_compute_expected_time(fifo->exp_start = starpu_timing_now(),
 									   preds.expected_finish_time + fifo->exp_end,
 									   preds.expected_length + fifo->exp_len,
 									   preds.expected_transfer_length);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+	_starpu_spin_unlock(&node->lock);
 	return preds;
 }
 
@@ -50,7 +50,7 @@ static double estimated_load(struct _starpu_sched_node * node)
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
+	_starpu_spin_lock(&node->lock);
 	STARPU_ASSERT(node->nworkers > 0);
 	struct _starpu_fifo_taskq * fifo = node->data;
 	STARPU_ASSERT(!isnan(fifo->exp_end));
@@ -66,7 +66,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
 
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+	_starpu_spin_unlock(&node->lock);
 	node->available(node);
 	return ret;
 }
@@ -74,7 +74,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
 {
 	struct _starpu_fifo_taskq * fifo = node->data;
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
+	_starpu_spin_lock(&node->lock);
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
@@ -91,7 +91,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
 
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+	_starpu_spin_unlock(&node->lock);
 	if(task)
 		return task;
 	struct _starpu_sched_node * father = node->fathers[sched_ctx_id];

+ 5 - 5
src/sched_policies/node_heft.c

@@ -53,7 +53,7 @@ static double compute_fitness_perf_model(struct _starpu_sched_node * child STARP
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
- 	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
+ 	_starpu_spin_lock(&node->lock);
 	struct _starpu_task_execute_preds preds[node->nchilds];
 	int i;
 	int calibrating = 0;
@@ -88,7 +88,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	}
 	if(!can_execute)
 	{
-		STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+		_starpu_spin_unlock(&node->lock);
 		return -ENODEV;
 	}
 	
@@ -97,7 +97,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	if(!calibrating && !perf_model)
 	{
 		int ret = data->no_model_node->push_task(data->no_model_node, task);
-		STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+		_starpu_spin_unlock(&node->lock);
 		return ret;
 	}
 
@@ -132,7 +132,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	starpu_task_set_implementation(task, preds[index_best_fitness].impl);
 	task->predicted = preds[index_best_fitness].expected_length;
 	task->predicted_transfer = preds[index_best_fitness].expected_transfer_length;
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
+ 	_starpu_spin_unlock(&node->lock);	
 	return c->push_task(c, task);
 }
 /*
@@ -176,7 +176,7 @@ static void initialize_heft_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
+	_starpu_spin_init(&data->lock);
 	data->root = _starpu_sched_node_heft_create(1.0,1.0,1.0,1.0);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }

+ 5 - 5
src/sched_policies/node_random.c

@@ -116,7 +116,7 @@ static void initialize_random_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
+	_starpu_spin_init(&data->lock);
  	data->root = _starpu_sched_node_random_create();
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }
@@ -131,8 +131,8 @@ static void deinitialize_random_center_policy(unsigned sched_ctx_id)
  static void add_worker_random(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-//	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->mutex);
 		struct _starpu_sched_node * random_node = t->root;
+	_starpu_spin_lock(&t->lock);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
 	{
@@ -142,14 +142,14 @@ static void deinitialize_random_center_policy(unsigned sched_ctx_id)
 	}
 	_starpu_tree_update_after_modification(t);
 	update_relative_childs_speedup(random_node);
-//	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->mutex);
+	_starpu_spin_unlock(&t->lock);
 }
 
 static void remove_worker_random(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->mutex);
+	_starpu_spin_lock(&t->lock);
 	struct _starpu_sched_node * random_node = t->root;
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
@@ -160,7 +160,7 @@ static void remove_worker_random(unsigned sched_ctx_id, int * workerids, unsigne
 	}
 	_starpu_tree_update_after_modification(t);
 	update_relative_childs_speedup(t->root);
-//	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->mutex);
+	_starpu_spin_unlock(&t->lock);
 }
 
 struct starpu_sched_policy _starpu_sched_tree_random_policy =

+ 15 - 9
src/sched_policies/node_sched.c

@@ -57,17 +57,23 @@ void _starpu_sched_node_set_father(struct _starpu_sched_node *node,
 
 struct starpu_task * pop_task(unsigned sched_ctx_id)
 {
-	//struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int workerid = starpu_worker_get_id();
 	struct _starpu_sched_node * wn = _starpu_sched_node_worker_get(workerid);
-	return wn->pop_task(wn, sched_ctx_id);
+	_starpu_spin_lock(&t->lock);
+	struct starpu_task * task = wn->pop_task(wn, sched_ctx_id);
+	_starpu_spin_unlock(&t->lock);
+	return task;
 }
 
 int push_task(struct starpu_task * task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	return t->root->push_task(t->root, task);
+	_starpu_spin_lock(&t->lock);
+	int ret = t->root->push_task(t->root, task);
+	_starpu_spin_unlock(&t->lock);
+	return ret;
 }
 
 void _starpu_node_destroy_rec(struct _starpu_sched_node * node, unsigned sched_ctx_id)
@@ -118,7 +124,7 @@ void _starpu_node_destroy_rec(struct _starpu_sched_node * node, unsigned sched_c
 void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_id)
 {
 	_starpu_node_destroy_rec(tree->root, sched_ctx_id);
-	STARPU_PTHREAD_RWLOCK_DESTROY(&tree->mutex);
+	_starpu_spin_destroy(&tree->lock);
 	free(tree);
 }
 void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starpu_sched_node * child,unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
@@ -151,10 +157,10 @@ int _starpu_tree_push_task(struct starpu_task * task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->mutex);
+//	_starpu_spin_lock(&tree->lock);
 	int ret_val = tree->root->push_task(tree->root,task); 
 //	starpu_push_task_end(task);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->mutex);
+//	_starpu_spin_unlock(&tree->lock);
 	return ret_val;
 }
 struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
@@ -162,9 +168,9 @@ struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
 	struct _starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int workerid = starpu_worker_get_id();
 	struct _starpu_sched_node * node = _starpu_sched_node_worker_get(workerid);
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->mutex);
+//	_starpu_spin_lock(&tree->lock);
 	struct starpu_task * task = node->pop_task(node, sched_ctx_id);
-	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->mutex);
+//	_starpu_spin_unlock(&tree->lock);
 	return task;
 }
 /*
@@ -288,7 +294,7 @@ struct _starpu_sched_node * _starpu_sched_node_create(void)
 {
 	struct _starpu_sched_node * node = malloc(sizeof(*node));
 	memset(node,0,sizeof(*node));
-	STARPU_PTHREAD_RWLOCK_INIT(&node->mutex,NULL);
+	_starpu_spin_init(&node->lock);
 	node->available = available;
 	node->pop_task = pop_task_node;
 	node->estimated_load = estimated_load;

+ 3 - 3
src/sched_policies/node_sched.h

@@ -1,7 +1,7 @@
 #ifndef __SCHED_NODE_H__
 #define __SCHED_NODE_H__
 #include <starpu.h>
-
+#include <common/starpu_spinlock.h>
 struct _starpu_sched_node
 {
 	int (*push_task)(struct _starpu_sched_node *, struct starpu_task *);
@@ -18,7 +18,7 @@ struct _starpu_sched_node
 	int nchilds;
 	struct _starpu_sched_node ** childs;
 
-	starpu_pthread_rwlock_t mutex;
+	struct _starpu_spinlock lock;
 
 	//the list of workers in the node's subtree
 	int workerids[STARPU_NMAXWORKERS];
@@ -71,7 +71,7 @@ struct _starpu_sched_tree
 {
 	struct _starpu_sched_node * root;
 	//this lock is used to protect the scheduler during modifications of his structure
-	starpu_pthread_rwlock_t mutex;
+	struct _starpu_spinlock lock;
 };
 
 

+ 4 - 3
src/sched_policies/node_work_stealing.c

@@ -134,7 +134,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
-	STARPU_PTHREAD_RWLOCK_RDLOCK(&node->mutex);
+	_starpu_spin_lock(&node->lock);
 	struct _starpu_work_stealing_data * wsd = node->data;
 	int ret = -1;
 	int start = wsd->last_push_child;
@@ -150,6 +150,7 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	}
 	wsd->last_push_child = (wsd->last_push_child + 1) % node->nchilds;
 	node->childs[i]->available(node->childs[i]);
+	_starpu_spin_unlock(&node->lock);
 	return ret;
 }
 
@@ -185,7 +186,7 @@ int _starpu_ws_push_task(struct starpu_task *task)
 			int ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
 			STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
 			
-			//we need to wake all other workers
+			//we need to wake all workers
 			int j;
 			for(j = 0; j < node->nchilds; j++)
 			{
@@ -288,7 +289,7 @@ static void initialize_ws_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	STARPU_PTHREAD_RWLOCK_INIT(&data->mutex,NULL);
+	_starpu_spin_init(&data->lock);
  	data->root = _starpu_sched_node_work_stealing_create();
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }