Prechádzať zdrojové kódy

- optimize omp for to execute contiguous chunks of iterations in a single user function call
- add test case for omp for / nowait

Olivier Aumage 11 rokov pred
rodič
commit
88254f7eeb

+ 1 - 1
include/starpu_openmp.h

@@ -61,7 +61,7 @@ extern void starpu_omp_single(void (*f)(void *arg), void *arg, int nowait) __STA
 extern void starpu_omp_critical(void (*f)(void *arg), void *arg, const char *name) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_taskwait(void) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_taskgroup(void (*f)(void *arg), void *arg) __STARPU_OMP_NOTHROW;
-extern void starpu_omp_for(void (*f)(unsigned long i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait) __STARPU_OMP_NOTHROW;
+extern void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait) __STARPU_OMP_NOTHROW;
 
 extern void starpu_omp_set_num_threads(int threads) __STARPU_OMP_NOTHROW;
 extern int starpu_omp_get_num_threads() __STARPU_OMP_NOTHROW;

+ 31 - 30
src/util/openmp_runtime_support.c

@@ -1103,7 +1103,7 @@ void starpu_omp_taskgroup(void (*f)(void *arg), void *arg)
 	task->task_group = p_previous_task_group;
 }
 
-void starpu_omp_for(void (*f)(unsigned long i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait)
+void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait)
 {
 	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
 	struct starpu_omp_region *parallel_region = task->owner_region;
@@ -1136,47 +1136,44 @@ void starpu_omp_for(void (*f)(unsigned long i, void *arg), void *arg, unsigned l
 		{
 			const unsigned long stride = parallel_region->nb_threads * chunk;
 			unsigned long nb_strides = 0;
-			unsigned long iteration = task->rank * chunk;
-			unsigned long current_chunk = chunk;
-			while (iteration < nb_iterations)
+			unsigned long first_i = task->rank * chunk;
+			unsigned long nb_i;
+			while (first_i < nb_iterations)
 			{
-				f(iteration, arg);
-				if (current_chunk > 0)
+				if (first_i + chunk <= nb_iterations)
 				{
-					current_chunk--;
-					iteration++;
+					nb_i = chunk;
 				}
 				else
 				{
-					nb_strides++;
-					iteration = nb_strides * stride;
-					current_chunk = chunk;
+					nb_i = nb_iterations - first_i;
 				}
+				f(first_i, nb_i, arg);
+				nb_strides++;
+				first_i = stride * nb_strides;
 			}
 		}
 		else
 		{
-			chunk = nb_iterations / parallel_region->nb_threads;
-			unsigned long iteration = (unsigned)task->rank * chunk;
-			unsigned long extra = nb_iterations % parallel_region->nb_threads;
+			unsigned long nb_i = nb_iterations / parallel_region->nb_threads;
+			unsigned long first_i = (unsigned)task->rank * nb_i;
+			unsigned long remainder = nb_iterations % parallel_region->nb_threads;
 
 			if (nb_iterations % parallel_region->nb_threads > 0)
 			{
-				if ((unsigned)task->rank < extra)
+				if ((unsigned)task->rank < remainder)
 				{
-					chunk++;
-					iteration += (unsigned)task->rank;
+					nb_i++;
+					first_i += (unsigned)task->rank;
 				}
 				else
 				{
-					iteration += extra;
+					first_i += remainder;
 				}
 			}
-			while (chunk > 0 && iteration < nb_iterations)
+			if (nb_i > 0)
 			{
-				f(iteration, arg);
-				chunk--;
-				iteration++;
+				f(first_i, nb_i, arg);
 			}
 		}
 
@@ -1191,23 +1188,27 @@ void starpu_omp_for(void (*f)(unsigned long i, void *arg), void *arg, unsigned l
 		}
 		for (;;)
 		{
-			unsigned long iteration;
-			unsigned long current_chunk;
+			unsigned long first_i;
+			unsigned long nb_i;
+
 			_starpu_spin_lock(&parallel_region->lock);
 			/* upon exiting the loop, the parallel_region-lock will already be held
 			 * for performing loop completion */
 			if (loop->next_iteration >= nb_iterations)
 				break;
-			iteration = loop->next_iteration;
+			first_i = loop->next_iteration;
 			loop->next_iteration += chunk;
 			_starpu_spin_unlock(&parallel_region->lock);
-			current_chunk = chunk;
-			while (current_chunk > 0 && iteration < nb_iterations)
+
+			if (first_i + chunk <= nb_iterations)
+			{
+				nb_i = chunk;
+			}
+			else
 			{
-				f(iteration, arg);
-				iteration++;
-				current_chunk--;
+				nb_i = nb_iterations - first_i;
 			}
+			f(first_i, nb_i, arg);
 		}
 	}
 	else if (schedule == starpu_omp_schedule_guided)

+ 4 - 0
tests/Makefile.am

@@ -232,6 +232,7 @@ noinst_PROGRAMS =				\
 	openmp/parallel_critical_01		\
 	openmp/parallel_critical_named_01	\
 	openmp/parallel_for_01			\
+	openmp/parallel_for_02			\
 	openmp/task_01				\
 	openmp/taskwait_01			\
 	openmp/taskgroup_01			\
@@ -484,6 +485,9 @@ openmp_parallel_critical_named_01_SOURCES = 	\
 openmp_parallel_for_01_SOURCES = 	\
 	openmp/parallel_for_01.c
 
+openmp_parallel_for_02_SOURCES = 	\
+	openmp/parallel_for_02.c
+
 openmp_task_01_SOURCES = 	\
 	openmp/task_01.c
 

+ 12 - 10
tests/openmp/parallel_for_01.c

@@ -40,14 +40,16 @@ static void omp_destructor(void)
 	starpu_omp_shutdown();
 }
 
