ソースを参照

- more ongoing work on the omp runtime support

Olivier Aumage 11 年 前
コミット
0bb92b846b
共有2 個のファイルを変更した175 個の追加20 個の削除を含む
  1. 160 12
      src/util/openmp_runtime_support.c
  2. 15 8
      src/util/openmp_runtime_support.h

+ 160 - 12
src/util/openmp_runtime_support.c

@@ -23,6 +23,7 @@
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 #include <util/openmp_runtime_support.h>
+#include <core/task.h>
 #include <stdlib.h>
 #include <ctype.h>
 #include <strings.h>
@@ -57,11 +58,11 @@ static struct starpu_omp_region *create_omp_region_struct(struct starpu_omp_regi
 
 	region->parent_region = parent_region;
 	region->owner_device = owner_device;
-	region->threads_list = calloc(nb_threads, sizeof(*region->threads_list));
-	if (region->threads_list == NULL)
-		_STARPU_ERROR("memory allocation failed");
+	region->thread_list = starpu_omp_thread_list_new();
+	region->implicit_task_list = starpu_omp_task_list_new();
 
 	region->nb_threads = nb_threads;
+	region->level = (parent_region != NULL)?parent_region->level+1:0;
 	return region;
 }
 
@@ -130,12 +131,6 @@ static void omp_initial_thread_exit()
 	/* TODO: free initial_thread data structures */
 }
 
-static void set_master_thread(struct starpu_omp_region *region, struct starpu_omp_thread *master_thread)
-{
-	STARPU_ASSERT(region->nb_threads >= 1 && region->threads_list[0] == NULL);
-	region->threads_list[0] = master_thread;
-}
-
 static struct starpu_omp_thread *create_omp_thread_struct(struct starpu_omp_region *owner_region)
 {
 	struct starpu_omp_thread *thread = malloc(sizeof(*thread));
@@ -145,6 +140,8 @@ static struct starpu_omp_thread *create_omp_thread_struct(struct starpu_omp_regi
 	thread->current_task = NULL;
 	/* .owner_region */
 	thread->owner_region = owner_region;
+	/* .primary_task */
+	thread->primary_task = NULL;
 	/* .init_thread_stack */
 	thread->initial_thread_stack = NULL;
 	/* .ctx */
@@ -194,6 +191,25 @@ static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
 	struct starpu_omp_task *task = starpu_task_get_current()->omp_task;
 	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, task);
 	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
+	if (thread == NULL)
+	{
+		/*
+		 * this is the first time an omp task is launched on the current worker.
+		 * this first task should be an implicit parallel region task.
+		 */
+		if (!task->is_implicit)
+			_STARPU_ERROR("unexpected omp task\n");
+
+		thread = task->owner_thread;
+		STARPU_ASSERT(thread->owner_region != NULL);
+		STARPU_ASSERT(thread->owner_region == task->owner_region);
+		thread->primary_task = task;
+
+		/*
+		 * make this worker an omp-enabled worker
+		 */
+		STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, thread);
+	}
 	if (task->state != starpu_omp_task_state_preempted)
 	{
 		task->starpu_buffers = buffers;
@@ -211,10 +227,37 @@ static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
 	STARPU_ASSERT(task->state == starpu_omp_task_state_preempted
 			|| task->state == starpu_omp_task_state_terminated);
 	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, NULL);
+	if (task->state == starpu_omp_task_state_terminated && task == thread->primary_task)
+	{
+		/*
+		 * make this worker an omp-disabled worker
+		 */
+		STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, NULL);
+		thread->primary_task = NULL;
+
+		/*
+		 * make sure this worker wont be used for running omp tasks
+		 * until a new region is created
+		 */
+		thread->owner_region = NULL;
+	}
 
 	/* TODO: analyse the cause of the return and take appropriate steps */
 }
 
