Browse Source

- rework omp task implementation
- reorganize wake-ups

Olivier Aumage 11 years ago
parent
commit
7280af2f1a
2 changed files with 168 additions and 76 deletions
  1. 156 76
      src/util/openmp_runtime_support.c
  2. 12 0
      src/util/openmp_runtime_support.h

+ 156 - 76
src/util/openmp_runtime_support.c

@@ -24,6 +24,7 @@
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 #include <util/openmp_runtime_support.h>
 #include <core/task.h>
+#include <core/workers.h>
 #include <common/list.h>
 #include <common/starpu_spinlock.h>
 #include <common/uthash.h>
@@ -40,6 +41,8 @@ static starpu_pthread_key_t omp_task_key;
 struct starpu_omp_global *_starpu_omp_global_state = NULL;
 double _starpu_omp_clock_ref = 0.0; /* clock reference for starpu_omp_get_wtick */
 
+static struct starpu_omp_critical *create_omp_critical_struct(void);
+static void destroy_omp_critical_struct(struct starpu_omp_critical *critical);
 static struct starpu_omp_device *create_omp_device_struct(void);
 static void destroy_omp_device_struct(struct starpu_omp_device *device);
 static struct starpu_omp_region *create_omp_region_struct(struct starpu_omp_region *parent_region, struct starpu_omp_device *owner_device);
@@ -49,14 +52,14 @@ static void destroy_omp_thread_struct(struct starpu_omp_thread *thread);
 static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *parent_task,
 		struct starpu_omp_thread *owner_thread, struct starpu_omp_region *owner_region, int is_implicit);
 static void destroy_omp_task_struct(struct starpu_omp_task *task);
+static void _wake_up_locked_task(struct starpu_omp_task *task);
+static void wake_up_barrier(struct starpu_omp_region *parallel_region);
 
 static struct starpu_omp_critical *create_omp_critical_struct(void)
 {
 	struct starpu_omp_critical *critical = malloc(sizeof(*critical));
+	memset(critical, 0, sizeof(*critical));
 	_starpu_spin_init(&critical->lock);
-	critical->state = 0;
-	critical->contention_list_head = NULL;
-	critical->name = NULL;
 	return critical;
 }
 
@@ -71,14 +74,15 @@ static void destroy_omp_critical_struct(struct starpu_omp_critical *critical)
 
 static struct starpu_omp_device *create_omp_device_struct(void)
 {
-	struct starpu_omp_device *dev = malloc(sizeof(*dev));
-	if (dev == NULL)
+	struct starpu_omp_device *device = malloc(sizeof(*device));
+	if (device == NULL)
 		_STARPU_ERROR("memory allocation failed");
+	memset(device, 0, sizeof(*device));
 
-	/* TODO: initialize dev->icvs with proper values */ 
-	memset(&dev->icvs, 0, sizeof(dev->icvs));
+	/* TODO: initialize device->icvs with proper values */ 
+	memset(&device->icvs, 0, sizeof(device->icvs));
 
-	return dev;
+	return device;
 }
 
 static void destroy_omp_device_struct(struct starpu_omp_device *device)
@@ -93,16 +97,14 @@ static struct starpu_omp_region *create_omp_region_struct(struct starpu_omp_regi
 	if (region == NULL)
 		_STARPU_ERROR("memory allocation failed");
 
+	memset(region, 0, sizeof(*region));
 	region->parent_region = parent_region;
 	region->owner_device = owner_device;
 	region->thread_list = starpu_omp_thread_list_new();
 	region->implicit_task_list = starpu_omp_task_list_new();
 
-	region->nb_threads = 0;
-	region->barrier_count = 0;
-	region->single_id = 0;
+	_starpu_spin_init(&region->lock);
 	region->level = (parent_region != NULL)?parent_region->level+1:0;
-	region->continuation_starpu_task = NULL;
 	return region;
 }
 
