瀏覽代碼

- add inline version of omp for
- build outline version on top of inline version

Olivier Aumage 11 年之前
父節點
當前提交
00bc8739c4
共有 4 個文件被更改,包括 183 次插入100 次删除
  1. 2 0
      include/starpu_openmp.h
  2. 132 91
      src/util/openmp_runtime_support.c
  3. 44 5
      tests/openmp/parallel_for_01.c
  4. 5 4
      tests/openmp/parallel_for_02.c

+ 2 - 0
include/starpu_openmp.h

@@ -66,6 +66,8 @@ extern void starpu_omp_critical_inline_end(const char *name) __STARPU_OMP_NOTHRO
 extern void starpu_omp_taskwait(void) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_taskgroup(void (*f)(void *arg), void *arg) __STARPU_OMP_NOTHROW;
 extern void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait) __STARPU_OMP_NOTHROW;
+extern int starpu_omp_for_inline_first(unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, unsigned long *_first_i, unsigned long *_nb_i) __STARPU_OMP_NOTHROW;
+extern int starpu_omp_for_inline_next(unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, unsigned long *_first_i, unsigned long *_nb_i) __STARPU_OMP_NOTHROW;
 
 extern void starpu_omp_set_num_threads(int threads) __STARPU_OMP_NOTHROW;
 extern int starpu_omp_get_num_threads() __STARPU_OMP_NOTHROW;

+ 132 - 91
src/util/openmp_runtime_support.c

@@ -1207,82 +1207,58 @@ void starpu_omp_taskgroup(void (*f)(void *arg), void *arg)
 	task->task_group = p_previous_task_group;
 }
 
-void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait)
+static inline void _starpu_omp_for_loop(struct starpu_omp_region *parallel_region, struct starpu_omp_task *task,
+		struct starpu_omp_loop *loop, int first_call,
+		unsigned long nb_iterations, unsigned long chunk, int schedule, unsigned long *_first_i, unsigned long *_nb_i)
 {
-	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
-	struct starpu_omp_region *parallel_region = task->owner_region;
-	struct starpu_omp_loop *loop;
-	if (ordered)
-		/* TODO: implement ordered statement */
-		_STARPU_ERROR("omp for / ordered not implemented\n");
-	_starpu_spin_lock(&parallel_region->lock);
-	loop = parallel_region->loop_list;
-	while (loop && loop->id != task->loop_id)
-	{
-		loop = loop->next_loop;
-	}
-	if (!loop)
-	{
-		loop = malloc(sizeof(*loop));
-		if (loop == NULL)
-			_STARPU_ERROR("memory allocation failed\n");
-		loop->id = task->loop_id;
-		loop->next_iteration = 0;
-		loop->nb_completed_threads = 0;
-		loop->next_loop = parallel_region->loop_list;
-		parallel_region->loop_list = loop;
-	}
-	_starpu_spin_unlock(&parallel_region->lock);
-
+	*_nb_i = 0;
 	if (schedule == starpu_omp_schedule_static || schedule == starpu_omp_schedule_auto)
 	{
 		if (chunk > 0)
 		{
-			const unsigned long stride = parallel_region->nb_threads * chunk;
-			unsigned long nb_strides = 0;
-			unsigned long first_i = task->rank * chunk;
-			unsigned long nb_i;
-			while (first_i < nb_iterations)
+			if (first_call)
 			{
-				if (first_i + chunk <= nb_iterations)
+				*_first_i = task->rank * chunk;
+			}
+			else
+			{
+				*_first_i += parallel_region->nb_threads * chunk;
+			}
+
+			if (*_first_i < nb_iterations)
+			{
+				if (*_first_i + chunk > nb_iterations)
 				{
-					nb_i = chunk;
+					*_nb_i = nb_iterations - *_first_i;
 				}
 				else
 				{
-					nb_i = nb_iterations - first_i;
+					*_nb_i = chunk;
 				}
-				f(first_i, nb_i, arg);
-				nb_strides++;
-				first_i = stride * nb_strides;
 			}
 		}
 		else
 		{
-			unsigned long nb_i = nb_iterations / parallel_region->nb_threads;
-			unsigned long first_i = (unsigned)task->rank * nb_i;
-			unsigned long remainder = nb_iterations % parallel_region->nb_threads;
-
-			if (nb_iterations % parallel_region->nb_threads > 0)
+			if (first_call)
 			{
-				if ((unsigned)task->rank < remainder)
-				{
-					nb_i++;
-					first_i += (unsigned)task->rank;
-				}
-				else
+				*_nb_i = nb_iterations / parallel_region->nb_threads;
+				*_first_i = (unsigned)task->rank * (*_nb_i);
+				unsigned long remainder = nb_iterations % parallel_region->nb_threads;
+
+				if (remainder > 0)
 				{
-					first_i += remainder;
+					if ((unsigned)task->rank < remainder)
+					{
+						(*_nb_i)++;
+						*_first_i += (unsigned)task->rank;
+					}
+					else
+					{
+						*_first_i += remainder;
+					}
 				}
 			}
-			if (nb_i > 0)
-			{
-				f(first_i, nb_i, arg);
-			}
 		}
-
-		/* re-acquire lock before performing loop completion */
-		_starpu_spin_lock(&parallel_region->lock);
 	}
 	else if (schedule == starpu_omp_schedule_dynamic)
 	{
@@ -1290,29 +1266,25 @@ void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void
 		{
 			chunk = 1;
 		}
-		for (;;)
+		if (first_call)
+		{
+			*_first_i = 0;
+		}
+		_starpu_spin_lock(&parallel_region->lock);
+		if (loop->next_iteration < nb_iterations)
 		{
-			unsigned long first_i;
-			unsigned long nb_i;
-
-			_starpu_spin_lock(&parallel_region->lock);
-			/* upon exiting the loop, the parallel_region-lock will already be held
-			 * for performing loop completion */
-			if (loop->next_iteration >= nb_iterations)
-				break;
-			first_i = loop->next_iteration;
-			if (first_i + chunk > nb_iterations)
+			*_first_i = loop->next_iteration;
+			if (*_first_i + chunk > nb_iterations)
 			{
-				nb_i = nb_iterations - first_i;
+				*_nb_i = nb_iterations - *_first_i;
 			}
 			else
 			{
-				nb_i = chunk;
+				*_nb_i = chunk;
 			}
-			loop->next_iteration += nb_i;
-			_starpu_spin_unlock(&parallel_region->lock);
-			f(first_i, nb_i, arg);
+			loop->next_iteration += *_nb_i;
 		}
+		_starpu_spin_unlock(&parallel_region->lock);
 	}
 	else if (schedule == starpu_omp_schedule_guided)
 	{
@@ -1320,35 +1292,63 @@ void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void
 		{
 			chunk = 1;
 		}
-		for (;;)
+		if (first_call)
 		{
-			unsigned long first_i;
-			unsigned long nb_i;
-
-			_starpu_spin_lock(&parallel_region->lock);
-			/* upon exiting the loop, the parallel_region-lock will already be held
-			 * for performing loop completion */
-			if (loop->next_iteration >= nb_iterations)
-				break;
-			first_i = loop->next_iteration;
-			nb_i = (nb_iterations - first_i)/parallel_region->nb_threads;
-			if (nb_i < chunk)
+			*_first_i = 0;
+		}
+		_starpu_spin_lock(&parallel_region->lock);
+		if (loop->next_iteration < nb_iterations)
+		{
+			*_first_i = loop->next_iteration;
+			*_nb_i = (nb_iterations - *_first_i)/parallel_region->nb_threads;
+			if (*_nb_i < chunk)
 			{
-				if (first_i+chunk > nb_iterations)
+				if (*_first_i+chunk > nb_iterations)
 				{
-					nb_i = nb_iterations - first_i;
+					*_nb_i = nb_iterations - *_first_i;
 				}
 				else
 				{
-					nb_i = chunk;
+					*_nb_i = chunk;
 				}
 			}
-			loop->next_iteration += nb_i;
-			_starpu_spin_unlock(&parallel_region->lock);
-			f(first_i, nb_i, arg);
+			loop->next_iteration += *_nb_i;
 		}
+		_starpu_spin_unlock(&parallel_region->lock);
 	}
+}
 
