
fully serialize sched_ctx creation / deletion operations
sort workers by worker ID when locking multiple workers during sched_ctx change operations, to avoid multi-lock deadlocks

Author: Olivier Aumage
Commit: fc0210b1ff

1 changed file, 53 insertions(+), 11 deletions(-)
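The second change addresses the classic lock-ordering deadlock: if thread A locks worker 3 then worker 7 while thread B locks worker 7 then worker 3, each ends up waiting on a mutex the other holds. Acquiring all per-worker mutexes in one global order (ascending worker ID) makes such a wait cycle impossible. Below is a minimal self-contained sketch of the idea using plain pthreads; the names and structure are illustrative, not StarPU's actual locking code.

	#include <pthread.h>
	#include <stdlib.h>

	#define NWORKERS 8

	/* One mutex per worker, mirroring a per-worker sched_mutex. */
	static pthread_mutex_t worker_mutex[NWORKERS];

	static void init_worker_mutexes(void)
	{
		int i;
		for (i = 0; i < NWORKERS; i++)
			pthread_mutex_init(&worker_mutex[i], NULL);
	}

	static int compar_int(const void *pa, const void *pb)
	{
		return *(const int *)pa - *(const int *)pb;
	}

	/* Lock a set of workers in ascending id order. Since every thread
	 * acquires these mutexes in the same global order, no cycle of
	 * threads waiting on one another can form, hence no deadlock. */
	static void lock_workers_sorted(int *ids, int n)
	{
		int i;
		qsort(ids, n, sizeof(int), compar_int);
		for (i = 0; i < n; i++)
			pthread_mutex_lock(&worker_mutex[ids[i]]);
	}

	static void unlock_workers(const int *ids, int n)
	{
		/* Unlock order is irrelevant for deadlock avoidance. */
		int i;
		for (i = n - 1; i >= 0; i--)
			pthread_mutex_unlock(&worker_mutex[ids[i]]);
	}

	int main(void)
	{
		int set[] = { 7, 3 };		/* arrives unsorted... */
		init_worker_mutexes();
		lock_workers_sorted(set, 2);	/* ...locks 3, then 7 */
		unlock_workers(set, 2);
		return 0;
	}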

src/core/sched_ctx.c: +53 -11

@@ -54,8 +54,25 @@ static void set_priority_hierarchically_on_notified_workers(int* workers_to_add,
 static void fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx);
 static void add_notified_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id);
 
+/* reused from combined_workers.c */
+static int compar_int(const void *pa, const void *pb)
+{
+	int a = *((int *)pa);
+	int b = *((int *)pb);
+
+	return a - b;
+}
+
+/* reused from combined_workers.c */
+static void sort_workerid_array(int nworkers, int workerid_array[])
+{
+	qsort(workerid_array, nworkers, sizeof(int), compar_int);
+}
+
 /* notify workers that a ctx change operation is about to proceed.
  *
+ * workerids must be sorted by ascending id
+ *
  * Once this function returns, the notified workers must not start a new
  * scheduling operation until they are notified that the ctx change op is
  * done.
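A side note on compar_int above: the `a - b` idiom can overflow for arbitrary ints, but it is safe here because worker IDs are small non-negative integers bounded by the worker count. If the comparator were ever reused for unconstrained values, an overflow-safe variant (a sketch, not part of the commit) would be:

	static int compar_int_safe(const void *pa, const void *pb)
	{
		int a = *((const int *)pa);
		int b = *((const int *)pb);

		/* Yields -1, 0 or 1 without risking signed overflow. */
		return (a > b) - (a < b);
	}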
@@ -67,6 +84,8 @@ static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, c
 	unsigned i;
 	for (i=0; i<nworkers; i++)
 	{
+		/* check that workerids[] is sorted to prevent multi-lock acquisition deadlocks */
+		STARPU_ASSERT(i == 0 || (workerids[i] > workerids[i-1]));
 		if (starpu_worker_is_combined_worker(workerids[i]))
 			continue;
 		if (workerids[i] == cur_workerid)
@@ -80,6 +99,8 @@ static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, c
 
 /* notify workers that a ctx change operation is complete.
  *
+ * workerids must be sorted by ascending id
+ *
  * Once this function returns, the workers may proceed with scheduling operations again.
  */
 static void notify_workers_about_changing_ctx_done(const unsigned nworkers, const int * const workerids)
@@ -89,6 +110,8 @@ static void notify_workers_about_changing_ctx_done(const unsigned nworkers, cons
 	unsigned i;
 	for (i=0; i<nworkers; i++)
 	{
+		/* check that workerids[] is sorted to prevent multi-lock acquisition deadlocks */
+		STARPU_ASSERT(i == 0 || (workerids[i] > workerids[i-1]));
 		if (starpu_worker_is_combined_worker(workerids[i]))
 			continue;
 		if (workerids[i] == cur_workerid)
@@ -368,6 +391,7 @@ static void _starpu_add_workers_to_new_sched_ctx(struct _starpu_sched_ctx *sched
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 	}
 
+	sort_workerid_array(nworkers, workerids);
 	notify_workers_about_changing_ctx_pending(nworkers, workerids);
 	_do_add_notified_workers(sched_ctx, workerids, nworkers);
 	if(sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
@@ -498,6 +522,7 @@ static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_
 }
 #endif
 
+/* Must be called with sched_ctx_manag mutex held */
 struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerids,
 						   int nworkers_ctx, unsigned is_initial_sched,
 						   const char *sched_ctx_name,
@@ -510,7 +535,6 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 {
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 
-	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS);
 
 	unsigned id = _starpu_get_first_free_sched_ctx(config);
@@ -624,7 +648,6 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	}
 
 	(void)STARPU_ATOMIC_ADD(&config->topology.nsched_ctxs,1);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 
 	return sched_ctx;
 }
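These hunks carry the first half of the commit: _starpu_create_sched_ctx no longer takes sched_ctx_manag itself; the comment added at its definition turns the mutex into a caller-held precondition, and each public entry point now holds it across the entire creation. The pattern, sketched abstractly with hypothetical names (not StarPU API):

	#include <pthread.h>

	static pthread_mutex_t manag_mutex = PTHREAD_MUTEX_INITIALIZER;

	/* Internal helper: must be called with manag_mutex held, so it can
	 * read and update shared bookkeeping (slot table, context count)
	 * without taking the lock itself. */
	static void create_ctx_locked(void)
	{
		/* ... pick a free slot, fill it in, bump the count ... */
	}

	/* Public entry point: serializes the whole creation, including the
	 * worker notification that follows slot setup. */
	void ctx_create(void)
	{
		pthread_mutex_lock(&manag_mutex);
		create_ctx_locked();
		/* ... notify workers, install the new context ... */
		pthread_mutex_unlock(&manag_mutex);
	}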
@@ -670,6 +693,7 @@ int starpu_sched_ctx_get_stream_worker(unsigned sub_ctx)
 
 unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, ...)
 {
+	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	va_list varg_list;
 	int arg_type;
 	int min_prio_set = 0;
@@ -754,6 +778,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 		if (workerids[i] < 0 || workerids[i] >= num_workers)
 		{
 			_STARPU_ERROR("Invalid worker ID (%d) specified!\n", workerids[i]);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 			return STARPU_NMAX_SCHED_CTXS;
 		}
 	}
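Since the public functions now hold sched_ctx_manag for their full duration, every early return, like the invalid-worker-ID check above, must release it first; the commit adds an explicit unlock before each such return. An equivalent idiom that keeps a single unlock site is the goto-cleanup pattern (a sketch of an alternative, not what the commit uses):

	#include <pthread.h>
	#include <limits.h>

	/* Same hypothetical manager mutex as in the previous sketch,
	 * redeclared so this fragment stands alone. */
	static pthread_mutex_t manag_mutex = PTHREAD_MUTEX_INITIALIZER;

	unsigned ctx_create_checked(const int *workerids, int nworkers)
	{
		unsigned ret = UINT_MAX;	/* stand-in for STARPU_NMAX_SCHED_CTXS */
		int i;

		pthread_mutex_lock(&manag_mutex);
		for (i = 0; i < nworkers; i++)
			if (workerids[i] < 0)
				goto out;	/* single exit point: unlock below */

		/* ... the actual creation work ... */
		ret = 0;	/* id of the new context */
	out:
		pthread_mutex_unlock(&manag_mutex);
		return ret;
	}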
@@ -765,6 +790,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 
 	int *added_workerids;
 	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
+	sort_workerid_array(nw_ctx, added_workerids);
 	notify_workers_about_changing_ctx_pending(nw_ctx, added_workerids);
 	_starpu_sched_ctx_lock_write(sched_ctx->id);
 	_starpu_update_notified_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
@@ -774,11 +800,13 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 #ifdef STARPU_USE_SC_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
+	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 	return sched_ctx->id;
 }
 
 int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, void **arglist)
 {
+	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	int arg_i = 0;
 	int min_prio_set = 0;
 	int max_prio_set = 0;
@@ -879,6 +907,7 @@ int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx
 			if (workerids[i] < 0 || workerids[i] >= num_workers)
 			{
 				_STARPU_ERROR("Invalid worker ID (%d) specified!\n", workerids[i]);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 				return STARPU_NMAX_SCHED_CTXS;
 			}
 		}
@@ -891,6 +920,7 @@ int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx
 
 	int *added_workerids;
 	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
+	sort_workerid_array(nw_ctx, added_workerids);
 	notify_workers_about_changing_ctx_pending(nw_ctx, added_workerids);
 	_starpu_sched_ctx_lock_write(sched_ctx->id);
 	_starpu_update_notified_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
@@ -900,6 +930,7 @@ int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx
 #ifdef STARPU_USE_SC_HYPERVISOR
 	sched_ctx->perf_counters = NULL;
 #endif
+	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 	return (int)sched_ctx->id;
 }
 
@@ -920,7 +951,8 @@ void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counte
 }
 #endif
 
-/* free all structures for the context */
+/* free all structures for the context
+   Must be called with sched_ctx_manag mutex held */
 static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 {
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
@@ -949,13 +981,13 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
 #endif //STARPU_HAVE_HWLOC
 
-	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
+	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
 	config->topology.nsched_ctxs--;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 }
 
 void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 {
+	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	STARPU_ASSERT(sched_ctx);
 
@@ -978,6 +1010,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
 	int backup_workerids[nworkers_ctx];
 	memcpy(backup_workerids, workerids, nworkers_ctx*sizeof(backup_workerids[0]));
+	sort_workerid_array(nworkers_ctx, backup_workerids);
 	notify_workers_about_changing_ctx_pending(nworkers_ctx, backup_workerids);
 
 	/*if both of them have all the resources it is pointless*/
@@ -1006,21 +1039,28 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 		 * complete before altering the sched_ctx under sched_mutex protection */
 		_starpu_update_notified_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
 		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
+		notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
+		occupied_sms -= sched_ctx->nsms;
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
+		STARPU_PTHREAD_RWLOCK_DESTROY(&sched_ctx->rwlock);
 		_starpu_delete_sched_ctx(sched_ctx);
 	}
-	notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
-
-	_starpu_sched_ctx_unlock_write(sched_ctx_id);
+	else
+	{
+		notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
+		occupied_sms -= sched_ctx->nsms;
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
+	}
 	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list, don't forget to free it when
 	   you don't use it anymore */
 	free(workerids);
-	occupied_sms -= sched_ctx->nsms;
-	return;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 }
 
 /* called after the workers are terminated so we don't have anything else to do but free the memory*/
 void _starpu_delete_all_sched_ctxs()
 {
+	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	{
@@ -1038,7 +1078,7 @@ void _starpu_delete_all_sched_ctxs()
 	}
 
 	STARPU_PTHREAD_KEY_DELETE(sched_ctx_key);
-	return;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
 }
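The delete path above is reshuffled so that, when the context is actually torn down, the write lock is released and the rwlock destroyed before _starpu_delete_sched_ctx frees the remaining state; with POSIX rwlocks, destroying a lock that a thread might still hold or be waiting on is undefined behavior. A condensed view of the ordering (names from the diff; surrounding code elided):

	notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
	occupied_sms -= sched_ctx->nsms;
	_starpu_sched_ctx_unlock_write(sched_ctx_id);		/* release the lock before... */
	STARPU_PTHREAD_RWLOCK_DESTROY(&sched_ctx->rwlock);	/* ...destroying it */
	_starpu_delete_sched_ctx(sched_ctx);			/* finally free the slot */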
 
 static void _starpu_check_workers(int *workerids, int nworkers)
@@ -1260,6 +1300,7 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, unsigned nworkers_to_add,
 	}
 	else
 	{
+		sort_workerid_array(cumulated_nworkers, cumulated_workerids);
 		notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
 		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		add_notified_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
@@ -1334,6 +1375,7 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, unsigned nworkers_t
 		}
 		else
 		{
+			sort_workerid_array(cumulated_nworkers, cumulated_workerids);
 			notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
 			_starpu_sched_ctx_lock_write(sched_ctx_id);
 			remove_notified_workers(workers_to_remove, nworkers_to_remove, sched_ctx_id);
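Taken together, every site that mutates a context's worker set now follows the same protocol. A schematic summary (hypothetical wrapper; the function names inside are the real ones from the diff above, with error handling omitted):

	static void change_ctx_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
	{
		sort_workerid_array(nworkers, workerids);			/* establish the lock order */
		notify_workers_about_changing_ctx_pending(nworkers, workerids);	/* park the workers */
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		/* ... add/remove workers, update per-worker bookkeeping ... */
		notify_workers_about_changing_ctx_done(nworkers, workerids);	/* let workers resume */
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}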