@@ -114,6 +116,7 @@ static void destroy_omp_region_struct(struct starpu_omp_region *region)
 	STARPU_ASSERT(region->continuation_starpu_task == NULL);
 	starpu_omp_thread_list_delete(region->thread_list);
 	starpu_omp_task_list_delete(region->implicit_task_list);
+	_starpu_spin_destroy(&region->lock);
 	memset(region, 0, sizeof(*region));
 	free(region);
 }
@@ -145,18 +148,8 @@ static struct starpu_omp_thread *create_omp_thread_struct(struct starpu_omp_regi
 	struct starpu_omp_thread *thread = starpu_omp_thread_new();
 	if (thread == NULL)
 		_STARPU_ERROR("memory allocation failed");
-	/* .current_task */
-	thread->current_task = NULL;
-	/* .owner_region */
+	memset(thread, 0, sizeof(*thread));
 	thread->owner_region = owner_region;
-	/* .primary_task */
-	thread->primary_task = NULL;
-	/* .init_thread_stack */
-	thread->initial_thread_stack = NULL;
-	/* .ctx */
-	memset(&thread->ctx, 0, sizeof(thread->ctx));
-	/* .starpu_driver will be initialized later on */
-	/* .starpu_worker_id will be initialized later on */
 	return thread;
 }
 
@@ -204,20 +197,17 @@ static void starpu_omp_task_preempt(void)
 /*
  * wrap a task function to allow the task to be preempted
  */
-static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
+static void starpu_omp_implicit_task_exec(void *buffers[], void *cl_arg)
 {
 	struct starpu_omp_task *task = starpu_task_get_current()->omp_task;
+	STARPU_ASSERT(task->is_implicit);
 	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, task);
 	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
 	if (thread == NULL)
 	{
 		/*
-		 * this is the first time an omp task is launched on the current worker.
-		 * this first task should be an implicit parallel region task.
+		 * this is the first time an implicit omp task is launched on the current worker.
 		 */
-		if (!task->is_implicit)
-			_STARPU_ERROR("unexpected omp task\n");
-
 		thread = task->owner_thread;
 		STARPU_ASSERT(thread->owner_region != NULL);
 		STARPU_ASSERT(thread->owner_region == task->owner_region);
@@ -265,30 +255,102 @@ static void starpu_omp_task_exec(void *buffers[], void *cl_arg)
 	{
 		task->starpu_task->omp_task = NULL;
 		task->starpu_task = NULL;
-		if (!task->is_implicit)
+	}
+	else if (task->state != starpu_omp_task_state_preempted)
+		_STARPU_ERROR("invalid omp task state");
+}
+/*
+ * wrap an explicit task function to allow the task to be preempted
+ */
+static void starpu_omp_explicit_task_exec(void *buffers[], void *cl_arg)
+{
+	struct starpu_omp_task *task = starpu_task_get_current()->omp_task;
+	STARPU_ASSERT(!task->is_implicit);
+	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, task);
+	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
+	/* TODO: handle the case where an explicit task might get scheduled on the
+	 * thread before any implicit task: associate threads/worker beforehand */
+	STARPU_ASSERT(thread != NULL);
+	if (task->state != starpu_omp_task_state_preempted)
+	{
+		if (!task->is_untied)
+		{
+			struct _starpu_worker *starpu_worker = _starpu_get_local_worker_key();
+			task->starpu_task->workerid = starpu_worker->workerid;
+			task->starpu_task->execute_on_a_specific_worker = 1;
+		}
+		task->starpu_buffers = buffers;
+		task->starpu_cl_arg = cl_arg;
+	}
+	task->state = starpu_omp_task_state_clear;
+
+	/* 
+	 * start the task execution, or restore a previously preempted task.
+	 * about to run on the task stack...
+	 * */
+	swapcontext(&thread->ctx, &task->ctx);
+	/* now running on the worker stack again */
+
+	STARPU_ASSERT(task->state == starpu_omp_task_state_preempted
+			|| task->state == starpu_omp_task_state_terminated);
+	STARPU_PTHREAD_SETSPECIFIC(omp_task_key, NULL);
+	/* TODO: analyse the cause of the return and take appropriate steps */
+	if (task->state == starpu_omp_task_state_terminated)
+	{
+		struct starpu_omp_task *parent_task = task->parent_task;
+		struct starpu_omp_region *parallel_region = task->owner_region;
+		_starpu_spin_lock(&parent_task->lock);
+		if (STARPU_ATOMIC_ADD(&task->parent_task->child_task_count, -1) == 0)
 		{
-			destroy_omp_task_struct(task);
-			task = NULL;
+			if (parent_task->child_task_count == 0
+					&& (parent_task->wait_on & starpu_omp_task_wait_on_task_childs))
+			{
+				parent_task->wait_on &= ~starpu_omp_task_wait_on_task_childs;
+				_wake_up_locked_task(parent_task);
+			}
 		}
+		_starpu_spin_unlock(&parent_task->lock);
+		_starpu_spin_lock(&parallel_region->lock);
+		if (STARPU_ATOMIC_ADD(&parallel_region->bound_explicit_task_count, -1) == 0)
+		{
+			struct starpu_omp_task *waiting_task = parallel_region->waiting_task;
+			_starpu_spin_unlock(&parallel_region->lock);
+
+			if (waiting_task)
+			{
+				_starpu_spin_lock(&waiting_task->lock);
+				_starpu_spin_lock(&parallel_region->lock);
+				parallel_region->waiting_task = 0;
+				STARPU_ASSERT(waiting_task->wait_on & starpu_omp_task_wait_on_region_tasks);
+				waiting_task->wait_on &= ~starpu_omp_task_wait_on_region_tasks;
+				_wake_up_locked_task(waiting_task);
+				_starpu_spin_unlock(&waiting_task->lock);
+			}
+		}
+		_starpu_spin_unlock(&parallel_region->lock);
+		task->starpu_task->omp_task = NULL;
+		task->starpu_task = NULL;
+		destroy_omp_task_struct(task);
+		task = NULL;
 	}
 	else if (task->state != starpu_omp_task_state_preempted)
 		_STARPU_ERROR("invalid omp task state");
 }
 
