Browse Source

- modify the OpenMP tasks locking scheme to avoid multi-locks ordering issues

Olivier Aumage 10 years ago
parent
commit
b37f386a18
2 changed files with 104 additions and 85 deletions
  1. 103 85
      src/util/openmp_runtime_support.c
  2. 1 0
      src/util/openmp_runtime_support.h

+ 103 - 85
src/util/openmp_runtime_support.c

@@ -55,7 +55,7 @@ static void destroy_omp_thread_struct(struct starpu_omp_thread *thread);
 static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *parent_task,
 		struct starpu_omp_thread *owner_thread, struct starpu_omp_region *owner_region, int is_implicit);
 static void destroy_omp_task_struct(struct starpu_omp_task *task);
-static void _wake_up_locked_task(struct starpu_omp_task *task);
+static void wake_up_and_unlock_task(struct starpu_omp_task *task);
 static void wake_up_barrier(struct starpu_omp_region *parallel_region);
 static void starpu_omp_task_preempt(void);
 
@@ -71,6 +71,41 @@ struct starpu_omp_task *_starpu_omp_get_task(void)
 	return task;
 }
 
+static void weak_task_lock(struct starpu_omp_task *task) {
+	_starpu_spin_lock(&task->lock);
+	while (task->transaction_pending) {
+		_starpu_spin_unlock(&task->lock);
+		STARPU_UYIELD();
+		_starpu_spin_lock(&task->lock);
+	}
+}
+
+static void weak_task_unlock(struct starpu_omp_task *task) {
+	_starpu_spin_unlock(&task->lock);
+}
+
+static void wake_up_and_unlock_task(struct starpu_omp_task *task)
+{
+	STARPU_ASSERT(task->transaction_pending == 0);
+	if (task->wait_on == 0)
+	{
+		weak_task_unlock(task);
+		int ret = starpu_task_submit(task->starpu_task);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	} else {
+		weak_task_unlock(task);
+	}
+}
+
+static void transaction_callback(void *_task)
+{
+	struct starpu_omp_task *task = _task;
+	_starpu_spin_lock(&task->lock);
+	STARPU_ASSERT(task->transaction_pending != 0);
+	task->transaction_pending = 0;
+	_starpu_spin_unlock(&task->lock);
+}
+
 static void condition_init(struct starpu_omp_condition *condition)
 {
 	condition->contention_list_head = NULL;
@@ -82,24 +117,19 @@ static void condition_exit(struct starpu_omp_condition *condition)
 	condition->contention_list_head = NULL;
 }
 
-static void condition__sleep_callback(void *_lock)
-{
-	struct _starpu_spinlock *lock = _lock;
-	_starpu_spin_unlock(lock);
-}
-
 static void condition_wait(struct starpu_omp_condition *condition, struct _starpu_spinlock *lock)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
 	struct starpu_omp_task_link link;
 	_starpu_spin_lock(&task->lock);
 	task->wait_on |= starpu_omp_task_wait_on_condition;
-	_starpu_spin_unlock(&task->lock);
 	link.task = task;
 	link.next = condition->contention_list_head;
 	condition->contention_list_head = &link;
-
-	_starpu_task_prepare_for_continuation_ext(0, condition__sleep_callback, lock);
+	task->transaction_pending = 1;
+	_starpu_spin_unlock(&task->lock);
+	_starpu_spin_unlock(lock);
+	_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 	starpu_omp_task_preempt();
 
 	/* re-acquire the lock released by the callback */
@@ -113,12 +143,11 @@ static void condition_signal(struct starpu_omp_condition *condition)
 	if (condition->contention_list_head != NULL)
 	{
 		struct starpu_omp_task *next_task = condition->contention_list_head->task;
+		weak_task_lock(next_task);
 		condition->contention_list_head = condition->contention_list_head->next;
-		_starpu_spin_lock(&next_task->lock);
 		STARPU_ASSERT(next_task->wait_on & starpu_omp_task_wait_on_condition);
 		next_task->wait_on &= ~starpu_omp_task_wait_on_condition;
-		_wake_up_locked_task(next_task);
-		_starpu_spin_unlock(&next_task->lock);
+		wake_up_and_unlock_task(next_task);
 	}
 }
 #endif
