Explorar o código

merge branches/mpi_engine@18361: mpi: more code cleaning

Nathalie Furmento %!s(int64=8) %!d(string=hai) anos
pai
achega
e33aad694a

+ 6 - 18
mpi/src/starpu_mpi.c

@@ -75,9 +75,6 @@ static starpu_pthread_t progress_thread;
 static int running = 0;
 
 #ifdef STARPU_SIMGRID
-int _simgrid_mpi_world_size;
-int _simgrid_mpi_world_rank;
-
 static int wait_counter;
 static starpu_pthread_cond_t wait_counter_cond;
 static starpu_pthread_mutex_t wait_counter_mutex;
@@ -1302,7 +1299,6 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 static void *_starpu_mpi_progress_thread_func(void *arg)
 {
 	struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
-	int rank, worldsize;
 
 	starpu_pthread_setname("MPI");
 
@@ -1310,13 +1306,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_do_initialize(argc_argv);
 #endif
 
-	MPI_Comm_rank(argc_argv->comm, &rank);
-	MPI_Comm_size(argc_argv->comm, &worldsize);
-	MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
-#ifdef STARPU_SIMGRID
-	_simgrid_mpi_world_size = worldsize;
-	_simgrid_mpi_world_rank = rank;
-#endif
 	_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
 	_starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
 
@@ -1333,18 +1322,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	smpi_process_set_user_data(calloc(MAX_TSD + 1, sizeof(void*)));
 #endif
 #endif
+
 #ifdef STARPU_USE_FXT
-	/* Wait for FxT initialization before emitting FxT probes */
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_fxt_started_mutex);
-	while (!_starpu_fxt_started)
-		STARPU_PTHREAD_COND_WAIT(&_starpu_fxt_started_cond, &_starpu_fxt_started_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
+	_starpu_fxt_wait_initialisation();
 #endif //STARPU_USE_FXT
 
 	{
-		_STARPU_MPI_TRACE_START(rank, worldsize);
+		_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
 #ifdef STARPU_USE_FXT
-		starpu_profiling_set_id(rank);
+		starpu_profiling_set_id(argc_argv->rank);
 #endif //STARPU_USE_FXT
 	}
 
@@ -1675,6 +1661,7 @@ void _starpu_mpi_progress_shutdown(int *value)
         STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
         running = 0;
         STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
+
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_signal(&dontsleep);
 #endif
@@ -1683,6 +1670,7 @@ void _starpu_mpi_progress_shutdown(int *value)
 #ifdef STARPU_SIMGRID
 	(void) value;
 	/* FIXME: should rather properly wait for _starpu_mpi_progress_thread_func to finish */
+	(void) value;
 	MSG_process_sleep(1);
 #else
 	starpu_pthread_join(progress_thread, (void *)value);

+ 2 - 2
mpi/src/starpu_mpi_fxt.h

@@ -64,7 +64,7 @@ extern "C" {
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
 #define _STARPU_MPI_TRACE_COMPLETE_BEGIN(type, rank, mpi_tag)		\
-	if (request_type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN((rank), (mpi_tag)); } else if (request_type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN((rank), (mpi_tag), 0); }
+	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN((rank), (mpi_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(src, mpi_tag)	\
@@ -72,7 +72,7 @@ extern "C" {
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_END(src, mpi_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_COMPLETE_END(type, rank, mpi_tag)		\
-	if (request_type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (request_type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
+	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_SLEEP_BEGIN()	\
 	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_END()	\

+ 18 - 4
mpi/src/starpu_mpi_init.c

@@ -37,6 +37,11 @@
 #include <core/simgrid.h>
 #include <core/task.h>
 
+#ifdef STARPU_SIMGRID
+static int _mpi_world_size;
+static int _mpi_world_rank;
+#endif
+
 static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
 {
 	switch (thread_level)
@@ -77,6 +82,15 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 		MPI_Query_thread(&provided);
 		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
 	}
+
+	MPI_Comm_rank(argc_argv->comm, &argc_argv->rank);
+	MPI_Comm_size(argc_argv->comm, &argc_argv->world_size);
+	MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
+
+#ifdef STARPU_SIMGRID
+	_mpi_world_size = argc_argv->world_size;
+	_mpi_world_rank = argc_argv->rank;
+#endif
 }
 
 static
@@ -134,8 +148,8 @@ int starpu_mpi_initialize(void)
 int starpu_mpi_initialize_extended(int *rank, int *world_size)
 {
 #ifdef STARPU_SIMGRID
-	*world_size = _simgrid_mpi_world_size;
-	*rank = _simgrid_mpi_world_rank;
+	*world_size = _mpi_world_size;
+	*rank = _mpi_world_rank;
 	return 0;
 #else
 	int ret;
@@ -183,7 +197,7 @@ int starpu_mpi_comm_size(MPI_Comm comm, int *size)
 	}
 #ifdef STARPU_SIMGRID
 	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*size = _simgrid_mpi_world_size;
+	*size = _mpi_world_size;
 	return 0;
 #else
 	return MPI_Comm_size(comm, size);
@@ -199,7 +213,7 @@ int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
 	}
 #ifdef STARPU_SIMGRID
 	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*rank = _simgrid_mpi_world_rank;
+	*rank = _mpi_world_rank;
 	return 0;
 #else
 	return MPI_Comm_rank(comm, rank);

+ 2 - 4
mpi/src/starpu_mpi_private.h

@@ -58,10 +58,6 @@ void _starpu_mpi_set_debug_level_max(int level);
 #endif
 extern int _starpu_mpi_fake_world_size;
 extern int _starpu_mpi_fake_world_rank;
-#ifdef STARPU_SIMGRID
-extern int _simgrid_mpi_world_size;
-extern int _simgrid_mpi_world_rank;
-#endif
 
 #ifdef STARPU_NO_ASSERT
 #  define STARPU_MPI_ASSERT_MSG(x, msg, ...)	do { if (0) { (void) (x); }} while(0)
@@ -262,6 +258,8 @@ struct _starpu_mpi_argc_argv
 	MPI_Comm comm;
 	int fargc;	// Fortran argc
 	char **fargv;	// Fortran argv
+	int rank;
+	int world_size;
 };
 
 void _starpu_mpi_progress_shutdown(int *value);

+ 9 - 1
src/common/fxt.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009-2017  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2016  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2016, 2017  CNRS
  * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -234,6 +234,14 @@ extern int _starpu_fxt_started;
 extern starpu_pthread_mutex_t _starpu_fxt_started_mutex;
 extern starpu_pthread_cond_t _starpu_fxt_started_cond;
 
+static inline void _starpu_fxt_wait_initialisation()
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_fxt_started_mutex);
+	while (!_starpu_fxt_started)
+		STARPU_PTHREAD_COND_WAIT(&_starpu_fxt_started_cond, &_starpu_fxt_started_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
+}
+
 long _starpu_gettid(void);
 
 /* Initialize the FxT library. */