Просмотр исходного кода

merge branches/mpi_engine@18351: mpi: code cleaning

Nathalie Furmento лет назад: 8
Родитель
Сommit
0867e93cb4

+ 0 - 7
configure.ac

@@ -334,13 +334,6 @@ fi
 
 AC_SUBST(CC_OR_MPICC, $cc_or_mpicc)
 
-AC_ARG_ENABLE(mpi-progression-hook, [AS_HELP_STRING([--enable-mpi-progression-hook],
-				   [Enable StarPU MPI activity polling method])],
-				   enable_mpi_progression_hook=$enableval, enable_mpi_progression_hook=no)
-if  test x$enable_mpi_progression_hook = xyes; then
-	AC_DEFINE(STARPU_MPI_ACTIVITY, [1], [enable StarPU MPI activity polling method])
-fi
-
 AC_ARG_ENABLE(mpi-pedantic-isend, [AS_HELP_STRING([--enable-mpi-pedantic-isend],
 				   [Enable StarPU MPI pedantic isend])],
 				   enable_mpi_pedantic_isend=$enableval, enable_mpi_pedantic_isend=no)

+ 0 - 8
doc/doxygen/chapters/510_configure_options.doxy

@@ -335,14 +335,6 @@ Use the compiler <c>mpicc</c> at <c>path</c>, for StarPU-MPI.
 (\ref MPISupport).
 </dd>
 
-<dt>--enable-mpi-progression-hook</dt>
-<dd>
-\anchor enable-mpi-progression-hook
-\addindex __configure__--enable-mpi-progression-hook
-Enable the activity polling method for StarPU-MPI. This is however experimental,
-do not enable it unless you know what you are doing.
-</dd>
-
 <dt>--enable-mpi-pedantic-isend</dt>
 <dd>
 \anchor enable-mpi-pedantic-isend

+ 2 - 1
mpi/src/Makefile.am

@@ -70,6 +70,7 @@ noinst_HEADERS =					\
 	starpu_mpi_comm.h				\
 	starpu_mpi_tag.h				\
 	starpu_mpi_task_insert.h			\
+	starpu_mpi_init.h				\
 	load_balancer/policy/data_movements_interface.h	\
 	load_balancer/policy/load_data_interface.h	\
 	load_balancer/policy/load_balancer_policy.h
@@ -92,11 +93,11 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_tag.c				\
 	starpu_mpi_fortran.c				\
 	starpu_mpi_task_insert_fortran.c		\
+	starpu_mpi_init.c				\
 	load_balancer/policy/data_movements_interface.c	\
 	load_balancer/policy/load_data_interface.c	\
 	load_balancer/policy/load_heat_propagation.c	\
 	load_balancer/load_balancer.c
 
-
 showcheck:
 	-cat /dev/null

+ 88 - 310
mpi/src/starpu_mpi.c

@@ -30,6 +30,7 @@
 #include <starpu_mpi_select_node.h>
 #include <starpu_mpi_tag.h>
 #include <starpu_mpi_comm.h>
+#include <starpu_mpi_init.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -64,18 +65,18 @@ static struct _starpu_mpi_req_list *detached_requests;
 static starpu_pthread_mutex_t detached_requests_mutex;
 
 /* Condition to wake up progression thread */
-static starpu_pthread_cond_t cond_progression;
+static starpu_pthread_cond_t progress_cond;
 /* Condition to wake up waiting for all current MPI requests to finish */
-static starpu_pthread_cond_t cond_finished;
-static starpu_pthread_mutex_t mutex;
+static starpu_pthread_cond_t barrier_cond;
+static starpu_pthread_mutex_t progress_mutex;
 #ifndef STARPU_SIMGRID
 static starpu_pthread_t progress_thread;
 #endif
 static int running = 0;
 
 #ifdef STARPU_SIMGRID
-static int _mpi_world_size;
-static int _mpi_world_rank;
+int _simgrid_mpi_world_size;
+int _simgrid_mpi_world_rank;
 
 static int wait_counter;
 static starpu_pthread_cond_t wait_counter_cond;
