Browse Source

- initial thread handling

Olivier Aumage 11 years ago
parent
commit
934fa3edfc
2 changed files with 122 additions and 9 deletions
  1. 113 9
      src/util/openmp_runtime_support.c
  2. 9 0
      src/util/openmp_runtime_support.h

+ 113 - 9
src/util/openmp_runtime_support.c

@@ -16,6 +16,12 @@
 
 #include <starpu.h>
 #ifdef STARPU_OPENMP
+/*
+ * locally disable -Wdeprecated-declarations to avoid
+ * a flood of deprecation warnings for the ucontext-related functions
+ */
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 #include <util/openmp_runtime_support.h>
 #include <stdlib.h>
 #include <ctype.h>
@@ -59,6 +65,71 @@ static struct starpu_omp_region *create_omp_region_struct(struct starpu_omp_regi
 	return region;
 }
 
+static void omp_initial_thread_func(struct starpu_omp_thread *init_thread, struct starpu_omp_task *init_task)
+{	/*
+	 * Body of the initial thread context: loops forever, alternating one
+	 * pass of the StarPU driver with a switch to the initial task context.
+	 * Installed as a context entry point by omp_initial_thread_setup()
+	 * through makecontext(), which also supplies both arguments.
+	 *
+	 * init_thread: thread whose starpu_driver is run and whose ctx is saved
+	 * init_task:   task whose ctx is resumed after each driver pass
+	 *
+	 * The loop has no exit, so this function never returns.
+	 *
+	 * NOTE(review): makecontext() is specified for a void(void) entry point
+	 * receiving int arguments; taking two pointers here relies on platform
+	 * support -- confirm for every targeted architecture.
+	 */
+	while (1)
+	{
+		starpu_driver_run_once(&init_thread->starpu_driver); /* return value is not checked */
+
+		/* TODO: check if we are leaving the first nested region or not
+		 *
+		 * if we are leaving the first nested region we give control back to initial task
+		 * otherwise, we should continue to execute work
+		 *
+		 * for now control is handed to the initial task unconditionally after every
+		 * driver pass; the current state is saved in init_thread->ctx, so the loop
+		 * continues from here when that context is resumed */
+		swapcontext(&init_thread->ctx, &init_task->ctx);
+	}
+}
+
+/*
+ * setup the main application thread to handle the possible preemption of the initial task
+ *
+ * Steps, in order: bind the initial task to the initial thread, allocate a
+ * dedicated stack and build a context on it that enters
+ * omp_initial_thread_func(), then initialize StarPU with CPU worker 0 listed
+ * as a not-launched driver and initialize that driver here.
+ *
+ * Reads _global_state.initial_thread (dereferenced without a NULL check) and
+ * _global_state.initial_task. Failures are reported through _STARPU_ERROR and
+ * STARPU_CHECK_RETURN_VALUE; nothing is returned to the caller.
+ */
+static void omp_initial_thread_setup(void)
+{
+	struct starpu_omp_thread *initial_thread = _global_state.initial_thread;
+	struct starpu_omp_task *initial_task = _global_state.initial_task;
+	/* .current_task */
+	initial_thread->current_task = initial_task;
+	/* .owner_region already set in create_omp_thread_struct */
+	/* .initial_thread_stack: _STARPU_STACKSIZE bytes used as the stack of the context built below */
+	initial_thread->initial_thread_stack = malloc(_STARPU_STACKSIZE);
+	if (initial_thread->initial_thread_stack == NULL)
+		_STARPU_ERROR("memory allocation failed");
+	/* .ctx */
+	getcontext(&initial_thread->ctx); /* return value is not checked */
+	/*
+	 * we do not use uc_link: omp_initial_thread_func loops forever and always
+	 * hands control back to the initial task explicitly, so it never returns
+	 *
+	 * NOTE(review): makecontext() below forwards two pointers as the entry
+	 * function's arguments, whereas its specification describes int
+	 * arguments -- confirm this is supported on every targeted architecture.
+	 */
+	initial_thread->ctx.uc_link          = NULL;
+	initial_thread->ctx.uc_stack.ss_sp   = initial_thread->initial_thread_stack;
+	initial_thread->ctx.uc_stack.ss_size = _STARPU_STACKSIZE;
+	makecontext(&initial_thread->ctx, omp_initial_thread_func, 2, initial_thread, initial_task);
+	/* .starpu_driver */
+	/*
+	 * we configure starpu to not launch CPU worker 0
+	 * because we will use the main thread to play the role of worker 0
+	 *
+	 * the same driver descriptor is stored in the thread structure and is
+	 * handed to starpu_driver_init() once starpu_init() has succeeded
+	 */
+	struct starpu_conf conf;
+	int ret = starpu_conf_init(&conf);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_conf_init");
+	initial_thread->starpu_driver.type = STARPU_CPU_WORKER;
+	initial_thread->starpu_driver.id.cpu_id = 0;
+	conf.not_launched_drivers = &initial_thread->starpu_driver;
+	conf.n_not_launched_drivers = 1;
+	ret = starpu_init(&conf);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_driver_init(&initial_thread->starpu_driver);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_driver_init");
+}
+
+static void omp_initial_thread_exit()
+{	/*
+	 * Counterpart of omp_initial_thread_setup(): deinitialize the driver
+	 * attached to _global_state.initial_thread, then shut StarPU down.
+	 * A failing starpu_driver_deinit() is reported through
+	 * STARPU_CHECK_RETURN_VALUE.
+	 *
+	 * NOTE(review): the empty parameter list is an old-style declaration;
+	 * (void) would make it a prototype like the other helpers in this file.
+	 */
+	struct starpu_omp_thread *initial_thread = _global_state.initial_thread;
+	int ret = starpu_driver_deinit(&initial_thread->starpu_driver);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_driver_deinit");
+	starpu_shutdown();
+
+	/* TODO: free initial_thread data structures
+	 * (initial_thread_stack, allocated in omp_initial_thread_setup, is not released here) */
+}
+
 static void set_master_thread(struct starpu_omp_region *region, struct starpu_omp_thread *master_thread)
 {
 	STARPU_ASSERT(region->nb_threads >= 1 && region->threads_list[0] == NULL);
@@ -70,8 +141,15 @@ static struct starpu_omp_thread *create_omp_thread_struct(struct starpu_omp_regi
 	struct starpu_omp_thread *thread = malloc(sizeof(*thread));
 	if (thread == NULL)
 		_STARPU_ERROR("memory allocation failed");
+	/* .current_task */
+	thread->current_task = NULL;
+	/* .owner_region */
 	thread->owner_region = owner_region;
+	/* .init_thread_stack */
+	thread->initial_thread_stack = NULL;
+	/* .ctx */
 	memset(&thread->ctx, 0, sizeof(thread->ctx));
+	/* .starpu_driver will be initialized later on */
 	return thread;
 }
 
@@ -79,20 +157,38 @@ static void starpu_omp_task_entry(struct starpu_omp_task *task)
 {
 	task->f(task->starpu_buffers, task->starpu_cl_arg);
 	task->state = starpu_omp_task_state_terminated;
-	/* at the end of starpu task function, give hand back to the owner thread */
-	setcontext(&task->owner_thread->ctx);
+	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key); /* thread registered under omp_thread_key for the caller */
+	/*
+	 * the task reached the terminated state: hand control back to the worker code for good.
+	 * setcontext() does not save the current context, so this task is not resumed afterwards.
+	 *
+	 * about to run on the worker stack...
+	 */
+	setcontext(&thread->ctx);
+	STARPU_ASSERT(0); /* only reached if setcontext() returns, which means it failed */
 }
 