+
 static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *parent_task,
 		struct starpu_omp_thread *owner_thread, struct starpu_omp_region *owner_region, int is_implicit)
 {
 	struct starpu_omp_task *task = starpu_omp_task_new();
 	if (task == NULL)
 		_STARPU_ERROR("memory allocation failed");
+
+	memset(task, 0, sizeof(*task));
 	task->parent_task = parent_task;
 	task->owner_thread = owner_thread;
 	task->owner_region = owner_region;
-	task->nested_region = NULL;
 	task->is_implicit = is_implicit;
 	_starpu_spin_init(&task->lock);
-	task->barrier_count = 0;
-	task->single_id = 0;
 	/* TODO: initialize task->data_env_icvs with proper values */ 
 	memset(&task->data_env_icvs, 0, sizeof(task->data_env_icvs));
 	if (is_implicit)
@@ -296,19 +358,8 @@ static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *pa
 	  /* TODO: initialize task->implicit_task_icvs with proper values */ 
 		memset(&task->implicit_task_icvs, 0, sizeof(task->implicit_task_icvs));
 	}
-	task->starpu_task = NULL;
-	task->starpu_buffers = NULL;
-	task->starpu_cl_arg = NULL;
-	task->f = NULL;
-	task->state = starpu_omp_task_state_clear;
 