@@ -194,7 +195,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 
 	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	if (req->request_type == RECV_REQ)
 	{
@@ -224,12 +225,12 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			_starpu_mpi_req_list_push_front(ready_requests, req);
 
 			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 			STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
 			req->posted = 1;
 			STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 		else
 		{
@@ -242,12 +243,12 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			 * will be called to bring the data back to the original data handle associated to the request.*/
 			if (early_data_handle)
 			{
-				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 				STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
 				while (!(early_data_handle->req_ready))
 					STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
 				STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
-				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
 				STARPU_ASSERT(req->data_handle != early_data_handle->handle);
@@ -263,9 +264,9 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 				cb_args->req = req;
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
-				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 				starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
-				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 			}
 			/* Case: no matching data has been received. Store the receive request as an early_request. */
 			else
@@ -307,11 +308,11 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 	}
 
 	newer_requests = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+	STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_signal(&dontsleep);
 #endif
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	_STARPU_MPI_LOG_OUT();
 }
 
@@ -879,11 +880,9 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 	if (submitted)
 	{
 		struct _starpu_mpi_req *testing_req;
-		_starpu_mpi_request_init(&testing_req);
 
 		/* Initialize the request structure */
-		STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
-		STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
+		_starpu_mpi_request_init(&testing_req);
 		testing_req->flag = flag;
 		testing_req->status = status;
 		testing_req->other_request = req;
@@ -949,35 +948,33 @@ int _starpu_mpi_barrier(MPI_Comm comm)
 
 	int ret = posted_requests;
 	struct _starpu_mpi_req *barrier_req;
-	_starpu_mpi_request_init(&barrier_req);
 
 	/* First wait for *both* all tasks and MPI requests to finish, in case
 	 * some tasks generate MPI requests, MPI requests generate tasks, etc.
 	 */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	STARPU_MPI_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
 	barrier_running = 1;
 	do
 	{
 		while (posted_requests)
 			/* Wait for all current MPI requests to finish */
-			STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
+			STARPU_PTHREAD_COND_WAIT(&barrier_cond, &progress_mutex);
 		/* No current request, clear flag */
 		newer_requests = 0;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		/* Now wait for all tasks */
 		starpu_task_wait_for_all();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		/* Check newer_requests again, in case some MPI requests
 		 * triggered by tasks completed and triggered tasks between
 		 * wait_for_all finished and we take the lock */
 	} while (posted_requests || newer_requests);
 	barrier_running = 0;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	/* Initialize the request structure */
-	STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
-	STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
+	_starpu_mpi_request_init(&barrier_req);
 	barrier_req->func = _starpu_mpi_barrier_func;
 	barrier_req->request_type = BARRIER_REQ;
 	barrier_req->node_tag.comm = comm;
@@ -1157,28 +1154,6 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	args = NULL;
 }
 
-#ifdef STARPU_MPI_ACTIVITY
-static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
-{
-	unsigned may_block = 1;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-	if (!_starpu_mpi_req_list_empty(detached_requests))
-	{
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		may_block = 0;
-	}
-	else
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-
-
-	return may_block;
-}
-#endif /* STARPU_MPI_ACTIVITY */
-
 static void _starpu_mpi_test_detached_requests(void)
 {
 	//_STARPU_MPI_LOG_IN();
@@ -1210,28 +1185,14 @@ static void _starpu_mpi_test_detached_requests(void)
 		     	struct _starpu_mpi_req *next_req;
 			next_req = _starpu_mpi_req_list_next(req);
 
-			if (req->request_type == RECV_REQ)
-			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
-			}
+			_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
 			STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
 			_starpu_mpi_req_list_erase(detached_requests, req);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 			_starpu_mpi_handle_request_termination(req);
 
-			if (req->request_type == RECV_REQ)
-			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag, 0);
-			}
+			_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
 			if (req->is_internal_req == 0)
 			{
@@ -1260,9 +1221,9 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 
 		starpu_wake_all_blocked_workers();
 
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+		STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	}
 }
 