@@ -128,12 +157,11 @@ static void condition_broadcast(struct starpu_omp_condition *condition)
 	while (condition->contention_list_head != NULL)
 	{
 		struct starpu_omp_task *next_task = condition->contention_list_head->task;
+		weak_task_lock(next_task);
 		condition->contention_list_head = condition->contention_list_head->next;
-		_starpu_spin_lock(&next_task->lock);
 		STARPU_ASSERT(next_task->wait_on & starpu_omp_task_wait_on_condition);
 		next_task->wait_on &= ~starpu_omp_task_wait_on_condition;
-		_wake_up_locked_task(next_task);
-		_starpu_spin_unlock(&next_task->lock);
+		wake_up_and_unlock_task(next_task);
 	}
 }
 
@@ -334,7 +362,10 @@ static void starpu_omp_explicit_task_entry(struct starpu_omp_task *task)
 	else
 		_STARPU_ERROR("invalid worker architecture");
 	_starpu_omp_unregister_task_handles(task);
+	_starpu_spin_lock(&task->lock);
 	task->state = starpu_omp_task_state_terminated;
+	task->transaction_pending=1;
+	_starpu_spin_unlock(&task->lock);
 	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
 	/* 
 	 * the task reached the terminated state, definitively give hand back to the worker code.
@@ -446,21 +477,29 @@ static void starpu_omp_task_completion_accounting(struct starpu_omp_task *task)
 	struct starpu_omp_task *parent_task = task->parent_task;
 	struct starpu_omp_region *parallel_region = task->owner_region;
 
-	_starpu_spin_lock(&parent_task->lock);
+	weak_task_lock(parent_task);
 	if (STARPU_ATOMIC_ADD(&parent_task->child_task_count, -1) == 0)
 	{
 		if (parent_task->state == starpu_omp_task_state_zombie)
 		{
 			STARPU_ASSERT(!parent_task->is_implicit);
+			weak_task_unlock(parent_task);
 			destroy_omp_task_struct(parent_task);
 		}
 		else if (parent_task->wait_on & starpu_omp_task_wait_on_task_childs)
 		{
 			parent_task->wait_on &= ~starpu_omp_task_wait_on_task_childs;
-			_wake_up_locked_task(parent_task);
+			wake_up_and_unlock_task(parent_task);
 		}
+		else
+		{
+			weak_task_unlock(parent_task);
+		}
+	}
+	else
+	{
+		weak_task_unlock(parent_task);
 	}
-	_starpu_spin_unlock(&parent_task->lock);
 	_starpu_spin_lock(&parallel_region->lock);
 	if (STARPU_ATOMIC_ADD(&parallel_region->bound_explicit_task_count, -1) == 0)
 	{
@@ -469,14 +508,13 @@ static void starpu_omp_task_completion_accounting(struct starpu_omp_task *task)
 
 		if (waiting_task)
 		{
-			_starpu_spin_lock(&waiting_task->lock);
+			weak_task_lock(waiting_task);
 			_starpu_spin_lock(&parallel_region->lock);
 			parallel_region->waiting_task = NULL;
 			STARPU_ASSERT(waiting_task->wait_on & starpu_omp_task_wait_on_region_tasks);
 			waiting_task->wait_on &= ~starpu_omp_task_wait_on_region_tasks;
-			_wake_up_locked_task(waiting_task);
 			_starpu_spin_unlock(&parallel_region->lock);
-			_starpu_spin_unlock(&waiting_task->lock);
+			wake_up_and_unlock_task(waiting_task);
 		}
 	}
 	else
@@ -487,16 +525,23 @@ static void starpu_omp_task_completion_accounting(struct starpu_omp_task *task)
 	{
 		struct starpu_omp_task *leader_task = task->task_group->leader_task;
 		STARPU_ASSERT(leader_task != task);
-		_starpu_spin_lock(&leader_task->lock);
+		weak_task_lock(leader_task);
 		if (STARPU_ATOMIC_ADD(&task->task_group->descendent_task_count, -1) == 0)
 		{
 			if (leader_task->wait_on & starpu_omp_task_wait_on_group)
 			{
 				leader_task->wait_on &= ~starpu_omp_task_wait_on_group;
-				_wake_up_locked_task(leader_task);
+				wake_up_and_unlock_task(leader_task);
+			}
+			else
+			{
+				weak_task_unlock(leader_task);
 			}
 		}
-		_starpu_spin_unlock(&leader_task->lock);
+		else
+		{
+			weak_task_unlock(leader_task);
+		}
 	}
 }
 /*
@@ -1170,15 +1215,6 @@ void starpu_omp_parallel_region(const struct starpu_omp_parallel_region_attr *at
 	destroy_omp_region_struct(new_region);
 }
 
-static void _wake_up_locked_task(struct starpu_omp_task *task)
-{
-	if (task->wait_on == 0)
-	{
-		int ret = starpu_task_submit(task->starpu_task);
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
-	}
-}
-
 static void wake_up_barrier(struct starpu_omp_region *parallel_region)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
@@ -1189,28 +1225,13 @@ static void wake_up_barrier(struct starpu_omp_region *parallel_region)
 	{
 		if (implicit_task == task)
 			continue;
-		_starpu_spin_lock(&implicit_task->lock);
+		weak_task_lock(implicit_task);
 		STARPU_ASSERT(implicit_task->wait_on & starpu_omp_task_wait_on_barrier);
 		implicit_task->wait_on &= ~starpu_omp_task_wait_on_barrier;
-		_wake_up_locked_task(implicit_task);
-		_starpu_spin_unlock(&implicit_task->lock);
+		wake_up_and_unlock_task(implicit_task);
 	}
 }
 
-static void barrier__sleep_callback(void *_task)
-{
-	struct starpu_omp_task *task = _task;
-	_starpu_spin_unlock(&task->lock);
-}
-
-static void region_tasks__sleep_callback(void *_task)
-{
-	struct starpu_omp_task *task = _task;
-	struct starpu_omp_region *parallel_region = task->owner_region;
-	_starpu_spin_unlock(&task->lock);
-	_starpu_spin_unlock(&parallel_region->lock);
-}
-
 void starpu_omp_barrier(void)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
@@ -1233,13 +1254,16 @@ void starpu_omp_barrier(void)
 		{
 			task->wait_on |= starpu_omp_task_wait_on_region_tasks;
 			parallel_region->waiting_task = task;
-			_starpu_task_prepare_for_continuation_ext(0, region_tasks__sleep_callback, task);
+			task->transaction_pending = 1;
+			_starpu_spin_unlock(&parallel_region->lock);
+			_starpu_spin_unlock(&task->lock);
+			_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 			starpu_omp_task_preempt();
 		}
 		else
 		{
-			_starpu_spin_unlock(&task->lock);
 			_starpu_spin_unlock(&parallel_region->lock);
+			_starpu_spin_unlock(&task->lock);
 		}
 		wake_up_barrier(parallel_region);
 	}
@@ -1252,7 +1276,9 @@ void starpu_omp_barrier(void)
 		 */
 
 		task->wait_on |= starpu_omp_task_wait_on_barrier;
