Browse Source

- add support of anonymous omp critical
- add preliminary support of named omp criticals
- add test for anonymous omp critical
- fix api for omp master/omp single

Olivier Aumage 11 years ago
parent
commit
1dcec33ad2

+ 3 - 2
include/starpu_openmp.h

@@ -53,8 +53,9 @@ extern int starpu_omp_init(void) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_shutdown(void) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_parallel_region(const struct starpu_codelet * const parallel_region_cl, void * const parallel_region_cl_arg) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_barrier(void) __STARPU_OMP_NOTHROW;
-extern void starpu_omp_master(void (*f)(void), int nowait) __STARPU_OMP_NOTHROW;
-extern void starpu_omp_single(void (*f)(void), int nowait) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_master(void (*f)(void *arg), void *arg, int nowait) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_single(void (*f)(void *arg), void *arg, int nowait) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name) __STARPU_OMP_NOTHROW;
 
 extern void starpu_omp_set_num_threads(int threads) __STARPU_OMP_NOTHROW;
 extern int starpu_omp_get_num_threads() __STARPU_OMP_NOTHROW;

+ 103 - 4
src/util/openmp_runtime_support.c

@@ -24,7 +24,9 @@
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 #include <util/openmp_runtime_support.h>
 #include <core/task.h>
+#include <common/list.h>
 #include <common/starpu_spinlock.h>
+#include <common/uthash.h>
 #include <stdlib.h>
 #include <ctype.h>
 #include <strings.h>
@@ -48,6 +50,25 @@ static struct starpu_omp_task *create_omp_task_struct(struct starpu_omp_task *pa
 		struct starpu_omp_thread *owner_thread, struct starpu_omp_region *owner_region, int is_implicit);
 static void destroy_omp_task_struct(struct starpu_omp_task *task);
 
+static struct starpu_omp_critical *create_omp_critical_struct(void)
+{
+	struct starpu_omp_critical *critical = malloc(sizeof(*critical));
+	_starpu_spin_init(&critical->lock);
+	critical->state = 0;
+	critical->contention_list_head = NULL;
+	critical->name = NULL;
+	return critical;
+}
+
+static void destroy_omp_critical_struct(struct starpu_omp_critical *critical)
+{
+	STARPU_ASSERT(critical->state == 0);
+	STARPU_ASSERT(critical->contention_list_head == NULL);
+	_starpu_spin_destroy(&critical->lock);
+	critical->name = NULL;
+	free(critical);
+}
+
 static struct starpu_omp_device *create_omp_device_struct(void)
 {
 	struct starpu_omp_device *dev = malloc(sizeof(*dev));
@@ -413,6 +434,9 @@ int starpu_omp_init(void)
 	_global_state.initial_thread = create_omp_thread_struct(_global_state.initial_region);
 	_global_state.initial_task = create_omp_task_struct(NULL,
 			_global_state.initial_thread, _global_state.initial_region, 1);
+	_global_state.default_critical = create_omp_critical_struct();
+	_global_state.named_criticals = NULL;
+	_starpu_spin_init(&_global_state.named_criticals_lock);
 	_starpu_omp_global_state = &_global_state;
 
 	omp_initial_region_setup();
@@ -436,6 +460,20 @@ void starpu_omp_shutdown(void)
 	_global_state.initial_region = NULL;
 	destroy_omp_device_struct(_global_state.initial_device);
 	_global_state.initial_device = NULL;
+	destroy_omp_critical_struct(_global_state.default_critical);
+	_global_state.default_critical = NULL;
+	_starpu_spin_lock(&_global_state.named_criticals_lock);
+	if (_global_state.named_criticals)
+	{
+		struct starpu_omp_critical *critical, *tmp;
+		HASH_ITER(hh, _global_state.named_criticals, critical, tmp);
+		{
+			HASH_DEL(_global_state.named_criticals, critical);
+			destroy_omp_critical_struct(critical);
+		}
+	}
+	_starpu_spin_unlock(&_global_state.named_criticals_lock);
+	_starpu_spin_destroy(&_global_state.named_criticals_lock);
 	STARPU_PTHREAD_KEY_DELETE(omp_task_key);
 	STARPU_PTHREAD_KEY_DELETE(omp_thread_key);
 }