+/*
+ * stop executing a task that is about to block
+ * and hand control back to the thread
+ *
+ * the task and thread are looked up through omp_task_key and omp_thread_key;
+ * the task is marked preempted before its context is saved and the thread
+ * context is resumed
+ */
 static void starpu_omp_task_preempt(void)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
 	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
 	task->state = starpu_omp_task_state_preempted;
 
-	/* we reached a blocked state, give hand back to the thread */
+	/*
+	 * the task reached a blocked state: save its context in task->ctx
+	 * and hand control back to the worker code.
+	 *
+	 * about to run on the worker stack...
+	 */
 	swapcontext(&task->ctx, &thread->ctx);
+	/* now running on the task stack again: task->ctx was resumed, the function returns to the task code */
 }
 
+/*
+ * wrap a task function to allow the task to be preempted
+ */
 static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
 {
 	struct starpu_omp_task *task = starpu_task_get_current()->omp_task;
@@ -105,8 +201,12 @@ static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
 	}
 	task->state = starpu_omp_task_state_clear;
 
-	/* launch actual task on its own stack, or restore a previously preempted task */
+	/* 
+	 * start the task execution, or restore a previously preempted task.
+	 * about to run on the task stack...
+	 * */
 	swapcontext(&thread->ctx, &task->ctx);
+	/* now running on the worker stack again */
 
 	STARPU_ASSERT(task->state == starpu_omp_task_state_preempted
 			|| task->state == starpu_omp_task_state_terminated);
@@ -148,6 +248,8 @@ static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *pa
 	{
 		/* TODO: use ICV stack size info instead */
 		task->stack = malloc(_STARPU_STACKSIZE);
+		if (task->stack == NULL)
+			_STARPU_ERROR("memory allocation failed");
 		getcontext(&task->ctx);
 		/*
 		 * we do not use uc_link, starpu_omp_task_entry will handle
@@ -181,9 +283,8 @@ int starpu_omp_init(void)
 
 	STARPU_PTHREAD_KEY_CREATE(&omp_thread_key, NULL);
 	STARPU_PTHREAD_KEY_CREATE(&omp_task_key, NULL);
-	int ret = starpu_init(0);
-	if(ret < 0)
-		return ret;
+
+	omp_initial_thread_setup();
 
 	/* init clock reference for starpu_omp_get_wtick */
 	_starpu_omp_clock_ref = starpu_timing_now();
@@ -193,11 +294,14 @@ int starpu_omp_init(void)
 
 void starpu_omp_shutdown(void)
 {
-	starpu_shutdown();
+	omp_initial_thread_exit(); /* deinitializes the initial thread's driver, then calls starpu_shutdown() */
 	STARPU_PTHREAD_KEY_DELETE(omp_task_key);
 	STARPU_PTHREAD_KEY_DELETE(omp_thread_key);
 	/* TODO: free ICV variables */
 	/* TODO: free task/thread/region/device structures */
 }
-
+/*
+ * restore the diagnostic state saved by the matching push (re-enables -Wdeprecated-declarations)
+ */
+#pragma GCC diagnostic pop
 #endif /* STARPU_OPENMP */

+ 9 - 0
src/util/openmp_runtime_support.h

@@ -202,11 +202,20 @@ struct starpu_omp_thread
 	struct starpu_omp_region *owner_region;
 
 	/*
+	 * stack to execute the initial thread over
+	 * when preempting the initial task
+	 * note: should not be used for other threads
+	 */
+	void *initial_thread_stack;
+
+	/*
 	 * context to store the 'scheduler' state of the thread,
 	 * to which the execution of thread comes back upon a
 	 * blocking/recursive task operation
 	 */
 	ucontext_t ctx;
+
+	struct starpu_driver starpu_driver;
 };
 
 struct starpu_omp_region