Browse Source

- preliminary implementation of 'omp for': supports schedule 'static' and schedule 'dynamic', each with or without an explicit chunk size
- add test case for omp for
- modification of the allocation/cleanup of the starpu_omp_thread structs
- add a test case for several level-1 'omp parallel' regions in sequence

Olivier Aumage 11 years ago
parent
commit
e4d10c0e75
4 changed files with 183 additions and 23 deletions
  1. 1 0
      include/starpu_openmp.h
  2. 158 18
      src/util/openmp_runtime_support.c
  3. 16 4
      src/util/openmp_runtime_support.h
  4. 8 1
      tests/Makefile.am

+ 1 - 0
include/starpu_openmp.h

@@ -61,6 +61,7 @@ extern void starpu_omp_single(void (*f)(void *arg), void *arg, int nowait) __STA
 extern void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_taskwait(void) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_taskgroup(void (*f)(void *arg), void *arg) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_for(void (*f)(unsigned long i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait) __STARPU_OMP_NOTHROW;
 
 extern void starpu_omp_set_num_threads(int threads) __STARPU_OMP_NOTHROW;
 extern int starpu_omp_get_num_threads() __STARPU_OMP_NOTHROW;

+ 158 - 18
src/util/openmp_runtime_support.c

@@ -66,6 +66,14 @@ static void register_thread_worker(struct starpu_omp_thread *thread)
 	HASH_ADD_PTR(_global_state.hash_workers, worker, thread);
 	_starpu_spin_unlock(&_global_state.hash_workers_lock);
 }
