Explorar o código

- add support for omp barrier, omp master [wait|nowait], omp single [wait|nowait]
- various fixes

Olivier Aumage hai 11 anos
pai
achega
4383f09ef9
Modificáronse 2 ficheiros con 114 adicións e 23 borrados
  1. 108 23
      src/util/openmp_runtime_support.c
  2. 6 0
      src/util/openmp_runtime_support.h

+ 108 - 23
src/util/openmp_runtime_support.c

@@ -24,6 +24,7 @@
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 #include <util/openmp_runtime_support.h>
 #include <core/task.h>
+#include <common/starpu_spinlock.h>
 #include <stdlib.h>
 #include <ctype.h>
 #include <strings.h>
@@ -77,6 +78,8 @@ static struct starpu_omp_region *create_omp_region_struct(struct starpu_omp_regi
 	region->implicit_task_list = starpu_omp_task_list_new();
 
 	region->nb_threads = 0;
+	region->barrier_count = 0;
+	region->single_id = 0;
 	region->level = (parent_region != NULL)?parent_region->level+1:0;
 	region->continuation_starpu_task = NULL;
 	return region;
@@ -96,12 +99,12 @@ static void destroy_omp_region_struct(struct starpu_omp_region *region)
 
 static void omp_initial_thread_func(void)
 {
-	struct starpu_omp_thread *init_thread = _global_state.initial_thread;
-	struct starpu_omp_task *init_task = _global_state.initial_task;
-	struct starpu_task *continuation_starpu_task = init_task->nested_region->continuation_starpu_task;
+	struct starpu_omp_thread *initial_thread = _global_state.initial_thread;
+	struct starpu_omp_task *initial_task = _global_state.initial_task;
 	while (1)
 	{
-		starpu_driver_run_once(&init_thread->starpu_driver);
+		struct starpu_task *continuation_starpu_task = initial_task->nested_region->continuation_starpu_task;
+		starpu_driver_run_once(&initial_thread->starpu_driver);
 
 		/*
 		 * if we are leaving the first nested region we give control back to initial task
@@ -109,8 +112,9 @@ static void omp_initial_thread_func(void)
 		 */
 		if (_starpu_task_test_termination(continuation_starpu_task))
 		{
-			init_task->nested_region->continuation_starpu_task = NULL;
-			swapcontext(&init_thread->ctx, &init_task->ctx);
+			initial_task->nested_region->continuation_starpu_task = NULL;
+			STARPU_PTHREAD_SETSPECIFIC(omp_task_key, initial_task);
+			swapcontext(&initial_thread->ctx, &initial_task->ctx);
 		}
 	}
 }
@@ -261,6 +265,9 @@ static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *pa
 	task->owner_region = owner_region;
 	task->nested_region = NULL;
 	task->is_implicit = is_implicit;
+	_starpu_spin_init(&task->lock);
+	task->barrier_count = 0;
+	task->single_id = 0;
 	/* TODO: initialize task->data_env_icvs with proper values */ 
 	memset(&task->data_env_icvs, 0, sizeof(task->data_env_icvs));
 	if (is_implicit)
@@ -309,6 +316,7 @@ static void destroy_omp_task_struct(struct starpu_omp_task *task)
 	{
 		free(task->stack);
 	}
+	_starpu_spin_destroy(&task->lock);
 	memset(task, 0, sizeof(*task));
 	starpu_omp_task_delete(task);
 }
@@ -432,22 +440,22 @@ void starpu_omp_shutdown(void)
 	STARPU_PTHREAD_KEY_DELETE(omp_thread_key);
 }
 