-	if (parent_task == NULL)
-	{
-		/* do not allocate a stack for the initial task */
-		task->stack = NULL;
-		memset(&task->ctx, 0, sizeof(task->ctx));
-	}
-	else
+	if (parent_task != NULL)
 	{
 		/* TODO: use ICV stack size info instead */
 		task->stack = malloc(_STARPU_STACKSIZE);
@@ -564,7 +615,7 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 	/*
 	 * plug the task wrapper into the parallel region codelet instead, to support task preemption
 	 */
-	parallel_region_cl.cpu_funcs[0] = starpu_omp_task_exec;
+	parallel_region_cl.cpu_funcs[0] = starpu_omp_implicit_task_exec;
 
 	/*
 	 * create the starpu tasks for the implicit omp tasks,
@@ -642,51 +693,74 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 	destroy_omp_region_struct(new_region);
 }
 
+static void _wake_up_locked_task(struct starpu_omp_task *task)
+{
+	if (task->wait_on == 0)
+	{
+		int ret = starpu_task_submit(task->starpu_task);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+}
+
+static void wake_up_barrier(struct starpu_omp_region *parallel_region)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_task *implicit_task;
+	for (implicit_task  = starpu_omp_task_list_begin(parallel_region->implicit_task_list);
+			implicit_task != starpu_omp_task_list_end(parallel_region->implicit_task_list);
+			implicit_task  = starpu_omp_task_list_next(implicit_task))
+	{
+		if (implicit_task == task)
+			continue;
+		_starpu_spin_lock(&implicit_task->lock);
+		STARPU_ASSERT(implicit_task->wait_on & starpu_omp_task_wait_on_barrier);
+		implicit_task->wait_on &= ~starpu_omp_task_wait_on_barrier;
+		_wake_up_locked_task(implicit_task);
+		_starpu_spin_unlock(&implicit_task->lock);
+	}
+}
+
 static void barrier__sleep_callback(void *_task)
 {
 	struct starpu_omp_task *task = _task;
 	_starpu_spin_unlock(&task->lock);
 }
 
+static void region_tasks__sleep_callback(void *_task)
+{
+	struct starpu_omp_task *task = _task;
+	struct starpu_omp_region *parallel_region = task->owner_region;
+	_starpu_spin_unlock(&task->lock);
+	_starpu_spin_unlock(&parallel_region->lock);
+}
+
 void starpu_omp_barrier(void)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
-	/* Assume barriers are performed in by the implicit tasks of a region */
+	/* Assume barriers are performed by the implicit tasks of a parallel region */
 	STARPU_ASSERT(task->is_implicit);
-	struct starpu_omp_region *region = task->owner_region;
+	struct starpu_omp_region *parallel_region = task->owner_region;
 	_starpu_spin_lock(&task->lock);
-	int inc_barrier_count = STARPU_ATOMIC_ADD(&region->barrier_count, 1);
+	int inc_barrier_count = STARPU_ATOMIC_ADD(&parallel_region->barrier_count, 1);
 
-	if (inc_barrier_count == region->nb_threads)
+	if (inc_barrier_count == parallel_region->nb_threads)
 	{
 		/* last task reaching the barrier */
-		region->barrier_count = 0;
-		if (task->child_task_count > 0)
+		_starpu_spin_lock(&parallel_region->lock);
+		parallel_region->barrier_count = 0;
+		if (parallel_region->bound_explicit_task_count > 0)
 		{
-			_starpu_task_prepare_for_continuation_ext(0, barrier__sleep_callback, task);
+			task->wait_on |= starpu_omp_task_wait_on_region_tasks;
+			parallel_region->waiting_task = task;
+			_starpu_task_prepare_for_continuation_ext(0, region_tasks__sleep_callback, task);
 			starpu_omp_task_preempt();
 		}
 		else
 		{
 			_starpu_spin_unlock(&task->lock);
+			_starpu_spin_unlock(&parallel_region->lock);
 		}
-		STARPU_ASSERT(task->child_task_count == 0);
-		STARPU_ASSERT(region->bound_explicit_task_count == 0);
-		{
-			struct starpu_omp_task * implicit_task;
-			for (implicit_task  = starpu_omp_task_list_begin(region->implicit_task_list);
-					implicit_task != starpu_omp_task_list_end(region->implicit_task_list);
-					implicit_task  = starpu_omp_task_list_next(implicit_task))
-			{
-				int ret;
-				if (implicit_task == task)
-					continue;
-				_starpu_spin_lock(&implicit_task->lock);
-				ret = starpu_task_submit(implicit_task->starpu_task);
-				STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
-				_starpu_spin_unlock(&implicit_task->lock);
-			}
-		}
+		wake_up_barrier(parallel_region);
 	}
 	else
 	{
@@ -695,6 +769,7 @@ void starpu_omp_barrier(void)
 		 * . sleep
 		 */
 
+		task->wait_on |= starpu_omp_task_wait_on_barrier;
 		_starpu_task_prepare_for_continuation_ext(0, barrier__sleep_callback, task);
 		starpu_omp_task_preempt();
 		STARPU_ASSERT(task->child_task_count == 0);
@@ -772,6 +847,9 @@ void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name)
 	_starpu_spin_lock(&critical->lock);
 	while (critical->state != 0)
 	{
+		_starpu_spin_lock(&task->lock);
+		task->wait_on |= starpu_omp_task_wait_on_critical;
+		_starpu_spin_unlock(&task->lock);
 		link.task = task;
 		link.next = critical->contention_list_head;
 		critical->contention_list_head = &link;
@@ -791,11 +869,13 @@ void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name)
 	critical->state = 0;
 	if (critical->contention_list_head != NULL)
 	{
-		int ret;
 		struct starpu_omp_task *next_task = critical->contention_list_head->task;
 		critical->contention_list_head = critical->contention_list_head->next;
-		ret = starpu_task_submit(next_task->starpu_task);
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+		_starpu_spin_lock(&next_task->lock);
+		STARPU_ASSERT(next_task->wait_on & starpu_omp_task_wait_on_critical);
+		next_task->wait_on &= ~starpu_omp_task_wait_on_critical;
+		_wake_up_locked_task(next_task);
+		_starpu_spin_unlock(&next_task->lock);
 	}
 	_starpu_spin_unlock(&critical->lock);
 }
