Browse Source

- add support for ordered sections in omp for
- add test case

Olivier Aumage 11 years ago
parent
commit
ed5fcef62d

+ 3 - 0
include/starpu_openmp.h

@@ -68,6 +68,9 @@ extern void starpu_omp_taskgroup(void (*f)(void *arg), void *arg) __STARPU_OMP_N
 extern void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait) __STARPU_OMP_NOTHROW;
 extern int starpu_omp_for_inline_first(unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, unsigned long *_first_i, unsigned long *_nb_i) __STARPU_OMP_NOTHROW;
 extern int starpu_omp_for_inline_next(unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, unsigned long *_first_i, unsigned long *_nb_i) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_ordered_inline_begin(unsigned long i) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_ordered_inline_end(void) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_ordered(void (*f)(unsigned long _i, void *arg), void *arg, unsigned long i) __STARPU_OMP_NOTHROW;
 
 extern void starpu_omp_set_num_threads(int threads) __STARPU_OMP_NOTHROW;
 extern int starpu_omp_get_num_threads() __STARPU_OMP_NOTHROW;

+ 135 - 9
src/util/openmp_runtime_support.c

@@ -54,6 +54,73 @@ static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *pa
 static void destroy_omp_task_struct(struct starpu_omp_task *task);
 static void _wake_up_locked_task(struct starpu_omp_task *task);
 static void wake_up_barrier(struct starpu_omp_region *parallel_region);
+static void starpu_omp_task_preempt(void);
+
+static void condition_init(struct starpu_omp_condition *condition)
+{
+	condition->contention_list_head = NULL;
+}
+
+static void condition_exit(struct starpu_omp_condition *condition)
+{
+	STARPU_ASSERT(condition->contention_list_head == NULL);
+	condition->contention_list_head = NULL;
+}
+
+static void condition__sleep_callback(void *_lock)
+{
+	struct _starpu_spinlock *lock = _lock;
+	_starpu_spin_unlock(lock);
+}
+
+static void condition_wait(struct starpu_omp_condition *condition, struct _starpu_spinlock *lock)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_task_link link;
+	_starpu_spin_lock(&task->lock);
+	task->wait_on |= starpu_omp_task_wait_on_condition;
+	_starpu_spin_unlock(&task->lock);
+	link.task = task;
+	link.next = condition->contention_list_head;
+	condition->contention_list_head = &link;
+
+	_starpu_task_prepare_for_continuation_ext(0, condition__sleep_callback, lock);
+	starpu_omp_task_preempt();
+
+	/* re-acquire the lock released by the callback */
+	_starpu_spin_lock(lock);
+}
+
+#if 0
+/* unused for now */
+static void condition_signal(struct starpu_omp_condition *condition)
+{
+	if (condition->contention_list_head != NULL)
+	{
+		struct starpu_omp_task *next_task = condition->contention_list_head->task;
+		condition->contention_list_head = condition->contention_list_head->next;
+		_starpu_spin_lock(&next_task->lock);
+		STARPU_ASSERT(next_task->wait_on & starpu_omp_task_wait_on_condition);
+		next_task->wait_on &= ~starpu_omp_task_wait_on_condition;
+		_wake_up_locked_task(next_task);
+		_starpu_spin_unlock(&next_task->lock);
+	}
+}
+#endif
+
+static void condition_broadcast(struct starpu_omp_condition *condition)
+{
+	while (condition->contention_list_head != NULL)
+	{
+		struct starpu_omp_task *next_task = condition->contention_list_head->task;
+		condition->contention_list_head = condition->contention_list_head->next;
+		_starpu_spin_lock(&next_task->lock);
+		STARPU_ASSERT(next_task->wait_on & starpu_omp_task_wait_on_condition);
+		next_task->wait_on &= ~starpu_omp_task_wait_on_condition;
+		_wake_up_locked_task(next_task);
+		_starpu_spin_unlock(&next_task->lock);
+	}
+}
 
 static void register_thread_worker(struct starpu_omp_thread *thread)
 {
@@ -1318,19 +1385,23 @@ static inline void _starpu_omp_for_loop(struct starpu_omp_region *parallel_regio
 	}
 }
 