@@ -1280,28 +1241,6 @@ static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
-{
-	switch (thread_level)
-	{
-	case MPI_THREAD_SERIALIZED:
-	{
-		_STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
-		break;
-	}
-	case MPI_THREAD_FUNNELED:
-	{
-		_STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
-		break;
-	}
-	case MPI_THREAD_SINGLE:
-	{
-		_STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
-		break;
-	}
-	}
-}
-
 static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status, MPI_Comm comm)
 {
 	_STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_data_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
@@ -1311,9 +1250,9 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	_starpu_mpi_early_data_add(early_data_handle);
 
 	starpu_data_handle_t data_handle = NULL;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	data_handle = _starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
 	{
@@ -1336,18 +1275,18 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 
 	_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_data_handle with tag %d from comm %ld src %d ..\n",
 			  early_data_handle->node_tag.data_tag, (long int)comm, status.MPI_SOURCE);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
 							  early_data_handle->node_tag.data_tag, comm, 1, 0,
 							  NULL, NULL, 1, 1, envelope->size);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	// We wait until the request is pushed in the
 	// ready_request list, that ensures that the next loop
 	// will call _starpu_mpi_handle_ready_request
 	// on the request and post the corresponding mpi_irecv,
 	// otherwise, it may lead to read data as envelop
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
 	while (!(early_data_handle->req->posted))
 		STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
@@ -1357,27 +1296,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	early_data_handle->req_ready = 1;
 	STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-}
-
-static void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
-{
-	if (argc_argv->initialize_mpi)
-	{
-		int thread_support;
-		_STARPU_DEBUG("Calling MPI_Init_thread\n");
-		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
-		{
-			_STARPU_ERROR("MPI_Init_thread failed\n");
-		}
-		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
-	}
-	else
-	{
-		int provided;
-		MPI_Query_thread(&provided);
-		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
-	}
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 }
 
 static void *_starpu_mpi_progress_thread_func(void *arg)
@@ -1395,8 +1314,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	MPI_Comm_size(argc_argv->comm, &worldsize);
 	MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
 #ifdef STARPU_SIMGRID
-	_mpi_world_size = worldsize;
-	_mpi_world_rank = rank;
+	_simgrid_mpi_world_size = worldsize;
+	_simgrid_mpi_world_rank = rank;
 #endif
 	_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
 	_starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
@@ -1442,10 +1361,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_datatype_init();
 
 	/* notify the main thread that the progression thread is ready */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	running = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 #ifdef STARPU_SIMGRID
 	starpu_pthread_wait_init(&wait);
@@ -1453,7 +1372,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	starpu_pthread_queue_register(&wait, &dontsleep);
 #endif
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
  	int envelope_request_submitted = 0;
 
@@ -1479,8 +1398,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (barrier_running)
 				/* Tell mpi_barrier */
-				STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
-			STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
+				STARPU_PTHREAD_COND_SIGNAL(&barrier_cond);
+			STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
 
 			_STARPU_MPI_TRACE_SLEEP_END();
 		}
@@ -1495,9 +1414,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			 * (on a sync_data_with_mem call), we want to let the
 			 * application submit requests in the meantime, so we
 			 * release the lock. */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 			_starpu_mpi_handle_ready_request(req);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 
 		/* If there is no currently submitted envelope_request submitted to
@@ -1510,9 +1429,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		}
 
 		/* test whether there are some terminated "detached request" */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		_starpu_mpi_test_detached_requests();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 		if (envelope_request_submitted == 1)
 		{
@@ -1532,9 +1451,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					struct _starpu_mpi_req *_sync_req = _starpu_mpi_sync_data_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
 					_STARPU_MPI_DEBUG(20, "Sending data with tag %d to node %d\n", _sync_req->node_tag.data_tag, envelope_status.MPI_SOURCE);
 					STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_req->node_tag.data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_req->node_tag.data_tag);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 					_starpu_mpi_isend_data_func(_sync_req);
-					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 				}
 				else
 				{
@@ -1609,9 +1528,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 						 * (on a sync_data_with_mem call), we want to let the
 						 * application submit requests in the meantime, so we
 						 * release the lock. */
-						STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+						STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 						_starpu_mpi_handle_ready_request(early_request);
-						STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+						STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 					}
 				}
 				envelope_request_submitted = 0;