+/*
+ * prepare the starpu_task fields of a currently running task
+ * for accepting a new set of dependencies in anticipation of a preemption
+ *
+ * when the task becomes preempted, it will only be queued again when the new
+ * set of dependencies is fulfilled
+ */
+static void _starpu_task_prepare_for_preemption(struct starpu_task *starpu_task)
+{
+	/* TODO: implement funciton */
+	(void)starpu_task;
+}
+
 static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *parent_task,
 		struct starpu_omp_thread *owner_thread, struct starpu_omp_region *owner_region, int is_implicit)
 {
@@ -274,9 +317,8 @@ int starpu_omp_init(void)
 	_global_state.initial_device = create_omp_device_struct();
 	_global_state.initial_region = create_omp_region_struct(NULL, _global_state.initial_device, 1);
 	_global_state.initial_thread = create_omp_thread_struct(_global_state.initial_region);
-	/* TODO: initialize context for initial thread */
-
-	set_master_thread(_global_state.initial_region, _global_state.initial_thread);
+	starpu_omp_thread_list_push_back(_global_state.initial_region->thread_list,
+			_global_state.initial_thread);
 	_global_state.initial_task = create_omp_task_struct(NULL,
 			_global_state.initial_thread, _global_state.initial_region, 1);
 	_starpu_omp_global_state = &_global_state;
@@ -300,6 +342,112 @@ void starpu_omp_shutdown(void)
 	/* TODO: free ICV variables */
 	/* TODO: free task/thread/region/device structures */
 }