-void starpu_parallel_region(const struct starpu_codelet * const _parallel_region_cl,
+void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_region_cl,
 		void * const parallel_region_cl_arg)
 {
 	struct starpu_codelet parallel_region_cl = *_parallel_region_cl;
 	struct starpu_omp_thread *master_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
-	struct starpu_omp_task *parent_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
-	struct starpu_omp_region *parent_region = parent_task->owner_region;
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *region = task->owner_region;
 	int ret;
 
 	/* TODO: compute the proper nb_threads and launch additional workers as needed.
 	 * for now, the level 1 parallel region spans all the threads
 	 * and level >= 2 parallel regions have only one thread */
-	int nb_threads = (parent_region->level == 0)?starpu_cpu_worker_get_count():1;
+	int nb_threads = (region->level == 0)?starpu_cpu_worker_get_count():1;
 
 	struct starpu_omp_region *new_region = 
-		create_omp_region_struct(parent_region, _global_state.initial_device);
+		create_omp_region_struct(region, _global_state.initial_device);
 
 	int i;
 	for (i = 0; i < nb_threads; i++)
@@ -465,7 +473,7 @@ void starpu_parallel_region(const struct starpu_codelet * const _parallel_region
 			new_thread = create_omp_thread_struct(new_region);
 
 			/* TODO: use a less arbitrary thread/worker mapping scheme */
-			if (parent_region->level == 0)
+			if (region->level == 0)
 			{
 				new_thread->starpu_worker_id = i;
 			}
@@ -477,21 +485,20 @@ void starpu_parallel_region(const struct starpu_codelet * const _parallel_region
 		}
 
 		new_region->nb_threads++;
-		struct starpu_omp_task *new_task = create_omp_task_struct(parent_task, new_thread, new_region, 1);
+		struct starpu_omp_task *new_task = create_omp_task_struct(task, new_thread, new_region, 1);
 		starpu_omp_task_list_push_back(new_region->implicit_task_list, new_task);
 
 	}
 	STARPU_ASSERT(new_region->nb_threads == nb_threads);
 
 	/* 
-	 * if parent_task == initial_task, create a starpu task as a continuation to all the implicit
-	 * tasks of the new region, else prepare the parent_task for preemption,
+	 * if task == initial_task, create a starpu task as a continuation to all the implicit
+	 * tasks of the new region, else prepare the task for preemption,
 	 * to become itself a continuation to the implicit tasks of the new region
 	 */
-	if (parent_task == _global_state.initial_task)
+	if (task == _global_state.initial_task)
 	{
 		new_region->continuation_starpu_task = starpu_task_create();
-
 		/* in that case, the continuation starpu task is only used for synchronisation */
 		new_region->continuation_starpu_task->cl = NULL;
 		new_region->continuation_starpu_task->workerid = master_thread->starpu_worker_id;
@@ -504,9 +511,9 @@ void starpu_parallel_region(const struct starpu_codelet * const _parallel_region
 	{
 		/* through the preemption, the parent starpu task becomes the continuation task */
 		_starpu_task_prepare_for_continuation();
-		new_region->continuation_starpu_task = parent_task->starpu_task;
+		new_region->continuation_starpu_task = task->starpu_task;
 	}
-	parent_task->nested_region = new_region;
+	task->nested_region = new_region;
 
 	/*
 	 * save pointer to the regions user function from the parallel region codelet
@@ -552,9 +559,9 @@ void starpu_parallel_region(const struct starpu_codelet * const _parallel_region
 	}
 
 	/*
-	 * submit the region continuation starpu task if parent_task == initial_task
+	 * submit the region continuation starpu task if task == initial_task
 	 */
-	if (parent_task == _global_state.initial_task)
+	if (task == _global_state.initial_task)
 	{
 		ret = _starpu_task_submit_internally(new_region->continuation_starpu_task);
 		STARPU_CHECK_RETURN_VALUE(ret, "_starpu_task_submit_internally");
@@ -564,7 +571,7 @@ void starpu_parallel_region(const struct starpu_codelet * const _parallel_region
 	 * preempt for completion of the region
 	 */
 	starpu_omp_task_preempt();
-	if (parent_task == _global_state.initial_task)
+	if (task == _global_state.initial_task)
 	{
 		STARPU_ASSERT(new_region->continuation_starpu_task == NULL);
 	}
@@ -592,10 +599,88 @@ void starpu_parallel_region(const struct starpu_codelet * const _parallel_region
 		destroy_omp_task_struct(implicit_task);
 	}
 	STARPU_ASSERT(new_region->nb_threads == 0);
-	parent_task->nested_region = NULL;
+	task->nested_region = NULL;
 	destroy_omp_region_struct(new_region);
 }
 
+void starpu_omp_barrier(void)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	/* Barrier over the implicit tasks of the calling task's region; the caller is assumed (and asserted just below) to be one of them */
+	STARPU_ASSERT(task->is_implicit);
+	struct starpu_omp_region *region = task->owner_region;
+	_starpu_spin_lock(&task->lock);
+	int inc_barrier_count = STARPU_ATOMIC_ADD(&region->barrier_count, 1);
+	/* the arrival that brings the count up to nb_threads releases the others (assumes STARPU_ATOMIC_ADD returns the updated value -- TODO confirm) */
+	if (inc_barrier_count == region->nb_threads)
+	{
+		struct starpu_omp_task * implicit_task;
+		/* last task reaching the barrier: reset the counter, drop our own lock, then resubmit the starpu task of every other implicit task */
+		region->barrier_count = 0;
+		_starpu_spin_unlock(&task->lock);
+		for (implicit_task  = starpu_omp_task_list_begin(region->implicit_task_list);
+				implicit_task != starpu_omp_task_list_end(region->implicit_task_list);
+				implicit_task  = starpu_omp_task_list_next(implicit_task))
+		{
+			int ret;
+			if (implicit_task == task) /* the caller is not asleep: nothing to resubmit for it */
+				continue;
+			_starpu_spin_lock(&implicit_task->lock);
+			ret = starpu_task_submit(implicit_task->starpu_task);
+			STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+			_starpu_spin_unlock(&implicit_task->lock);
+		}
+	}
+	else
+	{
+		/* not the last task reaching the barrier:
+		 * . prepare for conditional continuation, passing the still-held task->lock
+		 *   (presumably released by the helper -- TODO confirm)
+		 * . preempt this task (sleep)
+		 */
+
+		_starpu_task_prepare_for_conditional_continuation(&task->lock);
+		starpu_omp_task_preempt();
+	}
+}
+
+void starpu_omp_master(void (*f)(void), int nowait)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
+	/* Assume master constructs are performed by the implicit tasks of a region */
+	STARPU_ASSERT(task->is_implicit);
+	struct starpu_omp_region *region = task->owner_region;
+	/* f is called only when the calling thread is the one recorded as the region's master thread */
+	if (thread == region->master_thread)
+	{
+		f();
+	}
+	/* unless nowait is non-zero, every caller then goes through starpu_omp_barrier() */
+	if (!nowait)
+	{
+		starpu_omp_barrier();
+	}
+}
+
+void starpu_omp_single(void (*f)(void), int nowait)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	/* Assume singles are performed by the implicit tasks of a region */
+	STARPU_ASSERT(task->is_implicit);
+	struct starpu_omp_region *region = task->owner_region;
+	int first = STARPU_BOOL_COMPARE_AND_SWAP(&region->single_id, task->single_id, task->single_id+1); /* try to advance the region's single counter from this task's own count */
+	task->single_id++; /* the per-task count advances whether or not the swap succeeded */
+	/* f is called only by the task whose compare-and-swap succeeded */
+	if (first)
+	{
+		f();
+	}
+	/* unless nowait is non-zero, every caller then goes through starpu_omp_barrier() */
+	if (!nowait)
+	{
+		starpu_omp_barrier();
+	}
+}
 /*
  * restore deprecated diagnostics (-Wdeprecated-declarations)
  */

+ 6 - 0
src/util/openmp_runtime_support.h

@@ -21,6 +21,7 @@
 
 #ifdef STARPU_OPENMP
 #include <common/list.h>
+#include <common/starpu_spinlock.h>
 
 /* ucontexts have been deprecated as of POSIX 1-2004
  * _XOPEN_SOURCE required at least on OS/X
@@ -175,6 +176,9 @@ LIST_TYPE(starpu_omp_task,
 	struct starpu_omp_region *owner_region;
 	struct starpu_omp_region *nested_region;
 	int is_implicit;
+	struct _starpu_spinlock lock;
+	int barrier_count;
+	int single_id;
 	struct starpu_omp_data_environment_icvs data_env_icvs;
 	struct starpu_omp_implicit_task_icvs implicit_task_icvs;
 
@@ -236,6 +240,8 @@ struct starpu_omp_region
 	struct starpu_omp_task_list *implicit_task_list;
 	/* include both the master thread and the region own threads */
 	int nb_threads;
+	int barrier_count;
+	int single_id;
 	int level;
 	struct starpu_task *continuation_starpu_task;
 };