@@ -853,7 +933,7 @@ void starpu_omp_task_region(const struct starpu_codelet * const _task_region_cl,
 		generated_task->task_group = generating_task->task_group;
 
 		void (*task_region_f)(void **starpu_buffers, void *starpu_cl_arg) = task_region_cl.cpu_funcs[0];
-		task_region_cl.cpu_funcs[0] = starpu_omp_task_exec;
+		task_region_cl.cpu_funcs[0] = starpu_omp_explicit_task_exec;
 		generated_task->f = task_region_f;
 
 		generated_task->starpu_task = starpu_task_create();

+ 12 - 0
src/util/openmp_runtime_support.h

@@ -191,6 +191,15 @@ enum starpu_omp_task_state
 	starpu_omp_task_state_terminated = 2,
 };
 
+enum starpu_omp_task_wait_on
+{
+	starpu_omp_task_wait_on_task_childs  = 1 << 0,
+	starpu_omp_task_wait_on_region_tasks = 1 << 1,
+	starpu_omp_task_wait_on_barrier      = 1 << 2,
+	starpu_omp_task_wait_on_group        = 1 << 3,
+	starpu_omp_task_wait_on_critical     = 1 << 4
+};
+
 LIST_TYPE(starpu_omp_task,
 
 	struct starpu_omp_task *parent_task;
@@ -204,6 +213,7 @@ LIST_TYPE(starpu_omp_task,
 	int child_task_count;
 	struct starpu_omp_task_group *task_group;
 	struct _starpu_spinlock lock;
+	int wait_on;
 	int barrier_count;
 	int single_id;
 	struct starpu_omp_data_environment_icvs data_env_icvs;
@@ -267,6 +277,8 @@ struct starpu_omp_region
 	struct starpu_omp_task_list *implicit_task_list;
 	/* include both the master thread and the region own threads */
 	int nb_threads;
+	struct _starpu_spinlock lock;
+	struct starpu_omp_task *waiting_task;
 	int barrier_count;
 	int bound_explicit_task_count;
 	int single_id;