+
+void starpu_parallel_region(struct starpu_codelet *parallel_region_cl, void *parallel_region_cl_arg)
+{
+	struct starpu_omp_thread *master_thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
+	struct starpu_omp_task *parent_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *parent_region = parent_task->owner_region;
+	int ret;
+
+	/* TODO: compute the proper nb_threads and launch additional workers as needed.
+	 * for now, the level 1 parallel region spans all the threads
+	 * and level >= 2 parallel regions have only one thread */
+	int nb_threads = (parent_region->level == 0)?starpu_cpu_worker_get_count():1;
+
+	struct starpu_omp_region *new_region = 
+		create_omp_region_struct(parent_region, _global_state.initial_device, 1);
+
+	int i;
+	for (i = 0; i < nb_threads; i++)
+	{
+		struct starpu_omp_thread *new_thread =
+			(i == 0) ? master_thread : create_omp_thread_struct(new_region);
+		/* TODO: specify actual starpu worker */
+
+		starpu_omp_thread_list_push_back(new_region->thread_list, new_thread);
+		struct starpu_omp_task *new_task = create_omp_task_struct(parent_task, new_thread, new_region, 1);
+		starpu_omp_task_list_push_back(new_region->implicit_task_list, new_task);
+	}
+
+	/* 
+	 * if parent_task == initial_task, create a starpu task as a continuation to all the implicit
+	 * tasks of the new region, else prepare the parent_task for preemption,
+	 * to become itself a continuation to the implicit tasks of the new region
+	 */
+	if (parent_task == _global_state.initial_task)
+	{
+		new_region->continuation_starpu_task = starpu_task_create();
+
+		/* in that case, the continuation starpu task is only used for synchronisation */
+		new_region->continuation_starpu_task->cl = NULL;
+	}
+	else
+	{
+		/* through the preemption, the parent starpu task becomes the continuation task */
+		_starpu_task_prepare_for_preemption(parent_task->starpu_task);
+		new_region->continuation_starpu_task = parent_task->starpu_task;
+	}
+
+	/*
+	 * save pointer to the regions user function from the parallel region codelet
+	 *
+	 * TODO: add support for multiple/heterogeneous implementations
+	 */
+	void (*parallel_region_f)(void **starpu_buffers, void *starpu_cl_arg) = parallel_region_cl->cpu_funcs[0];
+
+	/*
+	 * plug the task wrapper into the parallel region codelet instead, to support task preemption
+	 */
+	parallel_region_cl->cpu_funcs[0] = starpu_omp_task_exec;
+
+	/*
+	 * create the starpu tasks for the implicit omp tasks,
+	 * create explicit dependencies between these starpu tasks and the continuation starpu task
+	 */
+	struct starpu_omp_task * implicit_task;
+	for (implicit_task  = starpu_omp_task_list_begin(new_region->implicit_task_list);
+			implicit_task != starpu_omp_task_list_end(new_region->implicit_task_list);
+			implicit_task  = starpu_omp_task_list_next(implicit_task))
+	{
+		implicit_task->f = parallel_region_f;
+
+		implicit_task->starpu_task = starpu_task_create();
+		implicit_task->starpu_task->cl = parallel_region_cl;
+		implicit_task->starpu_task->cl_arg = parallel_region_cl_arg;
+		starpu_task_declare_deps_array(new_region->continuation_starpu_task, 1, &implicit_task->starpu_task);
+	}
+
+	/*
+	 * submit all the region implicit starpu tasks
+	 */
+	for (implicit_task  = starpu_omp_task_list_begin(new_region->implicit_task_list);
+			implicit_task != starpu_omp_task_list_end(new_region->implicit_task_list);
+			implicit_task  = starpu_omp_task_list_next(implicit_task))
+	{
+		ret = starpu_task_submit(implicit_task->starpu_task);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	/*
+	 * submit the region continuation starpu task if parent_task == initial_task
+	 */
+	if (parent_task == _global_state.initial_task)
+	{
+		ret = _starpu_task_submit_internally(new_region->continuation_starpu_task);
+		STARPU_CHECK_RETURN_VALUE(ret, "_starpu_task_submit_internally");
+	}
+
+	/*
+	 * preempt for completion of the region
+	 */
+	starpu_omp_task_preempt();
+
+	/*
+	 * TODO: free region resources
+	 */
+}
+
 /*
  * restore deprecated diagnostics (-Wdeprecated-declarations)
  */

+ 15 - 8
src/util/openmp_runtime_support.h

@@ -20,6 +20,7 @@
 #include <starpu.h>
 
 #ifdef STARPU_OPENMP
+#include <common/list.h>
 
 /* ucontexts have been deprecated as of POSIX 1-2004
  * _XOPEN_SOURCE required at least on OS/X
@@ -165,8 +166,8 @@ enum starpu_omp_task_state
 	starpu_omp_task_state_terminated = 2,
 };
 
-struct starpu_omp_task
-{
+LIST_TYPE(starpu_omp_task,
+
 	struct starpu_omp_task *parent_task;
 	struct starpu_omp_thread *owner_thread;
 	struct starpu_omp_region *owner_region;
@@ -179,7 +180,7 @@ struct starpu_omp_task
 	void *starpu_cl_arg;
 
 	/* actual task function to be run */
-	void (*f)(void **, void*);
+	void (*f)(void **starpu_buffers, void *starpu_cl_arg);
 
 	enum starpu_omp_task_state state;
 
@@ -194,13 +195,15 @@ struct starpu_omp_task
 	 * in case blocking/recursive task operation
 	 */
 	void *stack;
-};
+)
+
+LIST_TYPE(starpu_omp_thread,
 
-struct starpu_omp_thread
-{
 	struct starpu_omp_task *current_task;
 	struct starpu_omp_region *owner_region;
 
+	struct starpu_omp_task *primary_task;
+
 	/*
 	 * stack to execute the initial thread over
 	 * when preempting the initial task
@@ -216,15 +219,19 @@ struct starpu_omp_thread
 	ucontext_t ctx;
 
 	struct starpu_driver starpu_driver;
-};
+)
 
 struct starpu_omp_region
 {
 	struct starpu_omp_region *parent_region;
 	struct starpu_omp_device *owner_device;
 	/* note: the list of threads include the master_thread as first element */
-	struct starpu_omp_thread **threads_list;
+	struct starpu_omp_thread_list *thread_list;
+	/* list of implicit omp task created to run the region */
+	struct starpu_omp_task_list *implicit_task_list;
 	int nb_threads;
+	int level;
+	struct starpu_task *continuation_starpu_task;
 };
 
 struct starpu_omp_device