-		_starpu_task_prepare_for_continuation_ext(0, barrier__sleep_callback, task);
+		task->transaction_pending = 1;
+		_starpu_spin_unlock(&task->lock);
+		_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 		starpu_omp_task_preempt();
 		STARPU_ASSERT(task->child_task_count == 0);
 	}
@@ -1376,12 +1402,6 @@ void starpu_omp_single_copyprivate_inline_end(void)
 	starpu_omp_barrier();
 }
 
-static void critical__sleep_callback(void *_critical)
-{
-	struct starpu_omp_critical *critical = _critical;
-	_starpu_spin_unlock(&critical->lock);
-}
-
 void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
@@ -1410,11 +1430,13 @@ void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name)
 	{
 		_starpu_spin_lock(&task->lock);
 		task->wait_on |= starpu_omp_task_wait_on_critical;
-		_starpu_spin_unlock(&task->lock);
+		task->transaction_pending = 1;
 		link.task = task;
 		link.next = critical->contention_list_head;
 		critical->contention_list_head = &link;
-		_starpu_task_prepare_for_continuation_ext(0, critical__sleep_callback, critical);
+		_starpu_spin_unlock(&task->lock);
+		_starpu_spin_unlock(&critical->lock);
+		_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 		starpu_omp_task_preempt();
 
 		/* re-acquire the spin lock */
@@ -1431,12 +1453,11 @@ void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name)
 	if (critical->contention_list_head != NULL)
 	{
 		struct starpu_omp_task *next_task = critical->contention_list_head->task;
+		weak_task_lock(next_task);
 		critical->contention_list_head = critical->contention_list_head->next;
-		_starpu_spin_lock(&next_task->lock);
 		STARPU_ASSERT(next_task->wait_on & starpu_omp_task_wait_on_critical);
 		next_task->wait_on &= ~starpu_omp_task_wait_on_critical;
-		_wake_up_locked_task(next_task);
-		_starpu_spin_unlock(&next_task->lock);
+		wake_up_and_unlock_task(next_task);
 	}
 	_starpu_spin_unlock(&critical->lock);
 }
