瀏覽代碼

Merge branch 'master' of git+ssh://scm.gforge.inria.fr/gitroot/starpu/starpu

Nathalie Furmento 7 年之前
父節點
當前提交
3784ae2daf
共有 6 個文件被更改,包括 109 次插入32 次删除
  1. 10 14
      mpi/src/mpi/starpu_mpi_mpi.c
  2. 9 13
      mpi/src/nmad/starpu_mpi_nmad.c
  3. 2 2
      mpi/src/starpu_mpi_init.c
  4. 2 2
      mpi/src/starpu_mpi_private.h
  5. 2 1
      mpi/tests/Makefile.am
  6. 84 0
      mpi/tests/broadcast.c

+ 10 - 14
mpi/src/mpi/starpu_mpi_mpi.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012-2013,2016-2017                      Inria
- * Copyright (C) 2009-2017                                Université de Bordeaux
+ * Copyright (C) 2009-2018                                Université de Bordeaux
  * Copyright (C) 2017                                     Guillaume Beauchamp
  * Copyright (C) 2010-2017                                CNRS
  *
@@ -1270,23 +1270,19 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	starpu_pthread_queue_register(&wait, &dontsleep);
 #endif
 
+#ifdef STARPU_USE_FXT
+	_starpu_fxt_wait_initialisation();
+	/* We need to record our ID in the trace before the main thread makes any MPI call */
+	_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
+	starpu_profiling_set_id(argc_argv->rank);
+#endif //STARPU_USE_FXT
+
 	/* notify the main thread that the progression thread is ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	running = 1;
 	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-#ifdef STARPU_USE_FXT
-	_starpu_fxt_wait_initialisation();
-#endif //STARPU_USE_FXT
-
-	{
-		_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
-#ifdef STARPU_USE_FXT
-		starpu_profiling_set_id(argc_argv->rank);
-#endif //STARPU_USE_FXT
-	}
-
 	_starpu_mpi_add_sync_point_in_fxt();
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
@@ -1619,7 +1615,7 @@ void _starpu_mpi_wait_for_initialization()
 }
 #endif
 
-void _starpu_mpi_progress_shutdown(uintptr_t value)
+void _starpu_mpi_progress_shutdown(void **value)
 {
         STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
         running = 0;
@@ -1635,7 +1631,7 @@ void _starpu_mpi_progress_shutdown(uintptr_t value)
 	(void) value;
 	MSG_process_sleep(1);
 #else
-	STARPU_PTHREAD_JOIN(progress_thread, (void *)value);
+	STARPU_PTHREAD_JOIN(progress_thread, value);
 #endif
 
         STARPU_PTHREAD_MUTEX_DESTROY(&mutex_posted_requests);

+ 9 - 13
mpi/src/nmad/starpu_mpi_nmad.c

@@ -3,7 +3,7 @@
  * Copyright (C) 2017                                     Inria
  * Copyright (C) 2017                                     Guillaume Beauchamp
  * Copyright (C) 2010-2015,2017                           CNRS
- * Copyright (C) 2009-2014,2017                           Université de Bordeaux
+ * Copyright (C) 2009-2014,2017-2018                      Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -615,23 +615,19 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_select_node_init();
 	_starpu_mpi_datatype_init();
 
+#ifdef STARPU_USE_FXT
+	_starpu_fxt_wait_initialisation();
+	/* We need to record our ID in the trace before the main thread makes any MPI call */
+	_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
+	starpu_profiling_set_id(argc_argv->rank);
+#endif //STARPU_USE_FXT
+
 	/* notify the main thread that the progression thread is ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	running = 1;
 	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-#ifdef STARPU_USE_FXT
-	_starpu_fxt_wait_initialisation();
-#endif //STARPU_USE_FXT
-
-	{
-		_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
-#ifdef STARPU_USE_FXT
-		starpu_profiling_set_id(argc_argv->rank);
-#endif //STARPU_USE_FXT
-	}
-
 	_starpu_mpi_add_sync_point_in_fxt();
 
 	while (1)
@@ -757,7 +753,7 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
         return 0;
 }
 
-void _starpu_mpi_progress_shutdown(uintptr_t value)
+void _starpu_mpi_progress_shutdown(void *value)
 {
 	/* kill the progression thread */
         STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);