@@ -1622,9 +1541,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			}
 		}
 #ifdef STARPU_SIMGRID
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		starpu_pthread_wait_wait(&wait);
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 #endif
 	}
 
@@ -1662,7 +1581,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		MPI_Finalize();
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	_starpu_mpi_sync_data_free();
 	_starpu_mpi_early_data_free();
@@ -1673,16 +1592,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	return NULL;
 }
 
-/********************************************************/
-/*                                                      */
-/*  (De)Initialization methods                          */
-/*                                                      */
-/********************************************************/
-
-#ifdef STARPU_MPI_ACTIVITY
-static int hookid = - 1;
-#endif /* STARPU_MPI_ACTIVITY */
-
 static void _starpu_mpi_add_sync_point_in_fxt(void)
 {
 #ifdef STARPU_USE_FXT
@@ -1717,160 +1626,75 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 #endif
 }
 
-static
-int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 {
-	struct _starpu_mpi_argc_argv *argc_argv;
-	_STARPU_MALLOC(argc_argv, sizeof(struct _starpu_mpi_argc_argv));
-	argc_argv->initialize_mpi = initialize_mpi;
-	argc_argv->argc = argc;
-	argc_argv->argv = argv;
-	argc_argv->comm = comm;
+        STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
+        STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
+        STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
+        ready_requests = _starpu_mpi_req_list_new();
 
-#ifdef STARPU_SIMGRID
-	/* Call MPI_Init_thread as early as possible, to initialize simgrid
-	 * before working with mutexes etc. */
-	_starpu_mpi_do_initialize(argc_argv);
-#endif
-
-	STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
-	ready_requests = _starpu_mpi_req_list_new();
+        STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
+        detached_requests = _starpu_mpi_req_list_new();
 
-	STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
-	detached_requests = _starpu_mpi_req_list_new();
-
-	STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
-	_starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
+        STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
+        _starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
 
 #ifdef STARPU_SIMGRID
 	STARPU_PTHREAD_MUTEX_INIT(&wait_counter_mutex, NULL);
 	STARPU_PTHREAD_COND_INIT(&wait_counter_cond, NULL);
 #endif
 
-#ifdef STARPU_MPI_ACTIVITY
-	hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
-	STARPU_MPI_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
-#endif /* STARPU_MPI_ACTIVITY */
-
 #ifdef STARPU_SIMGRID
-	_starpu_mpi_progress_thread_func(argc_argv);
-	return 0;
+        _starpu_mpi_progress_thread_func(argc_argv);
+        return 0;
 #else
-	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
+        STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	while (!running)
-		STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+        while (!running)
+                STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-	return 0;
+        return 0;
 #endif
 }
 
 #ifdef STARPU_SIMGRID
-/* This is called before application's main, to initialize SMPI before we can
- * create MSG processes to run application's main */
-int _starpu_mpi_simgrid_init(int argc, char *argv[])
-{
-	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD);
-}
-#endif
-
-int starpu_mpi_init_comm(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED, int initialize_mpi STARPU_ATTRIBUTE_UNUSED, MPI_Comm comm STARPU_ATTRIBUTE_UNUSED)
+void _starpu_mpi_wait_for_initialization()
 {
-#ifdef STARPU_SIMGRID
 	/* Wait for MPI initialization to finish */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	while (!running)
-		STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-
-	return 0;
-#else
-	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
-#endif
+		STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 }
-
-int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
-{
-	return starpu_mpi_init_comm(argc, argv, initialize_mpi, MPI_COMM_WORLD);
-}
-
-int starpu_mpi_initialize(void)
-{
-#ifdef STARPU_SIMGRID
-	return 0;
-#else
-	return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);
-#endif
-}
-
-int starpu_mpi_initialize_extended(int *rank, int *world_size)
-{
-#ifdef STARPU_SIMGRID
-	*world_size = _mpi_world_size;
-	*rank = _mpi_world_rank;
-	return 0;
-#else
-	int ret;
-
-	ret = _starpu_mpi_initialize(NULL, NULL, 1, MPI_COMM_WORLD);
-	if (ret == 0)
-	{
-		_STARPU_DEBUG("Calling MPI_Comm_rank\n");
-		MPI_Comm_rank(MPI_COMM_WORLD, rank);
-		MPI_Comm_size(MPI_COMM_WORLD, world_size);
-	}
-	return ret;
 #endif
-}
 