@@ -649,7 +687,7 @@ void starpu_omp_barrier(void)
 	}
 }
 
-void starpu_omp_master(void (*f)(void), int nowait)
+void starpu_omp_master(void (*f)(void *arg), void *arg, int nowait)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
 	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
@@ -659,7 +697,7 @@ void starpu_omp_master(void (*f)(void), int nowait)
 
 	if (thread == region->master_thread)
 	{
-		f();
+		f(arg);
 	}
 
 	if (!nowait)
@@ -668,7 +706,7 @@ void starpu_omp_master(void (*f)(void), int nowait)
 	}
 }
 
-void starpu_omp_single(void (*f)(void), int nowait)
+void starpu_omp_single(void (*f)(void *arg), void *arg, int nowait)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
 	/* Assume singles are performed in by the implicit tasks of a region */
@@ -679,7 +717,7 @@ void starpu_omp_single(void (*f)(void), int nowait)
 
 	if (first)
 	{
-		f();
+		f(arg);
 	}
 
 	if (!nowait)
@@ -687,6 +725,67 @@ void starpu_omp_single(void (*f)(void), int nowait)
 		starpu_omp_barrier();
 	}
 }
+
+static void critical__sleep_callback(void *_critical)
+{
+	struct starpu_omp_critical *critical = _critical;
+	_starpu_spin_unlock(&critical->lock);
+}
+
+void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_critical *critical = NULL;
+	struct starpu_omp_task_link link;
+
+	if (name)
+	{
+		_starpu_spin_lock(&_global_state.named_criticals_lock);
+		HASH_FIND_STR(_global_state.named_criticals, name, critical);
+		if (critical == NULL)
+		{
+			critical = create_omp_critical_struct();
+			critical->name = name;
+			HASH_ADD_STR(_global_state.named_criticals, name, critical);
+		}
+		_starpu_spin_unlock(&_global_state.named_criticals_lock);
+	}
+	else
+	{
+		critical = _global_state.default_critical;
+	}
+
+	_starpu_spin_lock(&critical->lock);
+	while (critical->state != 0)
+	{
+		link.task = task;
+		link.next = critical->contention_list_head;
+		critical->contention_list_head = &link;
+		_starpu_task_prepare_for_continuation_ext(0, critical__sleep_callback, critical);
+		starpu_omp_task_preempt();
+
+		/* re-acquire the spin lock */
+		_starpu_spin_lock(&critical->lock);
+	}
+	critical->state = 1;
+	_starpu_spin_unlock(&critical->lock);
+
+	f(arg);
+
+	_starpu_spin_lock(&critical->lock);
+	STARPU_ASSERT(critical->state == 1);
+	critical->state = 0;
+	if (critical->contention_list_head != NULL)
+	{
+		int ret;
+		struct starpu_omp_task *next_task = critical->contention_list_head->task;
+		critical->contention_list_head = critical->contention_list_head->next;
+		ret = starpu_task_submit(next_task->starpu_task);
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+	_starpu_spin_unlock(&critical->lock);
+}
+
 /*
  * restore deprecated diagnostics (-Wdeprecated-declarations)
  */

+ 19 - 0
src/util/openmp_runtime_support.h

@@ -22,6 +22,7 @@
 #ifdef STARPU_OPENMP
 #include <common/list.h>
 #include <common/starpu_spinlock.h>