-void for_g(unsigned long i, void *arg)
+void for_g(unsigned long i, unsigned long nb_i, void *arg)
 {
-	(void) arg;
-	int worker_id;
-	pthread_t tid;
-	tid = pthread_self();
-	worker_id = starpu_worker_get_id();
-	printf("[tid %p] task thread = %d, for iteration %lu\n", (void *)tid, worker_id, i);
+	for (; nb_i > 0; i++, nb_i--)
+	{
+		int worker_id;
+		pthread_t tid;
+		tid = pthread_self();
+		worker_id = starpu_worker_get_id();
+		printf("[tid %p] task thread = %d, for [%s] iteration %lu\n", (void *)tid, worker_id, (const char *)arg, i);
+	}
 }
 
 void parallel_region_1_f(void *buffers[], void *args)
@@ -59,7 +61,7 @@ void parallel_region_1_f(void *buffers[], void *args)
 	tid = pthread_self();
 	worker_id = starpu_worker_get_id();
 	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
-	starpu_omp_for(for_g, NULL, NB_ITERS, CHUNK, starpu_omp_sched_static, 0, 0);
+	starpu_omp_for(for_g, (void*)"static chunk", NB_ITERS, CHUNK, starpu_omp_sched_static, 0, 0);
 }
 
 static struct starpu_codelet parallel_region_1_cl =
@@ -79,7 +81,7 @@ void parallel_region_2_f(void *buffers[], void *args)
 	tid = pthread_self();
 	worker_id = starpu_worker_get_id();
 	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
-	starpu_omp_for(for_g, NULL, NB_ITERS, 0, starpu_omp_sched_static, 0, 0);
+	starpu_omp_for(for_g, (void*)"static nochunk", NB_ITERS, 0, starpu_omp_sched_static, 0, 0);
 }
 
 static struct starpu_codelet parallel_region_2_cl =
@@ -99,7 +101,7 @@ void parallel_region_3_f(void *buffers[], void *args)
 	tid = pthread_self();
 	worker_id = starpu_worker_get_id();
 	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
-	starpu_omp_for(for_g, NULL, NB_ITERS, CHUNK, starpu_omp_sched_dynamic, 0, 0);
+	starpu_omp_for(for_g, (void*)"dynamic", NB_ITERS, CHUNK, starpu_omp_sched_dynamic, 0, 0);
 }
 
 static struct starpu_codelet parallel_region_3_cl =

+ 83 - 0
tests/openmp/parallel_for_02.c

@@ -0,0 +1,83 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2014  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <pthread.h>
+#include <starpu.h>
+#include "../helper.h"
+#include <stdio.h>
+
+#if !defined(STARPU_OPENMP)
+int main(int argc, char **argv)
+{
+	return STARPU_TEST_SKIPPED;
+}
+#else
+#define NB_ITERS 4321
+#define CHUNK 42
+__attribute__((constructor))
+static void omp_constructor(void)
+{
+	int ret = starpu_omp_init();
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_omp_init");
+}
+
+__attribute__((destructor))
+static void omp_destructor(void)
+{
+	starpu_omp_shutdown();
+}
+
+void for_g(unsigned long i, unsigned long nb_i, void *arg)
+{
+	for (; nb_i > 0; i++, nb_i--)
+	{
+		int worker_id;
+		pthread_t tid;
+		tid = pthread_self();
+		worker_id = starpu_worker_get_id();
+		printf("[tid %p] task thread = %d, for [%s] iteration %lu\n", (void *)tid, worker_id, (const char *)arg, i);
+	}
+}
+
+void parallel_region_1_f(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"static chunk", NB_ITERS, CHUNK, starpu_omp_sched_static, 0, 1);
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"static nochunk", NB_ITERS, 0, starpu_omp_sched_static, 0, 1);
+	printf("[tid %p] task thread = %d\n", (void *)tid, worker_id);
+	starpu_omp_for(for_g, (void*)"dynamic", NB_ITERS, CHUNK, starpu_omp_sched_dynamic, 0, 1);
+}
+
+static struct starpu_codelet parallel_region_1_cl =
+{
+	.cpu_funcs    = { parallel_region_1_f, NULL },
+	.where        = STARPU_CPU,
+	.nbuffers     = 0
+};
+
+int
+main (int argc, char *argv[]) {
+	starpu_omp_parallel_region(&parallel_region_1_cl, NULL);
+	return 0;
+}
+#endif