Nathalie Furmento пре 8 година
родитељ
комит
f0a5f15604
5 измењених фајлова са 34 додато и 24 уклоњено
  1. 7 21
      mpi/src/starpu_mpi.c
  2. 2 2
      mpi/src/starpu_mpi_fxt.h
  3. 14 0
      mpi/src/starpu_mpi_init.c
  4. 2 0
      mpi/src/starpu_mpi_private.h
  5. 9 1
      src/common/fxt.h

+ 7 - 21
mpi/src/starpu_mpi.c

@@ -74,11 +74,6 @@ static starpu_pthread_t progress_thread;
 #endif
 static int running = 0;
 
-#ifdef STARPU_SIMGRID
-static int _mpi_world_size;
-static int _mpi_world_rank;
-#endif
-
 /* Count requests posted by the application and not yet submitted to MPI */
 static starpu_pthread_mutex_t mutex_posted_requests;
 static int posted_requests = 0, newer_requests, barrier_running = 0;
@@ -1225,20 +1220,11 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 static void *_starpu_mpi_progress_thread_func(void *arg)
 {
 	struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
-	int rank, worldsize;
 
 #ifndef STARPU_SIMGRID
 	_starpu_mpi_do_initialize(argc_argv);
 #endif
 
-	MPI_Comm_rank(argc_argv->comm, &rank);
-	MPI_Comm_size(argc_argv->comm, &worldsize);
-	MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
-#ifdef STARPU_SIMGRID
-	_mpi_world_size = worldsize;
-	_mpi_world_rank = rank;
-#endif
-
 #ifdef STARPU_SIMGRID
 	/* Now that MPI is set up, let the rest of simgrid get initialized */
 	char ** argv_cpy = malloc(*(argc_argv->argc) * sizeof(char*));
@@ -1251,17 +1237,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	smpi_process_set_user_data(calloc(MAX_TSD, sizeof(void*)));
 #endif
 #endif
+
 #ifdef STARPU_USE_FXT
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_fxt_started_mutex);
-	while (!_starpu_fxt_started)
-		STARPU_PTHREAD_COND_WAIT(&_starpu_fxt_started_cond, &_starpu_fxt_started_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
+	_starpu_fxt_wait_initialisation();
 #endif //STARPU_USE_FXT
 
 	{
-		_STARPU_MPI_TRACE_START(rank, worldsize);
+		_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
 #ifdef STARPU_USE_FXT
-		starpu_profiling_set_id(rank);
+		starpu_profiling_set_id(argc_argv->rank);
 #endif //STARPU_USE_FXT
 	}
 
@@ -1512,12 +1496,15 @@ void _starpu_mpi_progress_shutdown(int *value)
 	running = 0;
 	STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+
 #ifdef STARPU_SIMGRID
 	/* FIXME: should rather properly wait for _starpu_mpi_progress_thread_func to finish */
+	(void) value;
 	MSG_process_sleep(1);
 #else
 	starpu_pthread_join(progress_thread, (void *)value);
 #endif
+
 	/* free the request queues */
 	_starpu_mpi_req_list_delete(detached_requests);
 	_starpu_mpi_req_list_delete(ready_requests);
@@ -1700,4 +1687,3 @@ int starpu_mpi_wait_for_all(MPI_Comm comm)
 	}
 	return 0;
 }
-

+ 2 - 2
mpi/src/starpu_mpi_fxt.h

@@ -64,7 +64,7 @@ extern "C" {
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
 #define _STARPU_MPI_TRACE_COMPLETE_BEGIN(type, rank, mpi_tag)		\
-	if (request_type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN((rank), (mpi_tag)); } else if (request_type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN((rank), (mpi_tag), 0); }
+	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN((rank), (mpi_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(src, mpi_tag)	\
@@ -72,7 +72,7 @@ extern "C" {
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_END(src, mpi_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_COMPLETE_END(type, rank, mpi_tag)		\
-	if (request_type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (request_type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
+	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_SLEEP_BEGIN()	\
 	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_END()	\

+ 14 - 0
mpi/src/starpu_mpi_init.c

@@ -37,6 +37,11 @@
 #include <core/simgrid.h>
 #include <core/task.h>
 
+#ifdef STARPU_SIMGRID
+static int _mpi_world_size;
+static int _mpi_world_rank;
+#endif
+
 static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
 {
 	switch (thread_level)
@@ -77,6 +82,15 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 		MPI_Query_thread(&provided);
 		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
 	}
+
+	MPI_Comm_rank(argc_argv->comm, &argc_argv->rank);
+	MPI_Comm_size(argc_argv->comm, &argc_argv->world_size);
+	MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
+
+#ifdef STARPU_SIMGRID
+	_mpi_world_size = argc_argv->world_size;
+	_mpi_world_rank = argc_argv->rank;
+#endif
 }
 
 static

+ 2 - 0
mpi/src/starpu_mpi_private.h

@@ -228,6 +228,8 @@ struct _starpu_mpi_argc_argv
 	MPI_Comm comm;
 	int fargc;	// Fortran argc
 	char **fargv;	// Fortran argv
+	int rank;
+	int world_size;
 };
 
 void _starpu_mpi_progress_shutdown(int *value);

+ 9 - 1
src/common/fxt.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009-2016  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -227,6 +227,14 @@ extern int _starpu_fxt_started;
 extern starpu_pthread_mutex_t _starpu_fxt_started_mutex;
 extern starpu_pthread_cond_t _starpu_fxt_started_cond;
 
+/*
+ * Block the calling thread until _starpu_fxt_started is non-zero.
+ *
+ * The flag is tested while holding _starpu_fxt_started_mutex, and the thread
+ * sleeps on _starpu_fxt_started_cond in between; the loop re-checks the flag
+ * after every return from the wait.  The mutex is released before returning.
+ */
+static inline void _starpu_fxt_wait_initialisation(void)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_fxt_started_mutex);
+	while (!_starpu_fxt_started)
+		STARPU_PTHREAD_COND_WAIT(&_starpu_fxt_started_cond, &_starpu_fxt_started_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
+}
+
 long _starpu_gettid(void);
 
 /* Initialize the FxT library. */