+static inline struct starpu_omp_loop *_starpu_omp_for_loop_begin(struct starpu_omp_region *parallel_region, struct starpu_omp_task *task,
+		int ordered)
+{
+	struct starpu_omp_loop *loop;
+	if (ordered)
+		/* TODO: implement ordered statement */
+		_STARPU_ERROR("omp for / ordered not implemented\n");
+	_starpu_spin_lock(&parallel_region->lock);
+	loop = parallel_region->loop_list;
+	while (loop && loop->id != task->loop_id)
+	{
+		loop = loop->next_loop;
+	}
+	if (!loop)
+	{
+		loop = malloc(sizeof(*loop));
+		if (loop == NULL)
+			_STARPU_ERROR("memory allocation failed\n");
+		loop->id = task->loop_id;
+		loop->next_iteration = 0;
+		loop->nb_completed_threads = 0;
+		loop->next_loop = parallel_region->loop_list;
+		parallel_region->loop_list = loop;
+	}
+	_starpu_spin_unlock(&parallel_region->lock);
+	return loop;
+}
+static inline void _starpu_omp_for_loop_end(struct starpu_omp_region *parallel_region, struct starpu_omp_task *task,
+		struct starpu_omp_loop *loop)
+{
+	_starpu_spin_lock(&parallel_region->lock);
 	loop->nb_completed_threads++;
 	if (loop->nb_completed_threads == parallel_region->nb_threads)
 	{
@@ -1363,12 +1363,53 @@ void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void
 		free(loop);
 	}
 	_starpu_spin_unlock(&parallel_region->lock);
+	task->loop_id++;
+}
 