@@ -1469,11 +1490,13 @@ void starpu_omp_critical_inline_begin(const char *name)
 	{
 		_starpu_spin_lock(&task->lock);
 		task->wait_on |= starpu_omp_task_wait_on_critical;
-		_starpu_spin_unlock(&task->lock);
+		task->transaction_pending = 1;
 		link.task = task;
 		link.next = critical->contention_list_head;
 		critical->contention_list_head = &link;
-		_starpu_task_prepare_for_continuation_ext(0, critical__sleep_callback, critical);
+		_starpu_spin_unlock(&task->lock);
+		_starpu_spin_unlock(&critical->lock);
+		_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 		starpu_omp_task_preempt();
 
 		/* re-acquire the spin lock */
@@ -1505,12 +1528,11 @@ void starpu_omp_critical_inline_end(const char *name)
 	if (critical->contention_list_head != NULL)
 	{
 		struct starpu_omp_task *next_task = critical->contention_list_head->task;
+		weak_task_lock(next_task);
 		critical->contention_list_head = critical->contention_list_head->next;
-		_starpu_spin_lock(&next_task->lock);
 		STARPU_ASSERT(next_task->wait_on & starpu_omp_task_wait_on_critical);
 		next_task->wait_on &= ~starpu_omp_task_wait_on_critical;
-		_wake_up_locked_task(next_task);
-		_starpu_spin_unlock(&next_task->lock);
+		wake_up_and_unlock_task(next_task);
 	}
 	_starpu_spin_unlock(&critical->lock);
 }
@@ -1522,6 +1544,8 @@ static void explicit_task__destroy_callback(void *_task)
 	task->starpu_task->omp_task = NULL;
 	task->starpu_task = NULL;
 	_starpu_spin_lock(&task->lock);
+	STARPU_ASSERT(task->transaction_pending == 1);
+	task->transaction_pending = 0;
 	if (task->child_task_count != 0)
 	{
 		task->state = starpu_omp_task_state_zombie;
@@ -1710,12 +1734,6 @@ void starpu_omp_task_region(const struct starpu_omp_task_region_attr *attr)
 	}
 }
 
-static void task_childs__sleep_callback(void *_task)
-{
-	struct starpu_omp_task *task = _task;
-	_starpu_spin_unlock(&task->lock);
-}
-
 void starpu_omp_taskwait(void)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
@@ -1723,7 +1741,9 @@ void starpu_omp_taskwait(void)
 	if (task->child_task_count > 0)
 	{
 		task->wait_on |= starpu_omp_task_wait_on_task_childs;
-		_starpu_task_prepare_for_continuation_ext(0, task_childs__sleep_callback, task);
+		task->transaction_pending = 1;
+		_starpu_spin_unlock(&task->lock);
+		_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 		starpu_omp_task_preempt();
 		STARPU_ASSERT(task->child_task_count == 0);
 	}
@@ -1733,12 +1753,6 @@ void starpu_omp_taskwait(void)
 	}
 }
 
-static void group__sleep_callback(void *_task)
-{
-	struct starpu_omp_task *task = _task;
-	_starpu_spin_unlock(&task->lock);
-}
-
 void starpu_omp_taskgroup(void (*f)(void *arg), void *arg)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
@@ -1752,7 +1766,9 @@ void starpu_omp_taskgroup(void (*f)(void *arg), void *arg)
 	if (task_group.descendent_task_count > 0)
 	{
 		task->wait_on |= starpu_omp_task_wait_on_group;
-		_starpu_task_prepare_for_continuation_ext(0, group__sleep_callback, task);
+		task->transaction_pending = 1;
+		_starpu_spin_unlock(&task->lock);
+		_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 		starpu_omp_task_preempt();
 		STARPU_ASSERT(task_group.descendent_task_count == 0);
 	}
@@ -1783,7 +1799,9 @@ void starpu_omp_taskgroup_inline_end(void)
 	if (p_task_group->descendent_task_count > 0)
 	{
 		task->wait_on |= starpu_omp_task_wait_on_group;
-		_starpu_task_prepare_for_continuation_ext(0, group__sleep_callback, task);
+		task->transaction_pending = 1;
+		_starpu_spin_unlock(&task->lock);
+		_starpu_task_prepare_for_continuation_ext(0, transaction_callback, task);
 		starpu_omp_task_preempt();
 		STARPU_ASSERT(p_task_group->descendent_task_count == 0);
 	}

+ 1 - 0
src/util/openmp_runtime_support.h

@@ -208,6 +208,7 @@ LIST_TYPE(starpu_omp_task,
 	int child_task_count;
 	struct starpu_omp_task_group *task_group;
 	struct _starpu_spinlock lock;
+	int transaction_pending;
 	int wait_on;
 	int barrier_count;
 	int single_id;