-static inline struct starpu_omp_loop *_starpu_omp_for_loop_begin(struct starpu_omp_region *parallel_region, struct starpu_omp_task *task,
-		int ordered)
+static inline struct starpu_omp_loop *_starpu_omp_for_get_loop(struct starpu_omp_region *parallel_region, struct starpu_omp_task *task)
 {
 	struct starpu_omp_loop *loop;
-	if (ordered)
-		/* TODO: implement ordered statement */
-		_STARPU_ERROR("omp for / ordered not implemented\n");
-	_starpu_spin_lock(&parallel_region->lock);
 	loop = parallel_region->loop_list;
 	while (loop && loop->id != task->loop_id)
 	{
 		loop = loop->next_loop;
 	}
+	return loop;
+}
+
+static inline struct starpu_omp_loop *_starpu_omp_for_loop_begin(struct starpu_omp_region *parallel_region, struct starpu_omp_task *task,
+		int ordered)
+{
+	struct starpu_omp_loop *loop;
+	_starpu_spin_lock(&parallel_region->lock);
+	loop = _starpu_omp_for_get_loop(parallel_region, task);
 	if (!loop)
 	{
 		loop = malloc(sizeof(*loop));
@@ -1341,18 +1412,30 @@ static inline struct starpu_omp_loop *_starpu_omp_for_loop_begin(struct starpu_o
 		loop->nb_completed_threads = 0;
 		loop->next_loop = parallel_region->loop_list;
 		parallel_region->loop_list = loop;
+		if (ordered)
+		{
+			loop->ordered_iteration = 0;
+			_starpu_spin_init(&loop->ordered_lock);
+			condition_init(&loop->ordered_cond);
+		}
 	}
 	_starpu_spin_unlock(&parallel_region->lock);
 	return loop;
 }
 static inline void _starpu_omp_for_loop_end(struct starpu_omp_region *parallel_region, struct starpu_omp_task *task,
-		struct starpu_omp_loop *loop)
+		struct starpu_omp_loop *loop, int ordered)
 {
 	_starpu_spin_lock(&parallel_region->lock);
 	loop->nb_completed_threads++;
 	if (loop->nb_completed_threads == parallel_region->nb_threads)
 	{
 		struct starpu_omp_loop **p_loop;
+		if (ordered)
+		{
+			loop->ordered_iteration = 0;
+			condition_exit(&loop->ordered_cond);
+			_starpu_spin_destroy(&loop->ordered_lock);
+		}
 		STARPU_ASSERT(loop->next_loop == NULL);
 		p_loop = &(parallel_region->loop_list);
 		while (*p_loop != loop)
@@ -1375,7 +1458,7 @@ int starpu_omp_for_inline_first(unsigned long nb_iterations, unsigned long chunk
 	_starpu_omp_for_loop(parallel_region, task, loop, 1, nb_iterations, chunk, schedule, _first_i, _nb_i);
 	if (*_nb_i == 0)
 	{
-		_starpu_omp_for_loop_end(parallel_region, task, loop);
+		_starpu_omp_for_loop_end(parallel_region, task, loop, ordered);
 	}
 	return (*_nb_i != 0);
 }
@@ -1389,7 +1472,7 @@ int starpu_omp_for_inline_next(unsigned long nb_iterations, unsigned long chunk,
 	_starpu_omp_for_loop(parallel_region, task, loop, 0, nb_iterations, chunk, schedule, _first_i, _nb_i);
 	if (*_nb_i == 0)
 	{
-		_starpu_omp_for_loop_end(parallel_region, task, loop);
+		_starpu_omp_for_loop_end(parallel_region, task, loop, ordered);
 	}
 	return (*_nb_i != 0);
 }
@@ -1412,6 +1495,49 @@ void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void
 	}
 }
 