+int starpu_omp_for_inline_first(unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, unsigned long *_first_i, unsigned long *_nb_i)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *parallel_region = task->owner_region;
+	struct starpu_omp_loop *loop = _starpu_omp_for_loop_begin(parallel_region, task, ordered);
+
+	_starpu_omp_for_loop(parallel_region, task, loop, 1, nb_iterations, chunk, schedule, _first_i, _nb_i);
+	if (*_nb_i == 0)
+	{
+		_starpu_omp_for_loop_end(parallel_region, task, loop);
+	}
+	return (*_nb_i != 0);
+}
+
+int starpu_omp_for_inline_next(unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, unsigned long *_first_i, unsigned long *_nb_i)
+{
+	struct starpu_omp_task *task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
+	struct starpu_omp_region *parallel_region = task->owner_region;
+	struct starpu_omp_loop *loop = _starpu_omp_for_loop_begin(parallel_region, task, ordered);
+
+	_starpu_omp_for_loop(parallel_region, task, loop, 0, nb_iterations, chunk, schedule, _first_i, _nb_i);
+	if (*_nb_i == 0)
+	{
+		_starpu_omp_for_loop_end(parallel_region, task, loop);
+	}
+	return (*_nb_i != 0);
+}
+
+void starpu_omp_for(void (*f)(unsigned long _first_i, unsigned long _nb_i, void *arg), void *arg, unsigned long nb_iterations, unsigned long chunk, int schedule, int ordered, int nowait)
+{
+	unsigned long _first_i = 0;
+	unsigned long _nb_i = 0;
+	if (starpu_omp_for_inline_first(nb_iterations, chunk, schedule, ordered, &_first_i, &_nb_i))
+	{
+		do
+		{
+			f(_first_i, _nb_i, arg);
+		}
+		while (starpu_omp_for_inline_next(nb_iterations, chunk, schedule, ordered, &_first_i, &_nb_i));
+	}
 	if (!nowait)
 	{
 		starpu_omp_barrier();
 	}
-	task->loop_id++;
 }
 
 /*

+ 44 - 5
tests/openmp/parallel_for_01.c

@@ -27,6 +27,8 @@ int main(int argc, char **argv)
 #else
 #define NB_ITERS 256
 #define CHUNK 16
+unsigned long array[NB_ITERS];
+
 __attribute__((constructor))
 static void omp_constructor(void)
 {
@@ -42,13 +44,14 @@ static void omp_destructor(void)
 
 void for_g(unsigned long i, unsigned long nb_i, void *arg)
 {
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d, for [%s] iterations first=%lu:nb=%lu\n", (void *)tid, worker_id, (const char *)arg, i, nb_i);
 	for (; nb_i > 0; i++, nb_i--)
 	{
-		int worker_id;
-		pthread_t tid;
-		tid = pthread_self();
-		worker_id = starpu_worker_get_id();
-		printf("[tid %p] task thread = %d, for [%s] iteration %lu\n", (void *)tid, worker_id, (const char *)arg, i);
+		array[i] = 1;
 	}
 }
 
@@ -172,14 +175,50 @@ static struct starpu_codelet parallel_region_6_cl =
 
 };
 
+static void clear_array(void)
+{
+	memset(array, 0, NB_ITERS*sizeof(unsigned long));
+}
+
+static void check_array(void)
+{
+	unsigned long i;
+	unsigned long s = 0;
+	for (i = 0; i < NB_ITERS; i++)
+	{
+		s += array[i];
+	}
+	if (s != NB_ITERS)
+	{
+		printf("missing iterations\n");
+		exit(1);
+	}
+}
 int
 main (int argc, char *argv[]) {
+	clear_array();
 	starpu_omp_parallel_region(&parallel_region_1_cl, NULL);
+	check_array();
+
+	clear_array();
 	starpu_omp_parallel_region(&parallel_region_2_cl, NULL);
+	check_array();
+
+	clear_array();
 	starpu_omp_parallel_region(&parallel_region_3_cl, NULL);
+	check_array();
+
+	clear_array();
 	starpu_omp_parallel_region(&parallel_region_4_cl, NULL);
+	check_array();
+
+	clear_array();
 	starpu_omp_parallel_region(&parallel_region_5_cl, NULL);
+	check_array();
+
+	clear_array();
 	starpu_omp_parallel_region(&parallel_region_6_cl, NULL);
+	check_array();
 	return 0;
 }
 #endif

+ 5 - 4
tests/openmp/parallel_for_02.c

@@ -42,12 +42,13 @@ static void omp_destructor(void)
 
 void for_g(unsigned long i, unsigned long nb_i, void *arg)
 {
+	int worker_id;
+	pthread_t tid;
+	tid = pthread_self();
+	worker_id = starpu_worker_get_id();
+	printf("[tid %p] task thread = %d, for [%s] iterations first=%lu:nb=%lu\n", (void *)tid, worker_id, (const char *)arg, i, nb_i);
 	for (; nb_i > 0; i++, nb_i--)
 	{
-		int worker_id;
-		pthread_t tid;
-		tid = pthread_self();
-		worker_id = starpu_worker_get_id();
 		printf("[tid %p] task thread = %d, for [%s] iteration %lu\n", (void *)tid, worker_id, (const char *)arg, i);
 	}
 }