-int starpu_mpi_shutdown(void)
+void _starpu_mpi_progress_shutdown(int *value)
 {
-#ifndef STARPU_SIMGRID
-	void *value;
-#endif
-	int rank, world_size;
-
-	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
-
-	/* kill the progression thread */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	running = 0;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+        running = 0;
+        STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_signal(&dontsleep);
 #endif
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-#ifndef STARPU_SIMGRID
-	starpu_pthread_join(progress_thread, &value);
-#else
+#ifdef STARPU_SIMGRID
+	(void) value;
 	/* FIXME: should rather properly wait for _starpu_mpi_progress_thread_func to finish */
 	MSG_process_sleep(1);
+#else
+	starpu_pthread_join(progress_thread, (void *)value);
 #endif
 
-#ifdef STARPU_MPI_ACTIVITY
-	starpu_progression_hook_deregister(hookid);
-#endif /* STARPU_MPI_ACTIVITY */
-
-	_STARPU_MPI_TRACE_STOP(rank, world_size);
-
-	/* free the request queues */
-	_starpu_mpi_req_list_delete(detached_requests);
-	_starpu_mpi_req_list_delete(ready_requests);
-
-	_starpu_mpi_comm_amounts_display(stderr, rank);
-	_starpu_mpi_comm_amounts_free();
-	_starpu_mpi_cache_free(world_size);
-	_starpu_mpi_tag_free();
-	_starpu_mpi_comm_free();
+        /* free the request queues */
+        _starpu_mpi_req_list_delete(detached_requests);
+        _starpu_mpi_req_list_delete(ready_requests);
 
-	return 0;
+        STARPU_PTHREAD_MUTEX_DESTROY(&mutex_posted_requests);
+        STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
+        STARPU_PTHREAD_COND_DESTROY(&barrier_cond);
 }
 
 void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
@@ -2033,52 +1857,6 @@ void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_r
 	return;
 }
 
