Prechádzať zdrojové kódy

remove spinlock
now there is a rwlock that protect all scheduler, and mutexes where its needed (mainly fifos)

Simon Archipoff 12 rokov pred
rodič
commit
de2e23ca19

+ 6 - 6
src/sched_policies/node_eager.c

@@ -9,7 +9,7 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	_starpu_spin_init(&data->lock);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->lock,NULL);
  	data->root = _starpu_sched_node_fifo_create();
 	
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
@@ -25,7 +25,8 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 static void add_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-//	_starpu_spin_lock(&t->lock);
+
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
 	{
@@ -33,14 +34,13 @@ static void add_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nw
 		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = t->root;
 	}
 	_starpu_tree_update_after_modification(t);
-//	_starpu_spin_unlock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-
-//	_starpu_spin_lock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	unsigned i;	
 	for(i = 0; i < nworkers; i++)
 	{
@@ -48,7 +48,7 @@ static void remove_worker_eager(unsigned sched_ctx_id, int * workerids, unsigned
 		_starpu_sched_node_worker_get(workerids[i])->fathers[sched_ctx_id] = NULL;
 	}
 	_starpu_tree_update_after_modification(t);
-//	_starpu_spin_unlock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 

+ 29 - 22
src/sched_policies/node_fifo.c

@@ -3,10 +3,16 @@
 #include <starpu_scheduler.h>
 
 
+struct _starpu_fifo_data
+{
+	struct _starpu_fifo_taskq * fifo;
+	starpu_pthread_mutex_t mutex;
+};
+
+
 static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node,
 								 struct starpu_task * task)
 {
-
 	if(node->nchilds == 0)
 	{
 		struct _starpu_task_execute_preds p = { CANNOT_EXECUTE };
@@ -15,16 +21,18 @@ static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_
 	
 	struct _starpu_task_execute_preds preds = node->childs[0]->estimated_execute_preds(node->childs[0],task);
 
-	struct _starpu_fifo_taskq * fifo = node->data;
-//	printf("%p %f %f %f\n",fifo ,fifo->exp_start,fifo->exp_len,fifo->exp_end);
-
-	_starpu_spin_lock(&node->lock);
 	if(preds.state == PERF_MODEL)
-		preds.expected_finish_time = _starpu_compute_expected_time(fifo->exp_start = starpu_timing_now(),
+	{
+		struct _starpu_fifo_data * data = node->data;
+		struct _starpu_fifo_taskq * fifo = data->fifo;
+		starpu_pthread_mutex_t * mutex = &data->mutex;
+		STARPU_PTHREAD_MUTEX_LOCK(mutex);
+		preds.expected_finish_time = _starpu_compute_expected_time(fifo->exp_start,
 									   preds.expected_finish_time + fifo->exp_end,
 									   preds.expected_length + fifo->exp_len,
 									   preds.expected_transfer_length);
-	_starpu_spin_unlock(&node->lock);
+		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+	}
 	return preds;
 }
 
@@ -50,9 +58,11 @@ static double estimated_load(struct _starpu_sched_node * node)
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
-	_starpu_spin_lock(&node->lock);
 	STARPU_ASSERT(node->nworkers > 0);
-	struct _starpu_fifo_taskq * fifo = node->data;
+	struct _starpu_fifo_data * data = node->data;
+	struct _starpu_fifo_taskq * fifo = data->fifo;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+	STARPU_PTHREAD_MUTEX_LOCK(mutex);
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
@@ -65,16 +75,18 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
+	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
 
-	_starpu_spin_unlock(&node->lock);
 	node->available(node);
 	return ret;
 }
 
 static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
 {
-	struct _starpu_fifo_taskq * fifo = node->data;
-	_starpu_spin_lock(&node->lock);
+	struct _starpu_fifo_data * data = node->data;
+	struct _starpu_fifo_taskq * fifo = data->fifo;
+	starpu_pthread_mutex_t * mutex = &data->mutex;
+	STARPU_PTHREAD_MUTEX_LOCK(mutex);
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
@@ -90,8 +102,7 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
-
-	_starpu_spin_unlock(&node->lock);
+	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
 	if(task)
 		return task;
 	struct _starpu_sched_node * father = node->fathers[sched_ctx_id];
@@ -111,17 +122,13 @@ int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node)
 struct _starpu_sched_node * _starpu_sched_node_fifo_create(void)
 {
 	struct _starpu_sched_node * node = _starpu_sched_node_create();
-	node->data = _starpu_create_fifo();
+	struct _starpu_fifo_data * data = malloc(sizeof(*data));
+	data->fifo = _starpu_create_fifo();
+	STARPU_PTHREAD_MUTEX_INIT(&data->mutex,NULL);
+	node->data = data;
 	node->estimated_execute_preds = estimated_execute_preds;
 	node->estimated_load = estimated_load;
 	node->push_task = push_task;
 	node->pop_task = pop_task;
 	return node;
 }
-
-
-struct _starpu_fifo_taskq *  _starpu_sched_node_fifo_get_fifo(struct _starpu_sched_node * node)
-{
-	STARPU_ASSERT(_starpu_sched_node_is_fifo(node));
-	return node->data;
-}

+ 5 - 5
src/sched_policies/node_heft.c

@@ -53,7 +53,6 @@ static double compute_fitness_perf_model(struct _starpu_sched_node * child STARP
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
- 	_starpu_spin_lock(&node->lock);
 	struct _starpu_task_execute_preds preds[node->nchilds];
 	int i;
 	int calibrating = 0;
@@ -88,7 +87,6 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	}
 	if(!can_execute)
 	{
-		_starpu_spin_unlock(&node->lock);
 		return -ENODEV;
 	}
 	
@@ -97,7 +95,6 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	if(!calibrating && !perf_model)
 	{
 		int ret = data->no_model_node->push_task(data->no_model_node, task);
-		_starpu_spin_unlock(&node->lock);
 		return ret;
 	}
 
@@ -132,7 +129,6 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	starpu_task_set_implementation(task, preds[index_best_fitness].impl);
 	task->predicted = preds[index_best_fitness].expected_length;
 	task->predicted_transfer = preds[index_best_fitness].expected_transfer_length;
- 	_starpu_spin_unlock(&node->lock);	
 	return c->push_task(c, task);
 }
 /*
@@ -176,7 +172,7 @@ static void initialize_heft_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	_starpu_spin_init(&data->lock);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->lock,NULL);
 	data->root = _starpu_sched_node_heft_create(1.0,1.0,1.0,1.0);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }
@@ -193,6 +189,7 @@ static void add_worker_heft(unsigned sched_ctx_id, int * workerids, unsigned nwo
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned i;
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	for(i = 0; i < nworkers; i++)
 	{
 		struct _starpu_sched_node * fifo = _starpu_sched_node_fifo_create();
@@ -206,12 +203,14 @@ static void add_worker_heft(unsigned sched_ctx_id, int * workerids, unsigned nwo
 
 	}
 	_starpu_tree_update_after_modification(t);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 static void remove_worker_heft(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned i;
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	for(i = 0; i < nworkers; i++)
 	{
 		int j;
@@ -223,6 +222,7 @@ static void remove_worker_heft(unsigned sched_ctx_id, int * workerids, unsigned
 		_starpu_sched_node_set_father(fifo, NULL, sched_ctx_id);
 		t->root->remove_child(t->root, fifo, sched_ctx_id);
 	}
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 static void destroy_heft_node(struct _starpu_sched_node * node)

+ 5 - 5
src/sched_policies/node_random.c

@@ -116,7 +116,7 @@ static void initialize_random_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	_starpu_spin_init(&data->lock);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->lock,NULL);
  	data->root = _starpu_sched_node_random_create();
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }
@@ -132,7 +132,7 @@ static void deinitialize_random_center_policy(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 		struct _starpu_sched_node * random_node = t->root;
-	_starpu_spin_lock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
 	{
@@ -142,14 +142,14 @@ static void deinitialize_random_center_policy(unsigned sched_ctx_id)
 	}
 	_starpu_tree_update_after_modification(t);
 	update_relative_childs_speedup(random_node);
-	_starpu_spin_unlock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 static void remove_worker_random(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	_starpu_spin_lock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	struct _starpu_sched_node * random_node = t->root;
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
@@ -160,7 +160,7 @@ static void remove_worker_random(unsigned sched_ctx_id, int * workerids, unsigne
 	}
 	_starpu_tree_update_after_modification(t);
 	update_relative_childs_speedup(t->root);
-	_starpu_spin_unlock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 struct starpu_sched_policy _starpu_sched_tree_random_policy =

+ 12 - 13
src/sched_policies/node_sched.c

@@ -31,9 +31,12 @@ double _starpu_compute_expected_time(double now, double predicted_end, double pr
 
 static void available(struct _starpu_sched_node * node)
 {
+	(void)node;
+#ifndef STARPU_NON_BLOCKING_DRIVERS
 	int i;
 	for(i = 0; i < node->nchilds; i++)
 		node->childs[i]->available(node->childs[i]);
+#endif
 }
 static struct starpu_task * pop_task_node(struct _starpu_sched_node * node, unsigned sched_ctx_id)
 {
@@ -60,9 +63,9 @@ struct starpu_task * pop_task(unsigned sched_ctx_id)
 	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int workerid = starpu_worker_get_id();
 	struct _starpu_sched_node * wn = _starpu_sched_node_worker_get(workerid);
-	_starpu_spin_lock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&t->lock);
 	struct starpu_task * task = wn->pop_task(wn, sched_ctx_id);
-	_starpu_spin_unlock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 	return task;
 }
 
@@ -70,9 +73,9 @@ int push_task(struct starpu_task * task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	_starpu_spin_lock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&t->lock);
 	int ret = t->root->push_task(t->root, task);
-	_starpu_spin_unlock(&t->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 	return ret;
 }
 
@@ -124,7 +127,7 @@ void _starpu_node_destroy_rec(struct _starpu_sched_node * node, unsigned sched_c
 void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_id)
 {
 	_starpu_node_destroy_rec(tree->root, sched_ctx_id);
-	_starpu_spin_destroy(&tree->lock);
+	STARPU_PTHREAD_RWLOCK_DESTROY(&tree->lock);
 	free(tree);
 }
 void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starpu_sched_node * child,unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
@@ -142,14 +145,12 @@ void _starpu_sched_node_add_child(struct _starpu_sched_node* node, struct _starp
 }
 void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child,unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
 {
-//	STARPU_PTHREAD_RWLOCK_WRLOCK(&node->mutex);
 	int pos;
 	for(pos = 0; pos < node->nchilds; pos++)
 		if(node->childs[pos] == child)
 			break;
 	STARPU_ASSERT(pos != node->nchilds);
 	node->childs[pos] = node->childs[--node->nchilds];
-//	STARPU_PTHREAD_RWLOCK_UNLOCK(&node->mutex);
 }
 
 
@@ -157,20 +158,19 @@ int _starpu_tree_push_task(struct starpu_task * task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-//	_starpu_spin_lock(&tree->lock);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
 	int ret_val = tree->root->push_task(tree->root,task); 
-//	starpu_push_task_end(task);
-//	_starpu_spin_unlock(&tree->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
 	return ret_val;
 }
 struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
 	int workerid = starpu_worker_get_id();
 	struct _starpu_sched_node * node = _starpu_sched_node_worker_get(workerid);
-//	_starpu_spin_lock(&tree->lock);
 	struct starpu_task * task = node->pop_task(node, sched_ctx_id);
-//	_starpu_spin_unlock(&tree->lock);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
 	return task;
 }
 /*
@@ -294,7 +294,6 @@ struct _starpu_sched_node * _starpu_sched_node_create(void)
 {
 	struct _starpu_sched_node * node = malloc(sizeof(*node));
 	memset(node,0,sizeof(*node));
-	_starpu_spin_init(&node->lock);
 	node->available = available;
 	node->pop_task = pop_task_node;
 	node->estimated_load = estimated_load;

+ 1 - 3
src/sched_policies/node_sched.h

@@ -18,7 +18,6 @@ struct _starpu_sched_node
 	int nchilds;
 	struct _starpu_sched_node ** childs;
 
-	struct _starpu_spinlock lock;
 
 	//the list of workers in the node's subtree
 	int workerids[STARPU_NMAXWORKERS];
@@ -71,7 +70,7 @@ struct _starpu_sched_tree
 {
 	struct _starpu_sched_node * root;
 	//this lock is used to protect the scheduler during modifications of his structure
-	struct _starpu_spinlock lock;
+	starpu_pthread_rwlock_t lock;
 };
 
 
@@ -109,7 +108,6 @@ int _starpu_sched_node_worker_get_workerid(struct _starpu_sched_node * worker_no
 
 struct _starpu_sched_node * _starpu_sched_node_fifo_create(void);
 int _starpu_sched_node_is_fifo(struct _starpu_sched_node * node);
-struct _starpu_fifo_taskq *  _starpu_sched_node_fifo_get_fifo(struct _starpu_sched_node *);
 
 /* struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void); */
 int _starpu_sched_node_is_work_stealing(struct _starpu_sched_node * node);