+void starpu_omp_ordered(void (*f)(unsigned long _i, void *arg), void *arg, unsigned long i)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *parallel_region = task->owner_region;
+	struct starpu_omp_loop *loop = _starpu_omp_for_get_loop(parallel_region, task);
+
+	_starpu_spin_lock(&loop->ordered_lock);
+	while (i != loop->ordered_iteration)
+	{
+		STARPU_ASSERT(i > loop->ordered_iteration);
+		condition_wait(&loop->ordered_cond, &loop->ordered_lock);
+	}
+	f(i, arg);
+	loop->ordered_iteration++;
+	condition_broadcast(&loop->ordered_cond);
+	_starpu_spin_unlock(&loop->ordered_lock);
+}
+
+void starpu_omp_ordered_inline_begin(unsigned long i)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *parallel_region = task->owner_region;
+	struct starpu_omp_loop *loop = _starpu_omp_for_get_loop(parallel_region, task);
+
+	_starpu_spin_lock(&loop->ordered_lock);
+	while (i != loop->ordered_iteration)
+	{
+		STARPU_ASSERT(i > loop->ordered_iteration);
+		condition_wait(&loop->ordered_cond, &loop->ordered_lock);
+	}
+}
+
+void starpu_omp_ordered_inline_end(void)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *parallel_region = task->owner_region;
+	struct starpu_omp_loop *loop = _starpu_omp_for_get_loop(parallel_region, task);
+
+	loop->ordered_iteration++;
+	condition_broadcast(&loop->ordered_cond);
+	_starpu_spin_unlock(&loop->ordered_lock);
+}
+
 /*
  * restore deprecated diagnostics (-Wdeprecated-declarations)
  */

+ 10 - 1
src/util/openmp_runtime_support.h

@@ -176,6 +176,11 @@ struct starpu_omp_task_link
 	struct starpu_omp_task_link *next;
 };
 
+struct starpu_omp_condition
+{
+	struct starpu_omp_task_link *contention_list_head;
+};
+
 struct starpu_omp_critical
 {
 	UT_hash_handle hh;
@@ -198,7 +203,8 @@ enum starpu_omp_task_wait_on
 	starpu_omp_task_wait_on_region_tasks = 1 << 1,
 	starpu_omp_task_wait_on_barrier      = 1 << 2,
 	starpu_omp_task_wait_on_group        = 1 << 3,
-	starpu_omp_task_wait_on_critical     = 1 << 4
+	starpu_omp_task_wait_on_critical     = 1 << 4,
+	starpu_omp_task_wait_on_condition    = 1 << 5
 };
 
 LIST_TYPE(starpu_omp_task,
@@ -275,6 +281,9 @@ struct starpu_omp_loop
 	unsigned long next_iteration;
 	int nb_completed_threads;
 	struct starpu_omp_loop *next_loop;
+	struct _starpu_spinlock ordered_lock;
+	struct starpu_omp_condition ordered_cond;
+	unsigned long ordered_iteration;
 };
 
 struct starpu_omp_region

+ 4 - 0
tests/Makefile.am

@@ -237,6 +237,7 @@ noinst_PROGRAMS =				\
 	openmp/parallel_critical_named_inline_01\
 	openmp/parallel_for_01			\
 	openmp/parallel_for_02			\
+	openmp/parallel_for_ordered_01		\
 	openmp/task_01				\
 	openmp/taskwait_01			\
 	openmp/taskgroup_01			\
@@ -504,6 +505,9 @@ openmp_parallel_for_01_SOURCES = 	\
 openmp_parallel_for_02_SOURCES = 	\
 	openmp/parallel_for_02.c
 
+openmp_parallel_for_ordered_01_SOURCES = 	\
+	openmp/parallel_for_ordered_01.c
+
 openmp_task_01_SOURCES = 	\
 	openmp/task_01.c
 

+ 235 - 0
tests/openmp/parallel_for_ordered_01.c

