Browse Source

protect add_workers/remove_workers

Olivier Aumage 8 years ago
parent
commit
77e0637657
1 changed files with 76 additions and 33 deletions
  1. 76 33
      src/core/sched_ctx.c

+ 76 - 33
src/core/sched_ctx.c

@@ -45,6 +45,30 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
 
+static void set_priority_on_locked_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority);
+static void set_priority_hierarchically_on_locked_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority);
+
+static void lock_workers_for_changing_ctx(const unsigned nworkers, const int * const workerids) /* Lock the sched_mutex of each of the nworkers workers listed in workerids, in array order. */
+{
+	unsigned i;
+	for (i=0; i<nworkers; i++)
+	{
+		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex); /* not released here: callers pair this with unlock_workers_for_changing_ctx() */
+		_starpu_worker_wait_for_transient_sched_op_completion(worker); /* invoked while this worker's sched_mutex is held */
+	}
+}
+
+static void unlock_workers_for_changing_ctx(const unsigned nworkers, const int * const workerids) /* Unlock the sched_mutex of each of the nworkers workers listed in workerids; counterpart of lock_workers_for_changing_ctx(). */
+{
+	unsigned i;
+	for (i=0; i<nworkers; i++) /* same array order as the locking loop */
+	{
+		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+	}
+}
+
 static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
 {
 	unsigned ret_sched_ctx = _starpu_sched_ctx_elt_exists(worker->sched_ctx_list, sched_ctx_id);
@@ -101,6 +125,20 @@ static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sc
 	return;
 }
 
+static void _starpu_update_locked_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id) /* Call _starpu_worker_gets_into_ctx() with sched_ctx_id for each of the nworkers workers in workerids. */
+{
+	int i;
+	struct _starpu_worker *worker = NULL;
+
+	for(i = 0; i < nworkers; i++) /* no mutex is taken in this function; NOTE(review): name implies the caller already holds each worker's sched_mutex - confirm at call sites */
+	{
+		worker = _starpu_get_worker_struct(workerids[i]);
+		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
+	}
+
+	return;
+}
+
 static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
 {
 	int i;
@@ -195,15 +233,8 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 				added_workers[(*n_added_workers)++] = worker;
 			else
 			{
-				int curr_workerid = starpu_worker_get_id();
 				struct _starpu_worker *worker_str = _starpu_get_worker_struct(workerids[i]);
-				if(curr_workerid != workerids[i])
-					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
-
 				worker_str->removed_from_ctx[sched_ctx->id] = 0;
-
-				if(curr_workerid != workerids[i])
-					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
 			}
 		}
 		else
@@ -594,7 +625,6 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 		sched_ctx->nsub_ctxs = nsub_ctxs;
 	}
 	
-	/* after having an worker_collection on the ressources add them */
 	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
 
 #ifdef STARPU_HAVE_HWLOC
@@ -1148,23 +1178,11 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 
 		/* announce upcoming context changes, then wait for transient unlocked operations to
 		 * complete before altering the sched_ctx under sched_mutex protection */
-		unsigned i;
-		for (i=0; i<nworkers_ctx; i++)
-		{
-			struct _starpu_worker *worker = _starpu_get_worker_struct(backup_workerids[i]);
-			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-			_starpu_worker_wait_for_transient_sched_op_completion(worker);
-		}
-
+		lock_workers_for_changing_ctx(nworkers_ctx, backup_workerids);
 		_starpu_update_locked_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
 		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 		_starpu_delete_sched_ctx(sched_ctx);
-
-		for (i=0; i<nworkers_ctx; i++)
-		{
-			struct _starpu_worker *worker = _starpu_get_worker_struct(backup_workerids[i]);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
-		}
+		unlock_workers_for_changing_ctx(nworkers_ctx, backup_workerids);
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
@@ -1297,6 +1315,14 @@ void _starpu_push_task_to_waiting_list(struct _starpu_sched_ctx *sched_ctx, stru
 	return;
 }
 