+#include <common/uthash.h>
 
 /* ucontexts have been deprecated as of POSIX 1-2004
  * _XOPEN_SOURCE required at least on OS/X
@@ -162,6 +163,21 @@ struct starpu_omp_initial_icv_values
 	struct starpu_omp_place places;
 };
 
+struct starpu_omp_task_link
+{
+	struct starpu_omp_task *task;
+	struct starpu_omp_task_link *next;
+};
+
+struct starpu_omp_critical
+{
+	UT_hash_handle hh;
+	struct _starpu_spinlock lock;
+	unsigned state;
+	struct starpu_omp_task_link *contention_list_head;
+	const char *name;
+};
+
 enum starpu_omp_task_state
 {
 	starpu_omp_task_state_clear      = 0,
@@ -258,6 +274,9 @@ struct starpu_omp_global
 	struct starpu_omp_thread *initial_thread;
 	struct starpu_omp_region *initial_region;
 	struct starpu_omp_device *initial_device;
+	struct starpu_omp_critical *default_critical;
+	struct starpu_omp_critical *named_criticals;
+	struct _starpu_spinlock named_criticals_lock;
 };
 
 /* 

+ 4 - 0
tests/Makefile.am

@@ -229,6 +229,7 @@ noinst_PROGRAMS =				\
 	openmp/parallel_master_nowait_01	\
 	openmp/parallel_single_wait_01		\
 	openmp/parallel_single_nowait_01	\
+	openmp/parallel_critical_01		\
 	overlap/overlap				\
 	overlap/gpu_concurrency			\
 	parallel_tasks/explicit_combined_worker	\
@@ -469,6 +470,9 @@ openmp_parallel_single_wait_01_SOURCES = 	\
 openmp_parallel_single_nowait_01_SOURCES = 	\
 	openmp/parallel_single_nowait_01.c
 
+openmp_parallel_critical_01_SOURCES = 	\
+	openmp/parallel_critical_01.c
+
 ###################
 # Block interface #
 ###################

+ 86 - 0
tests/openmp/parallel_critical_01.c

@@ -0,0 +1,86 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2014  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <pthread.h>
+#include <starpu.h>
+#include "../helper.h"
+#include <stdio.h>
+
+#if !defined(STARPU_OPENMP)
+int main(int argc, char **argv)
+{
+	return STARPU_TEST_SKIPPED;
+}
+#else
+__attribute__((constructor))
+static void omp_constructor(void)
+{
+	int ret = starpu_omp_init();
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_omp_init");
+}
+
+__attribute__((destructor))
+static void omp_destructor(void)
+{
+	starpu_omp_shutdown();
+}
+
+void critical_g(void *arg)
+{
+	(void) arg;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d -- critical\n", (void *)tid, worker_id);
+}
+
+void parallel_region_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d -- parallel -->\n", (void *)tid, worker_id);
+	starpu_omp_critical(critical_g, NULL, NULL);
+	starpu_omp_critical(critical_g, NULL, NULL);
+	starpu_omp_critical(critical_g, NULL, NULL);
+	starpu_omp_critical(critical_g, NULL, NULL);
+	printf("[tid %p] task thread = %d -- parallel <--\n", (void *)tid, worker_id);
+}
+
+static struct starpu_codelet parallel_region_cl =
+{
+	.cpu_funcs    = { parallel_region_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+
+};
+
+int
+main (int argc, char *argv[]) {
+	pthread_t tid;
+	tid = pthread_self();
+	printf("<main>\n");
+	starpu_omp_parallel_region(&parallel_region_cl, NULL);
+	printf("<main>\n");
+	starpu_omp_parallel_region(&parallel_region_cl, NULL);
+	printf("<main>\n");
+	return 0;
+}
+#endif

+ 6 - 5
tests/openmp/parallel_master_nowait_01.c

@@ -38,8 +38,9 @@ static void omp_destructor(void)
 	starpu_omp_shutdown();
 }
 
-void master_g(void)
+void master_g(void *arg)
 {
+	(void) arg;
 	int worker_id;
 	pthread_t tid;
 	tid = pthread_self();
@@ -56,10 +57,10 @@ void parallel_region_f(void *buffers[], void *args)
 	tid = pthread_self();
 	worker_id = starpu_worker_get_id();
 	printf("[tid %p] task thread = %d -- parallel -->\n", (void *)tid, worker_id);
-	starpu_omp_master(master_g, 1);
-	starpu_omp_master(master_g, 1);
-	starpu_omp_master(master_g, 1);
-	starpu_omp_master(master_g, 1);
+	starpu_omp_master(master_g, NULL, 1);
+	starpu_omp_master(master_g, NULL, 1);
+	starpu_omp_master(master_g, NULL, 1);
+	starpu_omp_master(master_g, NULL, 1);
 	printf("[tid %p] task thread = %d -- parallel <--\n", (void *)tid, worker_id);
 }
 

+ 6 - 5
tests/openmp/parallel_master_wait_01.c

@@ -38,8 +38,9 @@ static void omp_destructor(void)
 	starpu_omp_shutdown();
 }
 
-void master_g(void)
+void master_g(void *arg)
 {
+	(void) arg;
 	int worker_id;
 	pthread_t tid;
 	tid = pthread_self();
@@ -56,10 +57,10 @@ void parallel_region_f(void *buffers[], void *args)
 	tid = pthread_self();
 	worker_id = starpu_worker_get_id();
 	printf("[tid %p] task thread = %d -- parallel -->\n", (void *)tid, worker_id);
-	starpu_omp_master(master_g, 0);
-	starpu_omp_master(master_g, 0);
-	starpu_omp_master(master_g, 0);
-	starpu_omp_master(master_g, 0);
+	starpu_omp_master(master_g, NULL, 0);
+	starpu_omp_master(master_g, NULL, 0);
+	starpu_omp_master(master_g, NULL, 0);
+	starpu_omp_master(master_g, NULL, 0);
 	printf("[tid %p] task thread = %d -- parallel <--\n", (void *)tid, worker_id);
 }
 

+ 6 - 5
tests/openmp/parallel_single_nowait_01.c

@@ -38,8 +38,9 @@ static void omp_destructor(void)
 	starpu_omp_shutdown();
 }
 
-void single_g(void)
+void single_g(void *arg)
 {
+	(void) arg;
 	int worker_id;
 	pthread_t tid;
 	tid = pthread_self();
@@ -56,10 +57,10 @@ void parallel_region_f(void *buffers[], void *args)
 	tid = pthread_self();
 	worker_id = starpu_worker_get_id();
 	printf("[tid %p] task thread = %d -- parallel -->\n", (void *)tid, worker_id);
-	starpu_omp_single(single_g, 1);
-	starpu_omp_single(single_g, 1);
-	starpu_omp_single(single_g, 1);
-	starpu_omp_single(single_g, 1);
+	starpu_omp_single(single_g, NULL, 1);
+	starpu_omp_single(single_g, NULL, 1);
+	starpu_omp_single(single_g, NULL, 1);
+	starpu_omp_single(single_g, NULL, 1);
 	printf("[tid %p] task thread = %d -- parallel <--\n", (void *)tid, worker_id);
 }
 

+ 6 - 5
tests/openmp/parallel_single_wait_01.c

@@ -38,8 +38,9 @@ static void omp_destructor(void)
 	starpu_omp_shutdown();
 }
 
-void single_g(void)
+void single_g(void *arg)
 {
+	(void) arg;
 	int worker_id;
 	pthread_t tid;
 	tid = pthread_self();
@@ -56,10 +57,10 @@ void parallel_region_f(void *buffers[], void *args)
 	tid = pthread_self();
 	worker_id = starpu_worker_get_id();
 	printf("[tid %p] task thread = %d -- parallel -->\n", (void *)tid, worker_id);
-	starpu_omp_single(single_g, 0);
-	starpu_omp_single(single_g, 0);
-	starpu_omp_single(single_g, 0);
-	starpu_omp_single(single_g, 0);
+	starpu_omp_single(single_g, NULL, 0);
+	starpu_omp_single(single_g, NULL, 0);
+	starpu_omp_single(single_g, NULL, 0);
+	starpu_omp_single(single_g, NULL, 0);
 	printf("[tid %p] task thread = %d -- parallel <--\n", (void *)tid, worker_id);
 }