Browse Source

rework relax mechanism to avoid lock ordering issue in _starpu_worker_lock

Olivier Aumage 8 years ago
parent
commit
1a60c8895f
2 changed files with 74 additions and 77 deletions
  1. 6 6
      src/core/workers.c
  2. 68 71
      src/core/workers.h

+ 6 - 6
src/core/workers.c

@@ -580,7 +580,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->pop_ctx_priority = 1;
 	workerarg->is_slave_somewhere = 0;
 
-	workerarg->state_safe_for_observation = 1;
+	workerarg->state_relax_refcnt = 1;
 #ifdef STARPU_SPINLOCK_CHECK
 	workerarg->relax_on_file = __FILE__;
 	workerarg->relax_on_line = __LINE__;
@@ -1718,8 +1718,8 @@ unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 		 * purpose of this 'if' block. */
 		cur_worker = cur_workerid<starpu_worker_get_count()?_starpu_get_worker_struct(cur_workerid):NULL;
 
-		relax_own_observation_state = (cur_worker != NULL) && (cur_worker->state_safe_for_observation == 0);
-		if (relax_own_observation_state && !worker->state_safe_for_observation)
+		relax_own_observation_state = (cur_worker != NULL) && (cur_worker->state_relax_refcnt == 0);
+		if (relax_own_observation_state && !worker->state_relax_refcnt)
 		{
 			/* moreover, when a worker (cur_worker != NULL)
 			 * observes another worker, we need to take special
@@ -1730,7 +1730,7 @@ unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&cur_worker->sched_mutex);
-			cur_worker->state_safe_for_observation = 1;
+			cur_worker->state_relax_refcnt = 1;
 			STARPU_PTHREAD_COND_BROADCAST(&cur_worker->sched_cond);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
 
@@ -1740,7 +1740,7 @@ unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 		 * and also waits for any pending blocking state change
 		 * requests to be processed, in order to not obtain an
 		 * ephemeral information */
-		while (!worker->state_safe_for_observation
+		while (!worker->state_relax_refcnt
 				|| worker->state_block_in_parallel_req
 				|| worker->state_unblock_in_parallel_req)
 		{
@@ -1756,7 +1756,7 @@ unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 	if (relax_own_observation_state)
 	{
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&cur_worker->sched_mutex);
-		cur_worker->state_safe_for_observation = 0;
+		cur_worker->state_relax_refcnt = 0;
 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
 	}
 	return ret;

+ 68 - 71
src/core/workers.h

@@ -89,7 +89,7 @@ LIST_TYPE(_starpu_worker,
 	 * since the condition is shared for multiple purpose */
 	starpu_pthread_cond_t sched_cond;
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
-	unsigned state_safe_for_observation; /* mark scheduling sections where other workers can safely access the worker state */
+	unsigned state_relax_refcnt; /* mark scheduling sections where other workers can safely access the worker state */
 #ifdef STARPU_SPINLOCK_CHECK
 	const char *relax_on_file;
 	int relax_on_line;
@@ -807,7 +807,7 @@ static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const w
 	 * enter sched_op */
 	worker->state_sched_op_pending = 1;
 	worker->state_blocked_in_parallel_observed = 0;
-	worker->state_safe_for_observation = 0;
+	worker->state_relax_refcnt = 0;
 #ifdef STARPU_SPINLOCK_CHECK
 	worker->relax_on_file = file;
 	worker->relax_on_line = line;
@@ -830,7 +830,7 @@ static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const w
 #endif
 {
 	STARPU_ASSERT(worker->state_sched_op_pending);
-	worker->state_safe_for_observation = 1;
+	worker->state_relax_refcnt = 1;
 #ifdef STARPU_SPINLOCK_CHECK
 	worker->relax_off_file = file;
 	worker->relax_off_line = line;
@@ -896,67 +896,6 @@ static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker *
 	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 }
 
-/* lock a worker for observing contents 
- *
- * notes:
- * - if the observed worker is not in state_safe_for_observation, the function block until the state is reached */
-static inline void _starpu_worker_lock(int workerid)
-{
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	STARPU_ASSERT(worker != NULL);
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-	int cur_workerid = starpu_worker_get_id();
-	if (workerid != cur_workerid)
-	{
-		struct _starpu_worker *cur_worker = cur_workerid<starpu_worker_get_count()?_starpu_get_worker_struct(cur_workerid):NULL;
-		int relax_own_observation_state = (cur_worker != NULL) && (cur_worker->state_safe_for_observation == 0);
-		if (relax_own_observation_state && !worker->state_safe_for_observation)
-		{
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
-
-			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&cur_worker->sched_mutex);
-			cur_worker->state_safe_for_observation = 1;
-			STARPU_PTHREAD_COND_BROADCAST(&cur_worker->sched_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
-
-			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-		}
-		while (!worker->state_safe_for_observation)
-		{
-			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-		}
-		if (relax_own_observation_state)
-		{
-			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&cur_worker->sched_mutex);
-			cur_worker->state_safe_for_observation = 0;
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
-		}
-	}
-}
-
-static inline int _starpu_worker_trylock(int workerid)
-{
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	STARPU_ASSERT(worker != NULL);
-	int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
-	if (ret)
-		return ret;
-	int cur_workerid = starpu_worker_get_id();
-	if (workerid != cur_workerid) {
-		ret = !worker->state_safe_for_observation;
-		if (ret)
-			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
-	}
-	return ret;
-}
-
-static inline void _starpu_worker_unlock(int workerid)
-{
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	STARPU_ASSERT(worker != NULL);
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
-}
-
 /* Temporarily allow other worker to access current worker state, when still scheduling,
  * but the scheduling has not yet been made or is already done */
 #ifdef STARPU_SPINLOCK_CHECK
@@ -974,11 +913,11 @@ static inline void _starpu_worker_relax_on(void)
 		return;
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 #ifdef STARPU_SPINLOCK_CHECK
-	STARPU_ASSERT_MSG(!worker->state_safe_for_observation, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
+	STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
 #else
-	STARPU_ASSERT(!worker->state_safe_for_observation);
+	STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
 #endif
-	worker->state_safe_for_observation = 1;
+	worker->state_relax_refcnt++;
 #ifdef STARPU_SPINLOCK_CHECK
 	worker->relax_on_file = file;
 	worker->relax_on_line = line;
@@ -1006,11 +945,11 @@ static inline void _starpu_worker_relax_off(void)
 		return;
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 #ifdef STARPU_SPINLOCK_CHECK
-	STARPU_ASSERT_MSG(worker->state_safe_for_observation, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
+	STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
 #else
-	STARPU_ASSERT(worker->state_safe_for_observation);
+	STARPU_ASSERT(worker->state_relax_refcnt>0);
 #endif
-	worker->state_safe_for_observation = 0;
+	worker->state_relax_refcnt--;
 #ifdef STARPU_SPINLOCK_CHECK
 	worker->relax_off_file = file;
 	worker->relax_off_line = line;
@@ -1029,7 +968,65 @@ static inline int _starpu_worker_get_relax_state(void)
 		return 1;
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	STARPU_ASSERT(worker != NULL);
-	return worker->state_safe_for_observation;
+	return worker->state_relax_refcnt != 0;
+}
+
+/* Lock the sched_mutex of worker `workerid` so that its contents can be observed.
+ *
+ * notes:
+ * - if the target is another worker, the caller first calls
+ *   _starpu_worker_relax_on(), then takes the target's sched_mutex and blocks
+ *   on its sched_cond until the target's state_relax_refcnt is non-zero;
+ * - if the target is the calling worker itself, only the sched_mutex is taken;
+ * - the function returns with the target's sched_mutex held; release it with
+ *   _starpu_worker_unlock(). */
+static inline void _starpu_worker_lock(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	int cur_workerid = starpu_worker_get_id();
+	if (workerid != cur_workerid)
+	{
+		/* relax our own state before taking the other worker's mutex */
+		_starpu_worker_relax_on();
+
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+		/* wait until the observed worker has a non-zero relax refcnt */
+		while (!worker->state_relax_refcnt)
+		{
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+		}
+	}
+	else
+	{
+		/* locking ourselves: no relax state handling, just take the mutex */
+		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
+	}
+}
+
+/* Try to lock worker `workerid` for observing its contents, without blocking
+ * on its sched_mutex.
+ *
+ * Returns 0 when the lock was obtained; the caller must then release it with
+ * _starpu_worker_unlock(). Returns non-zero when the sched_mutex could not be
+ * taken, or when the target is another worker whose state_relax_refcnt is
+ * zero (the mutex is released again before returning in that case).
+ *
+ * notes:
+ * - the caller's own relax refcnt is only raised when the target is another
+ *   worker, mirroring _starpu_worker_lock() and _starpu_worker_unlock(), which
+ *   only pair relax_on/relax_off in that case. Raising it when locking
+ *   ourselves would leave the refcnt incremented with no matching decrement. */
+static inline int _starpu_worker_trylock(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
+	if (ret)
+		return ret;
+	int cur_workerid = starpu_worker_get_id();
+	if (workerid == cur_workerid)
+	{
+		/* locking ourselves: _starpu_worker_unlock() will not call
+		 * relax_off for this case, so do not call relax_on either */
+		return 0;
+	}
+	if (!worker->state_relax_refcnt)
+	{
+		/* the other worker is not in a relaxed state: give up */
+		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+		return 1;
+	}
+	/* lock obtained on another worker: relax our own state until unlock */
+	_starpu_worker_relax_on();
+	return 0;
+}
+
+/* Release the sched_mutex of worker `workerid`, previously taken with
+ * _starpu_worker_lock() or a successful _starpu_worker_trylock().
+ *
+ * notes:
+ * - if the target is another worker, _starpu_worker_relax_off() is called
+ *   once the target's sched_mutex has been released */
+static inline void _starpu_worker_unlock(int workerid)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
+	int cur_workerid = starpu_worker_get_id();
+	if (workerid != cur_workerid)
+	{
+		/* drop the relax reference only after the other worker's mutex is released */
+		_starpu_worker_relax_off();
+	}
 }
 
 static inline void _starpu_worker_lock_self(void)