+static void set_priority_on_level_on_locked_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority) /* Currently a no-op: every parameter is ignored. */
+{
+	(void) workers_to_add; /* the (void) casts only mark the parameters as intentionally unused */
+	(void) nworkers_to_add;
+	(void) sched_ctx;
+	(void) priority;
+}
+
 void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
 {
 	(void) workers_to_add;
@@ -1326,14 +1352,15 @@ void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworke
 /* 	return; */
 
 }
-static void _set_priority_hierarchically(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
+
+static void set_priority_hierarchically_on_locked_workers(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority) /* Apply priority for these workers in each ancestor context of sched_ctx, recursing while the hierarchy level is > 0. */
 {
 	if(starpu_sched_ctx_get_hierarchy_level(sched_ctx) > 0)
 	{
 		unsigned father = starpu_sched_ctx_get_inheritor(sched_ctx);
-		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, father, priority);
-		starpu_sched_ctx_set_priority_on_level(workers_to_add, nworkers_to_add, father, priority);
-		_set_priority_hierarchically(workers_to_add, nworkers_to_add, father, priority);
+		set_priority_on_locked_workers(workers_to_add, nworkers_to_add, father, priority); /* sched_ctx itself is not updated here, only its inheritor */
+		set_priority_on_level_on_locked_workers(workers_to_add, nworkers_to_add, father, priority);
+		set_priority_hierarchically_on_locked_workers(workers_to_add, nworkers_to_add, father, priority); /* continue upward from the inheritor */
 	}
 	return;
 }
@@ -1353,15 +1380,16 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsi
 		int added_workers[nworkers_to_add];
 		int n_added_workers = 0;
 
+		lock_workers_for_changing_ctx(nworkers_to_add, workers_to_add);
 		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
 
 		if(n_added_workers > 0)
 		{
-			_starpu_update_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
+			_starpu_update_locked_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
 		}
-		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, sched_ctx_id, 1);
-		_set_priority_hierarchically(workers_to_add, nworkers_to_add, sched_ctx_id, 0);
-
+		set_priority_on_locked_workers(workers_to_add, nworkers_to_add, sched_ctx_id, 1);
+		set_priority_hierarchically_on_locked_workers(workers_to_add, nworkers_to_add, sched_ctx_id, 0);
+		unlock_workers_for_changing_ctx(nworkers_to_add, workers_to_add);
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
@@ -1389,14 +1417,15 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 		int removed_workers[sched_ctx->workers->nworkers];
 		int n_removed_workers = 0;
 
+		lock_workers_for_changing_ctx(nworkers_to_remove, workers_to_remove);
 		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
 
 		if(n_removed_workers > 0)
 		{
-			_starpu_update_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
-			starpu_sched_ctx_set_priority(removed_workers, n_removed_workers, sched_ctx_id, 1);
+			_starpu_update_locked_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
+			set_priority_on_locked_workers(removed_workers, n_removed_workers, sched_ctx_id, 1);
 		}
-
+		unlock_workers_for_changing_ctx(nworkers_to_remove, workers_to_remove);
 	}
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
 
@@ -2086,6 +2115,20 @@ int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
 	return sched_ctx->max_priority_is_set;
 }
 
+static void set_priority_on_locked_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority) /* For each listed worker, call _starpu_sched_ctx_list_move() on its sched_ctx_list with sched_ctx_id and priority. */
+{
+	if(nworkers != -1) /* nworkers == -1 makes the whole call a no-op */
+	{
+		int w;
+		struct _starpu_worker *worker = NULL;
+		for(w = 0; w < nworkers; w++)
+		{
+			worker = _starpu_get_worker_struct(workers[w]);
+			_starpu_sched_ctx_list_move(&worker->sched_ctx_list, sched_ctx_id, priority); /* no mutex is taken here; NOTE(review): name implies the caller holds the worker's sched_mutex - confirm */
+		}
+	}
+}
+
 void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
 {
 	if(nworkers != -1)