+ 2 - 2
mpi/src/starpu_mpi_init.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010-2017                                CNRS
- * Copyright (C) 2009-2017                                Université de Bordeaux
+ * Copyright (C) 2009-2018                                Université de Bordeaux
  * Copyright (C) 2016                                     Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -178,7 +178,7 @@ int starpu_mpi_initialize_extended(int *rank, int *world_size)
 
 int starpu_mpi_shutdown(void)
 {
-	uintptr_t value;
+	void *value;
 	int rank, world_size;
 
 	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */

+ 2 - 2
mpi/src/starpu_mpi_private.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2017                                Université de Bordeaux
+ * Copyright (C) 2010-2018                                Université de Bordeaux
  * Copyright (C) 2010-2017                                CNRS
  * Copyright (C) 2016-2017                                Inria
  *
@@ -320,7 +320,7 @@ struct _starpu_mpi_argc_argv
 	int world_size;
 };
 
-void _starpu_mpi_progress_shutdown(uintptr_t value);
+void _starpu_mpi_progress_shutdown(void **value);
 int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
 #ifdef STARPU_SIMGRID
 void _starpu_mpi_wait_for_initialization();

+ 2 - 1
mpi/tests/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2010-2017                                CNRS
-# Copyright (C) 2009-2017                                Université de Bordeaux
+# Copyright (C) 2009-2018                                Université de Bordeaux
 # Copyright (C) 2013                                     Thibaut Lambert
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -104,6 +104,7 @@ if BUILD_TESTS
 starpu_mpi_TESTS =
 
 starpu_mpi_TESTS +=				\
+	broadcast				\
 	cache					\
 	cache_disable				\
 	callback				\

+ 84 - 0
mpi/tests/broadcast.c

@@ -0,0 +1,84 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013,2015,2017                           CNRS
+ * Copyright (C) 2014-2015,2017-2018                      Université de Bordeaux
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include "helper.h"
+
+void wait_CPU(void *descr[], void *_args)
+{
+	(void)_args;
+	int *var = (int*) STARPU_VARIABLE_GET_PTR(descr[0]);
+	*var = 42;
+	starpu_sleep(1);
+}
+
+static struct starpu_codelet cl =
+{
+	.cpu_funcs = { wait_CPU },
+	.cpu_funcs_name = { "wait_CPU" },
+	.nbuffers = 1,
+	.flags = STARPU_CODELET_SIMGRID_EXECUTE,
+	.modes = { STARPU_W },
+};
+
+int main(int argc, char **argv)
+{
+	int ret, rank, size;
+	starpu_data_handle_t handle;
+	int var;
+	int mpi_init;
+	MPI_Status status;
+
+	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(&argc, &argv, mpi_init);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+
+	starpu_variable_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t)&var, sizeof(var));
+
+	if (rank == 0)
+	{
+		starpu_task_insert(&cl, STARPU_W, handle, 0);
+
+		int n;
+		for(n = 1 ; n < size ; n++)
+		{
+			FPRINTF_MPI(stderr, "sending data to %d\n", n);
+			starpu_mpi_isend_detached(handle, n, 0, MPI_COMM_WORLD, NULL, NULL);
+		}
+	}
+	else
+	{
+		starpu_mpi_recv(handle, 0, 0, MPI_COMM_WORLD, &status);
+		FPRINTF_MPI(stderr, "received data\n");
+	}
+
+	starpu_data_unregister(handle);
+	STARPU_ASSERT(var == 42);
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+	if (!mpi_init)
+		MPI_Finalize();
+
+	return 0;
+}