+/* Return the starpu_omp_thread previously registered for STARPU_WORKER via
+ * register_thread_worker(), or NULL if no thread is bound to it yet. */
+static struct starpu_omp_thread *get_worker_thread(struct _starpu_worker *starpu_worker)
+{
+	struct starpu_omp_thread *thread = NULL;
+	/* hash_workers is keyed by the worker pointer itself; lookups and
+	 * insertions are serialized by hash_workers_lock. */
+	_starpu_spin_lock(&_global_state.hash_workers_lock);
+	HASH_FIND_PTR(_global_state.hash_workers, &starpu_worker, thread);
+	_starpu_spin_unlock(&_global_state.hash_workers_lock);
+	return thread;
+}
 static struct starpu_omp_thread *get_local_thread(void)
 {
 	struct starpu_omp_thread *thread = STARPU_PTHREAD_GETSPECIFIC(omp_thread_key);
@@ -183,16 +191,6 @@ static struct starpu_omp_thread *create_omp_thread_struct(struct starpu_omp_regi
 static void destroy_omp_thread_struct(struct starpu_omp_thread *thread)
 {
 	STARPU_ASSERT(thread->current_task == NULL);
-	if (thread->worker)
-	{
-		struct _starpu_worker *check = thread->worker;
-		struct starpu_omp_thread *_tmp;
-		_starpu_spin_lock(&_global_state.hash_workers_lock);
-		HASH_FIND_PTR(_global_state.hash_workers, &check, _tmp);
-		STARPU_ASSERT(_tmp == thread);
-		HASH_DEL(_global_state.hash_workers, _tmp);
-		_starpu_spin_unlock(&_global_state.hash_workers_lock);
-	}
+	/* Worker-bound threads are no longer unregistered here: they stay in
+	 * hash_workers so that later parallel regions can reuse them (see
+	 * get_worker_thread()); only the struct itself is reclaimed. */
 	memset(thread, 0, sizeof(*thread));
 	starpu_omp_thread_delete(thread);
 }
@@ -550,7 +548,6 @@ void starpu_omp_shutdown(void)
 	/* TODO: free task/thread/region/device structures */
 	destroy_omp_task_struct(_global_state.initial_task);
 	_global_state.initial_task = NULL;
-	destroy_omp_thread_struct(_global_state.initial_thread);
 	_global_state.initial_thread = NULL;
 	destroy_omp_region_struct(_global_state.initial_region);
 	_global_state.initial_region = NULL;
@@ -617,23 +614,29 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 		else
 		{
 			/* TODO: specify actual starpu worker */
-			new_thread = create_omp_thread_struct(new_region);
 
 			/* TODO: use a less arbitrary thread/worker mapping scheme */
 			if (region->level == 0)
 			{
-				new_thread->worker = _starpu_get_worker_struct(i);
-				register_thread_worker(new_thread);
+				struct _starpu_worker *worker = _starpu_get_worker_struct(i);
+				new_thread = get_worker_thread(worker);
+				if (new_thread == NULL)
+				{
+					new_thread = create_omp_thread_struct(new_region);
+					new_thread->worker = _starpu_get_worker_struct(i);
+					register_thread_worker(new_thread);
+				}
 			}
 			else
 			{
-				new_thread->worker = master_thread->worker;
+				new_thread = master_thread;
 			}
 			starpu_omp_thread_list_push_back(new_region->thread_list, new_thread);
 		}
 
-		new_region->nb_threads++;
 		struct starpu_omp_task *new_task = create_omp_task_struct(task, new_thread, new_region, 1);
+		new_task->rank = new_region->nb_threads;
+		new_region->nb_threads++;
 		starpu_omp_task_list_push_back(new_region->implicit_task_list, new_task);
 
 	}
@@ -738,8 +741,8 @@ void starpu_omp_parallel_region(const struct starpu_codelet * const _parallel_re
 		}
 		else
 		{
-			struct starpu_omp_thread *region_thread = starpu_omp_thread_list_pop_front(new_region->thread_list);
-			destroy_omp_thread_struct(region_thread);
+			starpu_omp_thread_list_pop_front(new_region->thread_list);
+			/* TODO: cleanup unused threads */
 		}
 		new_region->nb_threads--;
 		struct starpu_omp_task *implicit_task = starpu_omp_task_list_pop_front(new_region->implicit_task_list);
@@ -994,6 +997,7 @@ void starpu_omp_task_region(const struct starpu_codelet * const _task_region_cl,
 		generated_task->is_final = is_final;
 		generated_task->is_untied = is_untied;
 		generated_task->task_group = generating_task->task_group;
+		generated_task->rank = -1;
 
 		/*
 		 * save pointer to the regions user function from the task region codelet
@@ -1098,6 +1102,142 @@ void starpu_omp_taskgroup(void (*f)(void *arg), void *arg)
 	}
 	task->task_group = p_previous_task_group;
 }
+
+/* Execute the iterations of a parallel loop ("omp for") that fall to the
+ * calling implicit task.
+ *
+ * f             user function invoked once per iteration with the index and ARG
+ * arg           opaque argument forwarded to F
+ * nb_iterations total iteration count of the loop, over all threads
+ * chunk         requested chunk size; 0 selects the schedule's default
+ * ordered       not implemented yet, must be 0
+ * nowait        when non-zero, skip the implicit barrier at loop exit
+ *
+ * Threads of the same parallel region rendez-vous on a shared
+ * struct starpu_omp_loop matched by the per-task loop_id counter, so that
+ * successive loops in the same region pair up correctly even with nowait. */
+void starpu_omp_for(void (*f)(unsigned long i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *parallel_region = task->owner_region;
+	struct starpu_omp_loop *loop;
+	if (ordered)
+		/* TODO: implement ordered statement */
+		_STARPU_ERROR("omp for / ordered not implemented\n");
+	/* Find the region-wide state for this loop instance, creating it on
+	 * behalf of the whole team if this thread arrives first. */
+	_starpu_spin_lock(&parallel_region->lock);
+	loop = parallel_region->loop_list;
+	while (loop && loop->id != task->loop_id)
+	{
+		loop = loop->next_loop;
+	}
+	if (!loop)
+	{
+		loop = malloc(sizeof(*loop));
+		if (loop == NULL)
+			_STARPU_ERROR("memory allocation failed\n");
+		loop->id = task->loop_id;
+		loop->next_iteration = 0;
+		loop->nb_completed_threads = 0;
+		loop->next_loop = parallel_region->loop_list;
+		parallel_region->loop_list = loop;
+	}
+	_starpu_spin_unlock(&parallel_region->lock);
+
+	if (schedule == starpu_omp_schedule_static || schedule == starpu_omp_schedule_auto)
+	{
+		if (chunk > 0)
+		{
+			/* Round-robin distribution of fixed-size chunks: the
+			 * thread of rank r executes the half-open ranges
+			 * [r*chunk + k*stride, r*chunk + k*stride + chunk)
+			 * for k = 0, 1, ...  (Previous version both ran one
+			 * iteration past each chunk and dropped the rank
+			 * offset when jumping to the next stride, making
+			 * threads overlap.) */
+			const unsigned long stride = (unsigned long)parallel_region->nb_threads * chunk;
+			unsigned long block_start = (unsigned long)task->rank * chunk;
+			while (block_start < nb_iterations)
+			{
+				unsigned long block_end = block_start + chunk;
+				unsigned long iteration;
+				if (block_end > nb_iterations)
+					block_end = nb_iterations;
+				for (iteration = block_start; iteration < block_end; iteration++)
+				{
+					f(iteration, arg);
+				}
+				block_start += stride;
+			}
+		}
+		else
+		{
+			/* No chunk specified: split the iteration space into
+			 * nb_threads contiguous blocks, spreading the
+			 * remainder one extra iteration over the lowest
+			 * ranks. */
+			chunk = nb_iterations / parallel_region->nb_threads;
+			unsigned long extra = nb_iterations % parallel_region->nb_threads;
+			unsigned long iteration = (unsigned long)task->rank * chunk;
+			if ((unsigned long)task->rank < extra)
+			{
+				chunk++;
+				iteration += (unsigned long)task->rank;
+			}
+			else
+			{
+				iteration += extra;
+			}
+			while (chunk > 0 && iteration < nb_iterations)
+			{
+				f(iteration, arg);
+				chunk--;
+				iteration++;
+			}
+		}
+
+		/* re-acquire lock before performing loop completion */
+		_starpu_spin_lock(&parallel_region->lock);
+	}
+	else if (schedule == starpu_omp_schedule_dynamic)
+	{
+		if (chunk == 0)
+		{
+			chunk = 1;
+		}
+		for (;;)
+		{
+			unsigned long iteration;
+			unsigned long current_chunk;
+			_starpu_spin_lock(&parallel_region->lock);
+			/* upon exiting the loop, the parallel_region lock will already be held
+			 * for performing loop completion */
+			if (loop->next_iteration >= nb_iterations)
+				break;
+			/* grab the next chunk, then release the lock while
+			 * running the user function on it */
+			iteration = loop->next_iteration;
+			loop->next_iteration += chunk;
+			_starpu_spin_unlock(&parallel_region->lock);
+			current_chunk = chunk;
+			while (current_chunk > 0 && iteration < nb_iterations)
+			{
+				f(iteration, arg);
+				iteration++;
+				current_chunk--;
+			}
+		}
+	}
+	else if (schedule == starpu_omp_schedule_guided)
+	{
+		/* TODO: implement omp_schedule_guided */
+		_STARPU_ERROR("omp for / guided schedule not implemented\n");
+	}
+	else
+	{
+		/* Any other value would otherwise fall through to the
+		 * completion code below without holding the region lock and
+		 * then unlock an unheld spinlock. */
+		_STARPU_ERROR("omp for / unsupported schedule\n");
+	}
+
+	/* The region lock is held here whichever schedule path was taken.
+	 * The last thread to complete unlinks and frees the loop state. */
+	loop->nb_completed_threads++;
+	if (loop->nb_completed_threads == parallel_region->nb_threads)
+	{
+		struct starpu_omp_loop **p_loop;
+		p_loop = &(parallel_region->loop_list);
+		while (*p_loop != loop)
+		{
+			p_loop = &((*p_loop)->next_loop);
+		}
+		/* relink the tail instead of truncating the list, so other
+		 * in-flight loops (nowait case) are preserved */
+		*p_loop = loop->next_loop;
+		free(loop);
+	}
+	_starpu_spin_unlock(&parallel_region->lock);
+
+	if (!nowait)
+	{
+		starpu_omp_barrier();
+	}
+	task->loop_id++;
+}
+
 /*
  * restore deprecated diagnostics (-Wdeprecated-declarations)
  */

+ 16 - 4
src/util/openmp_runtime_support.h

@@ -53,10 +53,11 @@ enum starpu_omp_bind_mode
 
 enum starpu_omp_schedule_mode
 {
+	/* value 0 is left to "undefined" so that a zero-initialized schedule
+	 * field no longer silently means "static" */
-	starpu_omp_schedule_static  = 0,
-	starpu_omp_schedule_dynamic = 1,
-	starpu_omp_schedule_guided  = 2,
-	starpu_omp_schedule_auto    = 3
+	starpu_omp_schedule_undefined = 0,
+	starpu_omp_schedule_static    = 1,
+	starpu_omp_schedule_dynamic   = 2,
+	starpu_omp_schedule_guided    = 3,
+	starpu_omp_schedule_auto      = 4
 };
 
 /*
@@ -210,12 +211,14 @@ LIST_TYPE(starpu_omp_task,
 	int is_undeferred;
 	int is_final;
 	int is_untied;
+	int rank;
 	int child_task_count;
 	struct starpu_omp_task_group *task_group;
 	struct _starpu_spinlock lock;
 	int wait_on;
 	int barrier_count;
 	int single_id;
+	int loop_id;
 	struct starpu_omp_data_environment_icvs data_env_icvs;
 	struct starpu_omp_implicit_task_icvs implicit_task_icvs;
 
@@ -266,6 +269,14 @@ LIST_TYPE(starpu_omp_thread,
 	struct _starpu_worker *worker;
 )
 
+/* Shared per-loop state: one instance per "omp for" in flight within a
+ * parallel region, linked off the region's loop_list and freed by the last
+ * thread to complete the loop. */
+struct starpu_omp_loop
+{
+	int id; /* matched against starpu_omp_task::loop_id */
+	unsigned long next_iteration; /* next iteration to hand out (dynamic schedule) */
+	int nb_completed_threads; /* threads of the region done with this loop */
+	struct starpu_omp_loop *next_loop;
+};
+
 struct starpu_omp_region
 {
 	struct starpu_omp_region *parent_region;
@@ -283,6 +294,7 @@ struct starpu_omp_region
 	int bound_explicit_task_count;
 	int single_id;
 	int level;
+	struct starpu_omp_loop *loop_list;
 	struct starpu_task *continuation_starpu_task;
 };
 

+ 8 - 1
tests/Makefile.am

@@ -224,13 +224,14 @@ noinst_PROGRAMS =				\
 	openmp/environment			\
 	openmp/parallel_01			\
 	openmp/parallel_02			\
+	openmp/parallel_03			\
 	openmp/parallel_barrier_01		\
 	openmp/parallel_master_01		\
 	openmp/parallel_single_wait_01		\
 	openmp/parallel_single_nowait_01	\
 	openmp/parallel_critical_01		\
 	openmp/parallel_critical_named_01	\
-	openmp/task_01				\
+	openmp/parallel_for_01			\
 	openmp/task_01				\
 	openmp/taskwait_01			\
 	openmp/taskgroup_01			\
@@ -459,6 +460,9 @@ openmp_parallel_01_SOURCES = 	\
 openmp_parallel_02_SOURCES = 	\
 	openmp/parallel_02.c
 
+openmp_parallel_03_SOURCES = 	\
+	openmp/parallel_03.c
+
 openmp_parallel_barrier_01_SOURCES = 	\
 	openmp/parallel_barrier_01.c
 
@@ -477,6 +481,9 @@ openmp_parallel_critical_01_SOURCES = 	\
 openmp_parallel_critical_named_01_SOURCES = 	\
 	openmp/parallel_critical_named_01.c
 
+openmp_parallel_for_01_SOURCES = 	\
+	openmp/parallel_for_01.c
+
 openmp_task_01_SOURCES = 	\
 	openmp/task_01.c