+ 5 - 3
src/sched_policies/node_work_stealing.c

@@ -134,7 +134,6 @@ static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned
 
 static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
 {
-	_starpu_spin_lock(&node->lock);
 	struct _starpu_work_stealing_data * wsd = node->data;
 	int ret = -1;
 	int start = wsd->last_push_child;
@@ -150,7 +149,6 @@ static int push_task(struct _starpu_sched_node * node, struct starpu_task * task
 	}
 	wsd->last_push_child = (wsd->last_push_child + 1) % node->nchilds;
 	node->childs[i]->available(node->childs[i]);
-	_starpu_spin_unlock(&node->lock);
 	return ret;
 }
 
@@ -289,7 +287,7 @@ static void initialize_ws_center_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
 	struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
-	_starpu_spin_init(&data->lock);
+	STARPU_PTHREAD_RWLOCK_INIT(&data->lock,NULL);
  	data->root = _starpu_sched_node_work_stealing_create();
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 }
@@ -305,6 +303,7 @@ static void deinitialize_ws_center_policy(unsigned sched_ctx_id)
 static void add_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	unsigned i;
 	struct _starpu_sched_node * ws_node = t->root;
 	for(i = 0; i < nworkers; i++)
@@ -316,11 +315,13 @@ static void add_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nwork
 		_starpu_sched_node_set_father(worker, ws_node, sched_ctx_id);
 	}
 	_starpu_tree_update_after_modification(t);
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 static void remove_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nworkers)
 {
 	struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
+	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
 	struct _starpu_sched_node * ws_node = t->root;
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
@@ -329,6 +330,7 @@ static void remove_worker_ws(unsigned sched_ctx_id, int * workerids, unsigned nw
 		ws_node->remove_child(ws_node, worker, sched_ctx_id);
 		_starpu_sched_node_set_father(worker,NULL,sched_ctx_id);
 	}
+	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
 }
 
 

+ 3 - 0
src/sched_policies/node_worker.c

@@ -52,6 +52,8 @@ void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
 
 static void available(struct _starpu_sched_node * worker_node)
 {
+	(void) worker_node;
+#ifndef STARPU_NON_BLOCKING_DRIVERS
 	struct _starpu_worker * w = worker_node->data;
 	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
 	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
@@ -59,6 +61,7 @@ static void available(struct _starpu_sched_node * worker_node)
 	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+#endif
 }
 
 static double estimated_transfer_length(struct _starpu_sched_node * node,