-int starpu_mpi_comm_size(MPI_Comm comm, int *size)
-{
-	if (_starpu_mpi_fake_world_size != -1)
-	{
-		*size = _starpu_mpi_fake_world_size;
-		return 0;
-	}
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*size = _mpi_world_size;
-	return 0;
-#else
-	return MPI_Comm_size(comm, size);
-#endif
-}
-
-int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
-{
-	if (_starpu_mpi_fake_world_rank != -1)
-	{
-		*rank = _starpu_mpi_fake_world_rank;
-		return 0;
-	}
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*rank = _mpi_world_rank;
-	return 0;
-#else
-	return MPI_Comm_rank(comm, rank);
-#endif
-}
-
-int starpu_mpi_world_size(void)
-{
-	int size;
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
-	return size;
-}
-
-int starpu_mpi_world_rank(void)
-{
-	int rank;
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	return rank;
-}
-
 int starpu_mpi_wait_for_all(MPI_Comm comm)
 {
 	int mpi = 1;

+ 7 - 1
mpi/src/starpu_mpi_fxt.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010  Université de Bordeaux
- * Copyright (C) 2010, 2012  CNRS
+ * Copyright (C) 2010, 2012, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -63,12 +63,16 @@ extern "C" {
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_SUBMIT_END, (src), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+#define _STARPU_MPI_TRACE_COMPLETE_BEGIN(type, rank, mpi_tag)		\
+	if (request_type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN((rank), (mpi_tag)); } else if (request_type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(src, mpi_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_BEGIN, (src), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_END(src, mpi_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
+#define _STARPU_MPI_TRACE_COMPLETE_END(type, rank, mpi_tag)		\
+	if (request_type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (request_type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_SLEEP_BEGIN()	\
 	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_END()	\
@@ -95,6 +99,8 @@ extern "C" {
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_END(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(a, b, c)		do {} while(0);
+#define _STARPU_MPI_TRACE_COMPLETE_BEGIN(a, b, c)		do {} while(0);
+#define _STARPU_MPI_TRACE_COMPLETE_END(a, b, c)			do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_END(a, b, c)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_END(a, b)		do {} while(0);

+ 222 - 0
mpi/src/starpu_mpi_init.c

@@ -0,0 +1,222 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010-2016  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_datatype.h>
+#include <starpu_mpi_private.h>
+#include <starpu_mpi_cache.h>
+#include <starpu_profiling.h>
+#include <starpu_mpi_stats.h>
+#include <starpu_mpi_cache.h>
+#include <starpu_mpi_sync_data.h>
+#include <starpu_mpi_early_data.h>
+#include <starpu_mpi_early_request.h>
+#include <starpu_mpi_select_node.h>
+#include <starpu_mpi_tag.h>
+#include <starpu_mpi_comm.h>
+#include <common/config.h>
+#include <common/thread.h>
+#include <datawizard/interfaces/data_interface.h>
+#include <datawizard/coherency.h>
+#include <core/simgrid.h>
+#include <core/task.h>
+
+static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
+{
+	switch (thread_level)
+	{
+		case MPI_THREAD_SERIALIZED:
+		{
+			_STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
+			break;
+		}
+		case MPI_THREAD_FUNNELED:
+		{
+			_STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
+			break;
+		}
+		case MPI_THREAD_SINGLE:
+		{
+			_STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
+			break;
+		}
+	}
+}
+
+void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
+{
+	if (argc_argv->initialize_mpi)
+	{
+		int thread_support;
+		_STARPU_DEBUG("Calling MPI_Init_thread\n");
+		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
+		{
+			_STARPU_ERROR("MPI_Init_thread failed\n");
+		}
+		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
+	}
+	else
+	{
+		int provided;
+		MPI_Query_thread(&provided);
+		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
+	}
+}
+
+static
+int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
+{
+	struct _starpu_mpi_argc_argv *argc_argv;
+	_STARPU_MALLOC(argc_argv, sizeof(struct _starpu_mpi_argc_argv));
+	argc_argv->initialize_mpi = initialize_mpi;
+	argc_argv->argc = argc;
+	argc_argv->argv = argv;
+	argc_argv->comm = comm;
+
+#ifdef STARPU_SIMGRID
+	/* Call MPI_Init_thread as early as possible, to initialize simgrid
+	 * before working with mutexes etc. */
+	_starpu_mpi_do_initialize(argc_argv);
+#endif
+
+	return _starpu_mpi_progress_init(argc_argv);
+}
+
+#ifdef STARPU_SIMGRID
+/* This is called before application's main, to initialize SMPI before we can
+ * create MSG processes to run application's main */
+int _starpu_mpi_simgrid_init(int argc, char *argv[])
+{
+	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD);
+}
+#endif
+
+int starpu_mpi_init_comm(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED, int initialize_mpi STARPU_ATTRIBUTE_UNUSED, MPI_Comm comm STARPU_ATTRIBUTE_UNUSED)
+{
+#ifdef STARPU_SIMGRID
+	_starpu_mpi_wait_for_initialization();
+	return 0;
+#else
+	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
+#endif
+}
+
+int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
+{
+	return starpu_mpi_init_comm(argc, argv, initialize_mpi, MPI_COMM_WORLD);
+}
+
+int starpu_mpi_initialize(void)
+{
+#ifdef STARPU_SIMGRID
+	return 0;
+#else
+	return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);
+#endif
+}
+
+int starpu_mpi_initialize_extended(int *rank, int *world_size)
+{
+#ifdef STARPU_SIMGRID
+	*world_size = _simgrid_mpi_world_size;
+	*rank = _simgrid_mpi_world_rank;
+	return 0;
+#else
+	int ret;
+
+	ret = _starpu_mpi_initialize(NULL, NULL, 1, MPI_COMM_WORLD);
+	if (ret == 0)
+	{
+		_STARPU_DEBUG("Calling MPI_Comm_rank\n");
+		MPI_Comm_rank(MPI_COMM_WORLD, rank);
+		MPI_Comm_size(MPI_COMM_WORLD, world_size);
+	}
+	return ret;
+#endif
+}
+
+int starpu_mpi_shutdown(void)
+{
+	int value;
+	int rank, world_size;
+
+	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
+
+	/* kill the progression thread */
+	_starpu_mpi_progress_shutdown(&value);
+
+	_STARPU_MPI_TRACE_STOP(rank, world_size);
+
+	_starpu_mpi_comm_amounts_display(stderr, rank);
+	_starpu_mpi_comm_amounts_free();
+	_starpu_mpi_cache_free(world_size);
+	_starpu_mpi_tag_free();
+	_starpu_mpi_comm_free();
+
+	return 0;
+}
+
+int starpu_mpi_comm_size(MPI_Comm comm, int *size)
+{
+	if (_starpu_mpi_fake_world_size != -1)
+	{
+		*size = _starpu_mpi_fake_world_size;
+		return 0;
+	}
+#ifdef STARPU_SIMGRID
+	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
+	*size = _simgrid_mpi_world_size;
+	return 0;
+#else
+	return MPI_Comm_size(comm, size);
+#endif
+}
+
+int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
+{
+	if (_starpu_mpi_fake_world_rank != -1)
+	{
+		*rank = _starpu_mpi_fake_world_rank;
+		return 0;
+	}
+#ifdef STARPU_SIMGRID
+	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
+	*rank = _simgrid_mpi_world_rank;
+	return 0;
+#else
+	return MPI_Comm_rank(comm, rank);
+#endif
+}
+
+int starpu_mpi_world_size(void)
+{
+	int size;
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+	return size;
+}
+
+int starpu_mpi_world_rank(void)
+{
+	int rank;
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	return rank;
+}
+

+ 34 - 0
mpi/src/starpu_mpi_init.h

@@ -0,0 +1,34 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2012-2015  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_INIT_H__
+#define __STARPU_MPI_INIT_H__
+
+#include <starpu.h>
+#include <starpu_mpi.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_INIT_H__

+ 15 - 6
mpi/src/starpu_mpi_private.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010, 2012-2017  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -21,15 +21,15 @@
 #include <starpu.h>
 #include <common/config.h>
 #include <common/uthash.h>
-#include "starpu_mpi.h"
-#include "starpu_mpi_fxt.h"
+#include <starpu_mpi.h>
+#include <starpu_mpi_fxt.h>
 #include <common/list.h>
 #include <core/simgrid.h>
 
 #ifdef __cplusplus
 extern "C" {
 #endif
-	
+
 #ifdef STARPU_SIMGRID
 starpu_pthread_wait_t wait;
 starpu_pthread_queue_t dontsleep;
@@ -45,7 +45,7 @@ struct _starpu_simgrid_mpi_req
 int _starpu_mpi_simgrid_mpi_test(unsigned *done, int *flag);
 void _starpu_mpi_simgrid_wait_req(MPI_Request *request, 	MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done);
 #endif
-	
+
 extern int _starpu_debug_rank;
 char *_starpu_mpi_get_mpi_error_code(int code);
 extern int _starpu_mpi_comm;
@@ -58,6 +58,10 @@ void _starpu_mpi_set_debug_level_max(int level);
 #endif
 extern int _starpu_mpi_fake_world_size;
 extern int _starpu_mpi_fake_world_rank;
+#ifdef STARPU_SIMGRID
+extern int _simgrid_mpi_world_size;
+extern int _simgrid_mpi_world_rank;
+#endif
 
 #ifdef STARPU_NO_ASSERT
 #  define STARPU_MPI_ASSERT_MSG(x, msg, ...)	do { if (0) { (void) (x); }} while(0)
@@ -247,7 +251,7 @@ LIST_TYPE(_starpu_mpi_req,
 	starpu_pthread_queue_t queue;
 	unsigned done;
 #endif
-	  
+
 );
 
 struct _starpu_mpi_argc_argv
@@ -260,6 +264,11 @@ struct _starpu_mpi_argc_argv
 	char **fargv;	// Fortran argv
 };
 
+void _starpu_mpi_progress_shutdown(int *value);
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
+#ifdef STARPU_SIMGRID
+void _starpu_mpi_wait_for_initialization();
+#endif
 
 #ifdef __cplusplus
 }