@@ -0,0 +1,235 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2014  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <pthread.h>
+#include <starpu.h>
+#include "../helper.h"
+#include <stdio.h>
+
+#if !defined(STARPU_OPENMP)
+int main(int argc, char **argv)
+{
+	return STARPU_TEST_SKIPPED;
+}
+#else
+#define NB_ITERS 256
+#define CHUNK 16
+unsigned long array[NB_ITERS];
+
+__attribute__((constructor))
+static void omp_constructor(void)
+{
+	int ret = starpu_omp_init();
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_omp_init");
+}
+
+__attribute__((destructor))
+static void omp_destructor(void)
+{
+	starpu_omp_shutdown();
+}
+
+void ordered_f(unsigned long i, void *arg)
+{
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d, for [%s] iteration (ordered) %lu\n", (void *)tid, worker_id, (const char *)arg, i);
+}
+
+void for_g(unsigned long i, unsigned long nb_i, void *arg)
+{
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d, for [%s] iterations first=%lu:nb=%lu\n", (void *)tid, worker_id, (const char *)arg, i, nb_i);
+	for (; nb_i > 0; i++, nb_i--)
+	{
+		array[i] = 1;
+		starpu_omp_ordered(ordered_f, arg, i);
+	}
+}
+
+void parallel_region_1_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"static chunk", NB_ITERS, CHUNK, starpu_omp_sched_static, 1, 0);
+}
+
+static struct starpu_codelet parallel_region_1_cl =
+{
+	.cpu_funcs    = { parallel_region_1_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+
+};
+
+void parallel_region_2_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"static nochunk", NB_ITERS, 0, starpu_omp_sched_static, 1, 0);
+}
+
+static struct starpu_codelet parallel_region_2_cl =
+{
+	.cpu_funcs    = { parallel_region_2_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+
+};
+
+void parallel_region_3_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"dynamic chunk", NB_ITERS, CHUNK, starpu_omp_sched_dynamic, 1, 0);
+}
+
+static struct starpu_codelet parallel_region_3_cl =
+{
+	.cpu_funcs    = { parallel_region_3_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+
+};
+
+void parallel_region_4_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"dynamic nochunk", NB_ITERS, 0, starpu_omp_sched_dynamic, 1, 0);
+}
+
+static struct starpu_codelet parallel_region_4_cl =
+{
+	.cpu_funcs    = { parallel_region_4_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+
+};
+
+void parallel_region_5_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"guided chunk", NB_ITERS, CHUNK, starpu_omp_sched_guided, 1, 0);
+}
+
+static struct starpu_codelet parallel_region_5_cl =
+{
+	.cpu_funcs    = { parallel_region_5_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+
+};
+
+void parallel_region_6_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"guided nochunk", NB_ITERS, 0, starpu_omp_sched_guided, 1, 0);
+}
+
+static struct starpu_codelet parallel_region_6_cl =
+{
+	.cpu_funcs    = { parallel_region_6_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+
+};
+
+static void clear_array(void)
+{
+	memset(array, 0, NB_ITERS*sizeof(unsigned long));
+}
+
+static void check_array(void)
+{
+	unsigned long i;
+	unsigned long s = 0;
+	for (i = 0; i < NB_ITERS; i++)
+	{
+		s += array[i];
+	}
+	if (s != NB_ITERS)
+	{
+		printf("missing iterations\n");
+		exit(1);
+	}
+}
+int
+main (int argc, char *argv[]) {
+	clear_array();
+	starpu_omp_parallel_region(&parallel_region_1_cl, NULL);
+	check_array();
+
+	clear_array();
+	starpu_omp_parallel_region(&parallel_region_2_cl, NULL);
+	check_array();
+
+	clear_array();
+	starpu_omp_parallel_region(&parallel_region_3_cl, NULL);
+	check_array();
+
+	clear_array();
+	starpu_omp_parallel_region(&parallel_region_4_cl, NULL);
+	check_array();
+
+	clear_array();
+	starpu_omp_parallel_region(&parallel_region_5_cl, NULL);
+	check_array();
+
+	clear_array();
+	starpu_omp_parallel_region(&parallel_region_6_cl, NULL);
+	check_array();
+	return 0;
+}
+#endif