소스 검색

merge trunk

Nathalie Furmento 8 년 전
부모
커밋
3f77945975
50개의 변경된 파일791개의 추가작업 그리고 625개의 파일을 삭제
  1. 0 7
      configure.ac
  2. 0 8
      doc/doxygen/chapters/510_configure_options.doxy
  3. 2 1
      mpi/src/Makefile.am
  4. 2 2
      mpi/src/load_balancer/policy/load_heat_propagation.c
  5. 103 343
      mpi/src/starpu_mpi.c
  6. 3 3
      mpi/src/starpu_mpi_cache.c
  7. 2 2
      mpi/src/starpu_mpi_cache.h
  8. 1 1
      mpi/src/starpu_mpi_cache_stats.c
  9. 2 2
      mpi/src/starpu_mpi_cache_stats.h
  10. 1 1
      mpi/src/starpu_mpi_comm.c
  11. 2 2
      mpi/src/starpu_mpi_comm.h
  12. 3 3
      mpi/src/starpu_mpi_datatype.c
  13. 3 3
      mpi/src/starpu_mpi_datatype.h
  14. 1 1
      mpi/src/starpu_mpi_early_data.c
  15. 1 1
      mpi/src/starpu_mpi_early_data.h
  16. 1 1
      mpi/src/starpu_mpi_early_request.c
  17. 2 2
      mpi/src/starpu_mpi_early_request.h
  18. 7 1
      mpi/src/starpu_mpi_fxt.h
  19. 236 0
      mpi/src/starpu_mpi_init.c
  20. 34 0
      mpi/src/starpu_mpi_init.h
  21. 13 6
      mpi/src/starpu_mpi_private.h
  22. 1 1
      mpi/src/starpu_mpi_stats.c
  23. 2 2
      mpi/src/starpu_mpi_stats.h
  24. 1 1
      mpi/src/starpu_mpi_sync_data.c
  25. 2 2
      mpi/src/starpu_mpi_sync_data.h
  26. 6 6
      mpi/src/starpu_mpi_tag.c
  27. 6 5
      mpi/src/starpu_mpi_tag.h
  28. 9 1
      src/common/fxt.h
  29. 237 106
      src/core/simgrid.c
  30. 6 3
      src/core/simgrid.h
  31. 1 1
      src/core/topology.c
  32. 10 13
      src/core/workers.c
  33. 2 9
      src/datawizard/copy_driver.c
  34. 2 3
      src/datawizard/copy_driver.h
  35. 8 8
      src/debug/traces/starpu_fxt.c
  36. 12 12
      src/debug/traces/starpu_fxt_mpi.c
  37. 10 10
      src/debug/traces/starpu_paje.c
  38. 1 1
      src/drivers/cpu/driver_cpu.c
  39. 2 13
      src/drivers/cuda/driver_cuda.c
  40. 4 1
      src/drivers/cuda/starpu_cublas.c
  41. 14 10
      src/drivers/driver_common/driver_common.c
  42. 2 2
      src/drivers/mic/driver_mic_common.c
  43. 2 2
      src/drivers/mic/driver_mic_common.h
  44. 3 3
      src/drivers/mic/driver_mic_source.c
  45. 1 1
      src/drivers/mic/driver_mic_source.h
  46. 1 13
      src/drivers/opencl/driver_opencl.c
  47. 11 1
      tests/Makefile.am
  48. 1 0
      tests/datawizard/interfaces/multiformat/advanced/multiformat_data_release.c
  49. 10 0
      tests/loader.c
  50. 5 5
      tools/cppcheck/suppressions.txt

+ 0 - 7
configure.ac

@@ -334,13 +334,6 @@ fi
 
 AC_SUBST(CC_OR_MPICC, $cc_or_mpicc)
 
-AC_ARG_ENABLE(mpi-progression-hook, [AS_HELP_STRING([--enable-mpi-progression-hook],
-				   [Enable StarPU MPI activity polling method])],
-				   enable_mpi_progression_hook=$enableval, enable_mpi_progression_hook=no)
-if  test x$enable_mpi_progression_hook = xyes; then
-	AC_DEFINE(STARPU_MPI_ACTIVITY, [1], [enable StarPU MPI activity polling method])
-fi
-
 AC_ARG_ENABLE(mpi-pedantic-isend, [AS_HELP_STRING([--enable-mpi-pedantic-isend],
 				   [Enable StarPU MPI pedantic isend])],
 				   enable_mpi_pedantic_isend=$enableval, enable_mpi_pedantic_isend=no)

+ 0 - 8
doc/doxygen/chapters/510_configure_options.doxy

@@ -343,14 +343,6 @@ Use the compiler <c>mpicc</c> at <c>path</c>, for StarPU-MPI.
 (\ref MPISupport).
 </dd>
 
-<dt>--enable-mpi-progression-hook</dt>
-<dd>
-\anchor enable-mpi-progression-hook
-\addindex __configure__--enable-mpi-progression-hook
-Enable the activity polling method for StarPU-MPI. This is however experimental,
-do not enable it unless you know what you are doing.
-</dd>
-
 <dt>--enable-mpi-pedantic-isend</dt>
 <dd>
 \anchor enable-mpi-pedantic-isend

+ 2 - 1
mpi/src/Makefile.am

@@ -70,6 +70,7 @@ noinst_HEADERS =					\
 	starpu_mpi_comm.h				\
 	starpu_mpi_tag.h				\
 	starpu_mpi_task_insert.h			\
+	starpu_mpi_init.h				\
 	load_balancer/policy/data_movements_interface.h	\
 	load_balancer/policy/load_data_interface.h	\
 	load_balancer/policy/load_balancer_policy.h
@@ -92,11 +93,11 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_tag.c				\
 	starpu_mpi_fortran.c				\
 	starpu_mpi_task_insert_fortran.c		\
+	starpu_mpi_init.c				\
 	load_balancer/policy/data_movements_interface.c	\
 	load_balancer/policy/load_data_interface.c	\
 	load_balancer/policy/load_heat_propagation.c	\
 	load_balancer/load_balancer.c
 
-
 showcheck:
 	-cat /dev/null

+ 2 - 2
mpi/src/load_balancer/policy/load_heat_propagation.c

@@ -252,7 +252,7 @@ static void update_data_ranks()
 
 			for (j = 0; j < ndata_to_update; j++)
 			{
-				starpu_data_handle_t handle = _starpu_mpi_data_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
+				starpu_data_handle_t handle = _starpu_mpi_tag_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
 				STARPU_ASSERT(handle);
 				int dst_rank = (data_movements_get_ranks_table(data_movements_handles[i]))[j];
 
@@ -517,7 +517,7 @@ static void move_back_data()
 
 			for (j = 0; j < ndata_to_update; j++)
 			{
-				starpu_data_handle_t handle = _starpu_mpi_data_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
+				starpu_data_handle_t handle = _starpu_mpi_tag_get_data_handle_from_tag((data_movements_get_tags_table(data_movements_handles[i]))[j]);
 				STARPU_ASSERT(handle);
 
 				int dst_rank = (data_movements_get_ranks_table(data_movements_handles[i]))[j];

+ 103 - 343
mpi/src/starpu_mpi.c

@@ -30,6 +30,7 @@
 #include <starpu_mpi_select_node.h>
 #include <starpu_mpi_tag.h>
 #include <starpu_mpi_comm.h>
+#include <starpu_mpi_init.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -64,19 +65,16 @@ static struct _starpu_mpi_req_list *detached_requests;
 static starpu_pthread_mutex_t detached_requests_mutex;
 
 /* Condition to wake up progression thread */
-static starpu_pthread_cond_t cond_progression;
+static starpu_pthread_cond_t progress_cond;
 /* Condition to wake up waiting for all current MPI requests to finish */
-static starpu_pthread_cond_t cond_finished;
-static starpu_pthread_mutex_t mutex;
+static starpu_pthread_cond_t barrier_cond;
+static starpu_pthread_mutex_t progress_mutex;
 #ifndef STARPU_SIMGRID
 static starpu_pthread_t progress_thread;
 #endif
 static int running = 0;
 
 #ifdef STARPU_SIMGRID
-static int _mpi_world_size;
-static int _mpi_world_rank;
-
 static int wait_counter;
 static starpu_pthread_cond_t wait_counter_cond;
 static starpu_pthread_mutex_t wait_counter_mutex;
@@ -194,7 +192,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 
 	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	if (req->request_type == RECV_REQ)
 	{
@@ -206,7 +204,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 		 * before the next submission of the envelope-catching request. */
 		if (req->is_internal_req)
 		{
-			_starpu_mpi_handle_allocate_datatype(req->data_handle, req);
+			_starpu_mpi_datatype_allocate(req->data_handle, req);
 			if (req->registered_datatype == 1)
 			{
 				req->count = 1;
@@ -224,12 +222,12 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			_starpu_mpi_req_list_push_front(ready_requests, req);
 
 			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 			STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
 			req->posted = 1;
 			STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 		else
 		{
@@ -242,12 +240,12 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			 * will be called to bring the data back to the original data handle associated to the request.*/
 			if (early_data_handle)
 			{
-				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 				STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
 				while (!(early_data_handle->req_ready))
 					STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
 				STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
-				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
 				STARPU_ASSERT(req->data_handle != early_data_handle->handle);
@@ -263,9 +261,9 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 				cb_args->req = req;
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
-				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 				starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
-				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 			}
 			/* Case: no matching data has been received. Store the receive request as an early_request. */
 			else
@@ -275,7 +273,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 				if (sync_req)
 				{
 					req->sync = 1;
-					_starpu_mpi_handle_allocate_datatype(req->data_handle, req);
+					_starpu_mpi_datatype_allocate(req->data_handle, req);
 					if (req->registered_datatype == 1)
 					{
 						req->count = 1;
@@ -307,11 +305,11 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 	}
 
 	newer_requests = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+	STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_signal(&dontsleep);
 #endif
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	_STARPU_MPI_LOG_OUT();
 }
 
@@ -378,6 +376,7 @@ int _starpu_mpi_simgrid_mpi_test(unsigned *done, int *flag)
 	}
 	return MPI_SUCCESS;
 }
+
 static void _starpu_mpi_simgrid_wait_req_func(void* arg)
 {
 	struct _starpu_simgrid_mpi_req *sim_req = arg;
@@ -400,6 +399,7 @@ static void _starpu_mpi_simgrid_wait_req_func(void* arg)
 		STARPU_PTHREAD_COND_SIGNAL(&wait_counter_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&wait_counter_mutex);
 }
+
 void _starpu_mpi_simgrid_wait_req(MPI_Request *request, MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done)
 {
 	struct _starpu_simgrid_mpi_req *sim_req;
@@ -462,7 +462,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 {
-	_starpu_mpi_handle_allocate_datatype(req->data_handle, req);
+	_starpu_mpi_datatype_allocate(req->data_handle, req);
 
 	_STARPU_MPI_CALLOC(req->envelope, 1,sizeof(struct _starpu_mpi_envelope));
 	req->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
@@ -879,11 +879,9 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 	if (submitted)
 	{
 		struct _starpu_mpi_req *testing_req;
-		_starpu_mpi_request_init(&testing_req);
 
 		/* Initialize the request structure */
-		STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
-		STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
+		_starpu_mpi_request_init(&testing_req);
 		testing_req->flag = flag;
 		testing_req->status = status;
 		testing_req->other_request = req;
@@ -949,35 +947,33 @@ int _starpu_mpi_barrier(MPI_Comm comm)
 
 	int ret = posted_requests;
 	struct _starpu_mpi_req *barrier_req;
-	_starpu_mpi_request_init(&barrier_req);
 
 	/* First wait for *both* all tasks and MPI requests to finish, in case
 	 * some tasks generate MPI requests, MPI requests generate tasks, etc.
 	 */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	STARPU_MPI_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
 	barrier_running = 1;
 	do
 	{
 		while (posted_requests)
 			/* Wait for all current MPI requests to finish */
-			STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
+			STARPU_PTHREAD_COND_WAIT(&barrier_cond, &progress_mutex);
 		/* No current request, clear flag */
 		newer_requests = 0;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		/* Now wait for all tasks */
 		starpu_task_wait_for_all();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		/* Check newer_requests again, in case some MPI requests
 		 * triggered by tasks completed and triggered tasks between
 		 * wait_for_all finished and we take the lock */
 	} while (posted_requests || newer_requests);
 	barrier_running = 0;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	/* Initialize the request structure */
-	STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
-	STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
+	_starpu_mpi_request_init(&barrier_req);
 	barrier_req->func = _starpu_mpi_barrier_func;
 	barrier_req->request_type = BARRIER_REQ;
 	barrier_req->node_tag.comm = comm;
@@ -1064,7 +1060,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 			}
 			else
 			{
-				_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
+				_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
 			}
 		}
 	}
@@ -1157,28 +1153,6 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	args = NULL;
 }
 
-#ifdef STARPU_MPI_ACTIVITY
-static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
-{
-	unsigned may_block = 1;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-	if (!_starpu_mpi_req_list_empty(detached_requests))
-	{
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		may_block = 0;
-	}
-	else
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-
-
-	return may_block;
-}
-#endif /* STARPU_MPI_ACTIVITY */
-
 static void _starpu_mpi_test_detached_requests(void)
 {
 	//_STARPU_MPI_LOG_IN();
@@ -1210,28 +1184,14 @@ static void _starpu_mpi_test_detached_requests(void)
 		     	struct _starpu_mpi_req *next_req;
 			next_req = _starpu_mpi_req_list_next(req);
 
-			if (req->request_type == RECV_REQ)
-			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
-			}
+			_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
 			STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
 			_starpu_mpi_req_list_erase(detached_requests, req);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 			_starpu_mpi_handle_request_termination(req);
 
-			if (req->request_type == RECV_REQ)
-			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag, 0);
-			}
+			_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
 			if (req->is_internal_req == 0)
 			{
@@ -1260,9 +1220,9 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 
 		starpu_wake_all_blocked_workers();
 
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+		STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	}
 }
 
@@ -1280,28 +1240,6 @@ static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
-{
-	switch (thread_level)
-	{
-	case MPI_THREAD_SERIALIZED:
-	{
-		_STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
-		break;
-	}
-	case MPI_THREAD_FUNNELED:
-	{
-		_STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
-		break;
-	}
-	case MPI_THREAD_SINGLE:
-	{
-		_STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
-		break;
-	}
-	}
-}
-
 static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status, MPI_Comm comm)
 {
 	_STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_data_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
@@ -1311,9 +1249,9 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	_starpu_mpi_early_data_add(early_data_handle);
 
 	starpu_data_handle_t data_handle = NULL;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-	data_handle = _starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+	data_handle = _starpu_mpi_tag_get_data_handle_from_tag(envelope->data_tag);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
 	{
@@ -1336,18 +1274,18 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 
 	_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_data_handle with tag %d from comm %ld src %d ..\n",
 			  early_data_handle->node_tag.data_tag, (long int)comm, status.MPI_SOURCE);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
 							  early_data_handle->node_tag.data_tag, comm, 1, 0,
 							  NULL, NULL, 1, 1, envelope->size);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	// We wait until the request is pushed in the
 	// ready_request list, that ensures that the next loop
 	// will call _starpu_mpi_handle_ready_request
 	// on the request and post the corresponding mpi_irecv,
 	// otherwise, it may lead to read data as envelop
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
 	while (!(early_data_handle->req->posted))
 		STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
@@ -1357,33 +1295,12 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	early_data_handle->req_ready = 1;
 	STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-}
-
-static void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
-{
-	if (argc_argv->initialize_mpi)
-	{
-		int thread_support;
-		_STARPU_DEBUG("Calling MPI_Init_thread\n");
-		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
-		{
-			_STARPU_ERROR("MPI_Init_thread failed\n");
-		}
-		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
-	}
-	else
-	{
-		int provided;
-		MPI_Query_thread(&provided);
-		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
-	}
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 }
 
 static void *_starpu_mpi_progress_thread_func(void *arg)
 {
 	struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
-	int rank, worldsize;
 
 	starpu_pthread_setname("MPI");
 
@@ -1391,13 +1308,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_do_initialize(argc_argv);
 #endif
 
-	MPI_Comm_rank(argc_argv->comm, &rank);
-	MPI_Comm_size(argc_argv->comm, &worldsize);
-	MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
-#ifdef STARPU_SIMGRID
-	_mpi_world_size = worldsize;
-	_mpi_world_rank = rank;
-#endif
 	_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
 	_starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
 
@@ -1414,18 +1324,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	smpi_process_set_user_data(calloc(MAX_TSD + 1, sizeof(void*)));
 #endif
 #endif
+
 #ifdef STARPU_USE_FXT
-	/* Wait for FxT initialization before emitting FxT probes */
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_fxt_started_mutex);
-	while (!_starpu_fxt_started)
-		STARPU_PTHREAD_COND_WAIT(&_starpu_fxt_started_cond, &_starpu_fxt_started_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
+	_starpu_fxt_wait_initialisation();
 #endif //STARPU_USE_FXT
 
 	{
-		_STARPU_MPI_TRACE_START(rank, worldsize);
+		_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
 #ifdef STARPU_USE_FXT
-		starpu_profiling_set_id(rank);
+		starpu_profiling_set_id(argc_argv->rank);
 #endif //STARPU_USE_FXT
 	}
 
@@ -1442,10 +1349,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_datatype_init();
 
 	/* notify the main thread that the progression thread is ready */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	running = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 #ifdef STARPU_SIMGRID
 	starpu_pthread_wait_init(&wait);
@@ -1453,7 +1360,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	starpu_pthread_queue_register(&wait, &dontsleep);
 #endif
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
  	int envelope_request_submitted = 0;
 
@@ -1463,24 +1370,17 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		starpu_pthread_wait_reset(&wait);
 #endif
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0;
-
-#ifndef STARPU_MPI_ACTIVITY
-		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-		block = block && _starpu_mpi_req_list_empty(detached_requests);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-#endif /* STARPU_MPI_ACTIVITY */
+		unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(detached_requests);
 
 		if (block)
 		{
 			_STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
-
 			_STARPU_MPI_TRACE_SLEEP_BEGIN();
 
 			if (barrier_running)
 				/* Tell mpi_barrier */
-				STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
-			STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
+				STARPU_PTHREAD_COND_SIGNAL(&barrier_cond);
+			STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
 
 			_STARPU_MPI_TRACE_SLEEP_END();
 		}
@@ -1495,9 +1395,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			 * (on a sync_data_with_mem call), we want to let the
 			 * application submit requests in the meantime, so we
 			 * release the lock. */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 			_starpu_mpi_handle_ready_request(req);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 
 		/* If there is no currently submitted envelope_request submitted to
@@ -1510,9 +1410,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		}
 
 		/* test whether there are some terminated "detached request" */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		_starpu_mpi_test_detached_requests();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 		if (envelope_request_submitted == 1)
 		{
@@ -1532,9 +1432,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					struct _starpu_mpi_req *_sync_req = _starpu_mpi_sync_data_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
 					_STARPU_MPI_DEBUG(20, "Sending data with tag %d to node %d\n", _sync_req->node_tag.data_tag, envelope_status.MPI_SOURCE);
 					STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_req->node_tag.data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_req->node_tag.data_tag);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 					_starpu_mpi_isend_data_func(_sync_req);
-					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 				}
 				else
 				{
@@ -1589,7 +1489,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 						_STARPU_MPI_DEBUG(2000, "Request sync %d\n", envelope->sync);
 
 						early_request->sync = envelope->sync;
-						_starpu_mpi_handle_allocate_datatype(early_request->data_handle, early_request);
+						_starpu_mpi_datatype_allocate(early_request->data_handle, early_request);
 						if (early_request->registered_datatype == 1)
 						{
 							early_request->count = 1;
@@ -1609,9 +1509,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 						 * (on a sync_data_with_mem call), we want to let the
 						 * application submit requests in the meantime, so we
 						 * release the lock. */
-						STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+						STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 						_starpu_mpi_handle_ready_request(early_request);
-						STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+						STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 					}
 				}
 				envelope_request_submitted = 0;
@@ -1622,9 +1522,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			}
 		}
 #ifdef STARPU_SIMGRID
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		starpu_pthread_wait_wait(&wait);
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 #endif
 	}
 
@@ -1662,27 +1562,17 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		MPI_Finalize();
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-	_starpu_mpi_sync_data_free();
-	_starpu_mpi_early_data_free();
-	_starpu_mpi_early_request_free();
-	_starpu_mpi_datatype_free();
+	_starpu_mpi_sync_data_shutdown();
+	_starpu_mpi_early_data_shutdown();
+	_starpu_mpi_early_request_shutdown();
+	_starpu_mpi_datatype_shutdown();
 	free(argc_argv);
 
 	return NULL;
 }
 
-/********************************************************/
-/*                                                      */
-/*  (De)Initialization methods                          */
-/*                                                      */
-/********************************************************/
-
-#ifdef STARPU_MPI_ACTIVITY
-static int hookid = - 1;
-#endif /* STARPU_MPI_ACTIVITY */
-
 static void _starpu_mpi_add_sync_point_in_fxt(void)
 {
 #ifdef STARPU_USE_FXT
@@ -1717,165 +1607,81 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 #endif
 }
 
-static
-int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 {
-	struct _starpu_mpi_argc_argv *argc_argv;
-	_STARPU_MALLOC(argc_argv, sizeof(struct _starpu_mpi_argc_argv));
-	argc_argv->initialize_mpi = initialize_mpi;
-	argc_argv->argc = argc;
-	argc_argv->argv = argv;
-	argc_argv->comm = comm;
+        STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
+        STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
+        STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
+        ready_requests = _starpu_mpi_req_list_new();
 
-#ifdef STARPU_SIMGRID
-	/* Call MPI_Init_thread as early as possible, to initialize simgrid
-	 * before working with mutexes etc. */
-	_starpu_mpi_do_initialize(argc_argv);
-#endif
-
-	STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
-	ready_requests = _starpu_mpi_req_list_new();
+        STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
+        detached_requests = _starpu_mpi_req_list_new();
 
-	STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
-	detached_requests = _starpu_mpi_req_list_new();
-
-	STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
-	_starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
+        STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
+        _starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
 
 #ifdef STARPU_SIMGRID
 	STARPU_PTHREAD_MUTEX_INIT(&wait_counter_mutex, NULL);
 	STARPU_PTHREAD_COND_INIT(&wait_counter_cond, NULL);
 #endif
 
-#ifdef STARPU_MPI_ACTIVITY
-	hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
-	STARPU_MPI_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
-#endif /* STARPU_MPI_ACTIVITY */
-
 #ifdef STARPU_SIMGRID
-	_starpu_mpi_progress_thread_func(argc_argv);
-	return 0;
+        _starpu_mpi_progress_thread_func(argc_argv);
+        return 0;
 #else
-	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
+        STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	while (!running)
-		STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+        while (!running)
+                STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-	return 0;
+        return 0;
 #endif
 }
 
 #ifdef STARPU_SIMGRID
-/* This is called before application's main, to initialize SMPI before we can
- * create MSG processes to run application's main */
-int _starpu_mpi_simgrid_init(int argc, char *argv[])
-{
-	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD);
-}
-#endif
-
-int starpu_mpi_init_comm(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED, int initialize_mpi STARPU_ATTRIBUTE_UNUSED, MPI_Comm comm STARPU_ATTRIBUTE_UNUSED)
+void _starpu_mpi_wait_for_initialization()
 {
-#ifdef STARPU_SIMGRID
 	/* Wait for MPI initialization to finish */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	while (!running)
-		STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-
-	return 0;
-#else
-	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
-#endif
+		STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 }
-
-int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
-{
-	return starpu_mpi_init_comm(argc, argv, initialize_mpi, MPI_COMM_WORLD);
-}
-
-int starpu_mpi_initialize(void)
-{
-#ifdef STARPU_SIMGRID
-	return 0;
-#else
-	return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);
 #endif
-}
 
-int starpu_mpi_initialize_extended(int *rank, int *world_size)
+void _starpu_mpi_progress_shutdown(int *value)
 {
-#ifdef STARPU_SIMGRID
-	*world_size = _mpi_world_size;
-	*rank = _mpi_world_rank;
-	return 0;
-#else
-	int ret;
-
-	ret = _starpu_mpi_initialize(NULL, NULL, 1, MPI_COMM_WORLD);
-	if (ret == 0)
-	{
-		_STARPU_DEBUG("Calling MPI_Comm_rank\n");
-		MPI_Comm_rank(MPI_COMM_WORLD, rank);
-		MPI_Comm_size(MPI_COMM_WORLD, world_size);
-	}
-	return ret;
-#endif
-}
-
-int starpu_mpi_shutdown(void)
-{
-#ifndef STARPU_SIMGRID
-	void *value;
-#endif
-	int rank, world_size;
-
-	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
+        STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+        running = 0;
+        STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
 
-	/* kill the progression thread */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	running = 0;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_signal(&dontsleep);
 #endif
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-#ifndef STARPU_SIMGRID
-	starpu_pthread_join(progress_thread, &value);
-#else
+#ifdef STARPU_SIMGRID
 	/* FIXME: should rather properly wait for _starpu_mpi_progress_thread_func to finish */
+	(void) value;
 	MSG_process_sleep(1);
+#else
+	starpu_pthread_join(progress_thread, (void *)value);
 #endif
 
-#ifdef STARPU_MPI_ACTIVITY
-	starpu_progression_hook_deregister(hookid);
-#endif /* STARPU_MPI_ACTIVITY */
-
-	_STARPU_MPI_TRACE_STOP(rank, world_size);
+        /* free the request queues */
+        _starpu_mpi_req_list_delete(detached_requests);
+        _starpu_mpi_req_list_delete(ready_requests);
 
-	/* free the request queues */
-	_starpu_mpi_req_list_delete(detached_requests);
-	_starpu_mpi_req_list_delete(ready_requests);
-
-	_starpu_mpi_comm_amounts_display(stderr, rank);
-	_starpu_mpi_comm_amounts_free();
-	_starpu_mpi_cache_free(world_size);
-	_starpu_mpi_tag_free();
-	_starpu_mpi_comm_free();
-
-	return 0;
+        STARPU_PTHREAD_MUTEX_DESTROY(&mutex_posted_requests);
+        STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
+        STARPU_PTHREAD_COND_DESTROY(&barrier_cond);
 }
 
 void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
 {
-	_starpu_mpi_data_release_tag(data_handle);
+	_starpu_mpi_tag_data_release(data_handle);
 	struct _starpu_mpi_node_tag *mpi_data = data_handle->mpi_data;
 	_starpu_mpi_cache_flush(mpi_data->comm, data_handle);
 	free(data_handle->mpi_data);
@@ -1895,7 +1701,7 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
 		mpi_data->rank = -1;
 		mpi_data->comm = MPI_COMM_WORLD;
 		data_handle->mpi_data = mpi_data;
-		_starpu_mpi_data_register_tag(data_handle, tag);
+		_starpu_mpi_tag_data_register(data_handle, tag);
 		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
 	}
 
@@ -2033,52 +1839,6 @@ void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_r
 	return;
 }
 
-int starpu_mpi_comm_size(MPI_Comm comm, int *size)
-{
-	if (_starpu_mpi_fake_world_size != -1)
-	{
-		*size = _starpu_mpi_fake_world_size;
-		return 0;
-	}
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*size = _mpi_world_size;
-	return 0;
-#else
-	return MPI_Comm_size(comm, size);
-#endif
-}
-
-int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
-{
-	if (_starpu_mpi_fake_world_rank != -1)
-	{
-		*rank = _starpu_mpi_fake_world_rank;
-		return 0;
-	}
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*rank = _mpi_world_rank;
-	return 0;
-#else
-	return MPI_Comm_rank(comm, rank);
-#endif
-}
-
-int starpu_mpi_world_size(void)
-{
-	int size;
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
-	return size;
-}
-
-int starpu_mpi_world_rank(void)
-{
-	int rank;
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	return rank;
-}
-
 int starpu_mpi_wait_for_all(MPI_Comm comm)
 {
 	int mpi = 1;

+ 3 - 3
mpi/src/starpu_mpi_cache.c

@@ -55,7 +55,7 @@ int starpu_mpi_cache_set(int enabled)
 		{
 			// We need to clean the cache
 			starpu_mpi_cache_flush_all_data(_starpu_cache_comm);
-			_starpu_mpi_cache_free(_starpu_cache_comm_size);
+			_starpu_mpi_cache_shutdown(_starpu_cache_comm_size);
 		}
 		_starpu_cache_enabled = 0;
 	}
@@ -129,7 +129,7 @@ void _starpu_mpi_cache_empty_tables(int world_size)
 	}
 }
 
-void _starpu_mpi_cache_free()
+void _starpu_mpi_cache_shutdown()
 {
 	int i;
 
@@ -147,7 +147,7 @@ void _starpu_mpi_cache_free()
 	free(_cache_sent_mutex);
 	free(_cache_received_mutex);
 
-	_starpu_mpi_cache_stats_free();
+	_starpu_mpi_cache_stats_shutdown();
 }
 
 void _starpu_mpi_cache_sent_data_clear(MPI_Comm comm, starpu_data_handle_t data)

+ 2 - 2
mpi/src/starpu_mpi_cache.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011, 2012, 2013, 2014, 2015  CNRS
+ * Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016  CNRS
  * Copyright (C) 2011-2014  Université de Bordeaux
  * Copyright (C) 2014 INRIA
  *
@@ -29,7 +29,7 @@ extern "C" {
 
 extern int _starpu_cache_enabled;
 void _starpu_mpi_cache_init(MPI_Comm comm);
-void _starpu_mpi_cache_free();
+void _starpu_mpi_cache_shutdown();
 
 /*
  * If the data is already available in the cache, return a pointer to the data

+ 1 - 1
mpi/src/starpu_mpi_cache_stats.c

@@ -41,7 +41,7 @@ void _starpu_mpi_cache_stats_init(MPI_Comm comm)
 	_STARPU_MPI_CALLOC(comm_cache_amount, world_size, sizeof(size_t));
 }
 
-void _starpu_mpi_cache_stats_free()
+void _starpu_mpi_cache_stats_shutdown()
 {
 	if (stats_enabled == 0) return;
 	free(comm_cache_amount);

+ 2 - 2
mpi/src/starpu_mpi_cache_stats.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2014, 2015  CNRS
+ * Copyright (C) 2014, 2015, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,7 +26,7 @@ extern "C" {
 #endif
 
 void _starpu_mpi_cache_stats_init(MPI_Comm comm);
-void _starpu_mpi_cache_stats_free();
+void _starpu_mpi_cache_stats_shutdown();
 
 void _starpu_mpi_cache_stats_update(unsigned dst, starpu_data_handle_t data_handle, int count);
 

+ 1 - 1
mpi/src/starpu_mpi_comm.c

@@ -61,7 +61,7 @@ void _starpu_mpi_comm_init(MPI_Comm comm)
 	_starpu_mpi_comm_register(comm);
 }
 
-void _starpu_mpi_comm_free()
+void _starpu_mpi_comm_shutdown()
 {
 	int i;
 	for(i=0 ; i<_starpu_mpi_comm_nb ; i++)

+ 2 - 2
mpi/src/starpu_mpi_comm.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2015  CNRS
+ * Copyright (C) 2015, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,7 +26,7 @@ extern "C" {
 #endif
 
 void _starpu_mpi_comm_init(MPI_Comm comm);
-void _starpu_mpi_comm_free();
+void _starpu_mpi_comm_shutdown();
 void _starpu_mpi_comm_register(MPI_Comm comm);
 void _starpu_mpi_comm_post_recv();
 int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, MPI_Comm *comm);

+ 3 - 3
mpi/src/starpu_mpi_datatype.c

@@ -35,7 +35,7 @@ void _starpu_mpi_datatype_init(void)
 	STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_datatype_funcs_table_mutex, NULL);
 }
 
-void _starpu_mpi_datatype_free(void)
+void _starpu_mpi_datatype_shutdown(void)
 {
 	STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_datatype_funcs_table_mutex);
 }
@@ -155,7 +155,7 @@ static starpu_mpi_datatype_allocate_func_t handle_to_datatype_funcs[STARPU_MAX_I
 	[STARPU_MULTIFORMAT_INTERFACE_ID] = NULL,
 };
 
-void _starpu_mpi_handle_allocate_datatype(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req)
+void _starpu_mpi_datatype_allocate(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req)
 {
 	enum starpu_data_interface_id id = starpu_data_get_interface_id(data_handle);
 
@@ -243,7 +243,7 @@ static starpu_mpi_datatype_free_func_t handle_free_datatype_funcs[STARPU_MAX_INT
 	[STARPU_MULTIFORMAT_INTERFACE_ID] = NULL,
 };
 
-void _starpu_mpi_handle_free_datatype(starpu_data_handle_t data_handle, MPI_Datatype *datatype)
+void _starpu_mpi_datatype_free(starpu_data_handle_t data_handle, MPI_Datatype *datatype)
 {
 	enum starpu_data_interface_id id = starpu_data_get_interface_id(data_handle);
 

+ 3 - 3
mpi/src/starpu_mpi_datatype.h

@@ -26,10 +26,10 @@ extern "C" {
 #endif
 
 void _starpu_mpi_datatype_init(void);
-void _starpu_mpi_datatype_free(void);
+void _starpu_mpi_datatype_shutdown(void);
 
-void _starpu_mpi_handle_allocate_datatype(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req);
-void _starpu_mpi_handle_free_datatype(starpu_data_handle_t data_handle, MPI_Datatype *datatype);
+void _starpu_mpi_datatype_allocate(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req);
+void _starpu_mpi_datatype_free(starpu_data_handle_t data_handle, MPI_Datatype *datatype);
 
 #ifdef __cplusplus
 }

+ 1 - 1
mpi/src/starpu_mpi_early_data.c

@@ -45,7 +45,7 @@ void _starpu_mpi_early_data_check_termination(void)
 	STARPU_ASSERT_MSG(_starpu_mpi_early_data_handle_hashmap_count == 0, "Number of unexpected received messages left is not zero (but %d), did you forget to post a receive corresponding to a send?", _starpu_mpi_early_data_handle_hashmap_count);
 }
 
-void _starpu_mpi_early_data_free(void)
+void _starpu_mpi_early_data_shutdown(void)
 {
 	struct _starpu_mpi_early_data_handle_hashlist *current, *tmp;
 	HASH_ITER(hh, _starpu_mpi_early_data_handle_hashmap, current, tmp)

+ 1 - 1
mpi/src/starpu_mpi_early_data.h

@@ -42,7 +42,7 @@ LIST_TYPE(_starpu_mpi_early_data_handle,
 
 void _starpu_mpi_early_data_init(void);
 void _starpu_mpi_early_data_check_termination(void);
-void _starpu_mpi_early_data_free(void);
+void _starpu_mpi_early_data_shutdown(void);
 
 struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _starpu_mpi_envelope *envelope, int source, MPI_Comm comm) STARPU_ATTRIBUTE_MALLOC;
 struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(struct _starpu_mpi_node_tag *node_tag);

+ 1 - 1
mpi/src/starpu_mpi_early_request.c

@@ -40,7 +40,7 @@ void _starpu_mpi_early_request_init()
 	STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_early_request_mutex, NULL);
 }
 
-void _starpu_mpi_early_request_free()
+void _starpu_mpi_early_request_shutdown()
 {
 	struct _starpu_mpi_early_request_hashlist *entry, *tmp;
 	HASH_ITER(hh, _starpu_mpi_early_request_hash, entry, tmp)

+ 2 - 2
mpi/src/starpu_mpi_early_request.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010-2014  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -29,7 +29,7 @@ extern "C" {
 #endif
 
 void _starpu_mpi_early_request_init(void);
-void _starpu_mpi_early_request_free(void);
+void _starpu_mpi_early_request_shutdown(void);
 int _starpu_mpi_early_request_count(void);
 void _starpu_mpi_early_request_check_termination(void);
 

+ 7 - 1
mpi/src/starpu_mpi_fxt.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010  Université de Bordeaux
- * Copyright (C) 2010, 2012  CNRS
+ * Copyright (C) 2010, 2012, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -63,12 +63,16 @@ extern "C" {
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_SUBMIT_END, (src), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+#define _STARPU_MPI_TRACE_COMPLETE_BEGIN(type, rank, mpi_tag)		\
+	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN((rank), (mpi_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, size)	\
 	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(src, mpi_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_BEGIN, (src), (mpi_tag), _starpu_gettid());
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_END(src, mpi_tag)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
+#define _STARPU_MPI_TRACE_COMPLETE_END(type, rank, mpi_tag)		\
+	if (type == RECV_REQ) { _STARPU_MPI_TRACE_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (type == SEND_REQ) { _STARPU_MPI_TRACE_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
 #define _STARPU_MPI_TRACE_SLEEP_BEGIN()	\
 	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
 #define _STARPU_MPI_TRACE_SLEEP_END()	\
@@ -95,6 +99,8 @@ extern "C" {
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_SUBMIT_END(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(a, b, c)		do {} while(0);
+#define _STARPU_MPI_TRACE_COMPLETE_BEGIN(a, b, c)		do {} while(0);
+#define _STARPU_MPI_TRACE_COMPLETE_END(a, b, c)			do {} while(0);
 #define _STARPU_MPI_TRACE_ISEND_COMPLETE_END(a, b, c)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(a, b)		do {} while(0);
 #define _STARPU_MPI_TRACE_IRECV_COMPLETE_END(a, b)		do {} while(0);

+ 236 - 0
mpi/src/starpu_mpi_init.c

@@ -0,0 +1,236 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009, 2010-2016  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
+ * Copyright (C) 2016  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_datatype.h>
+#include <starpu_mpi_private.h>
+#include <starpu_mpi_cache.h>
+#include <starpu_profiling.h>
+#include <starpu_mpi_stats.h>
+#include <starpu_mpi_cache.h>
+#include <starpu_mpi_sync_data.h>
+#include <starpu_mpi_early_data.h>
+#include <starpu_mpi_early_request.h>
+#include <starpu_mpi_select_node.h>
+#include <starpu_mpi_tag.h>
+#include <starpu_mpi_comm.h>
+#include <common/config.h>
+#include <common/thread.h>
+#include <datawizard/interfaces/data_interface.h>
+#include <datawizard/coherency.h>
+#include <core/simgrid.h>
+#include <core/task.h>
+
+#ifdef STARPU_SIMGRID
+static int _mpi_world_size;
+static int _mpi_world_rank;
+#endif
+
+static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
+{
+	switch (thread_level)
+	{
+		case MPI_THREAD_SERIALIZED:
+		{
+			_STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
+			break;
+		}
+		case MPI_THREAD_FUNNELED:
+		{
+			_STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
+			break;
+		}
+		case MPI_THREAD_SINGLE:
+		{
+			_STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
+			break;
+		}
+	}
+}
+
+void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
+{
+	if (argc_argv->initialize_mpi)
+	{
+		int thread_support;
+		_STARPU_DEBUG("Calling MPI_Init_thread\n");
+		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
+		{
+			_STARPU_ERROR("MPI_Init_thread failed\n");
+		}
+		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
+	}
+	else
+	{
+		int provided;
+		MPI_Query_thread(&provided);
+		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
+	}
+
+	MPI_Comm_rank(argc_argv->comm, &argc_argv->rank);
+	MPI_Comm_size(argc_argv->comm, &argc_argv->world_size);
+	MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
+
+#ifdef STARPU_SIMGRID
+	_mpi_world_size = argc_argv->world_size;
+	_mpi_world_rank = argc_argv->rank;
+#endif
+}
+
+static
+int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
+{
+	struct _starpu_mpi_argc_argv *argc_argv;
+	_STARPU_MALLOC(argc_argv, sizeof(struct _starpu_mpi_argc_argv));
+	argc_argv->initialize_mpi = initialize_mpi;
+	argc_argv->argc = argc;
+	argc_argv->argv = argv;
+	argc_argv->comm = comm;
+
+#ifdef STARPU_SIMGRID
+	/* Call MPI_Init_thread as early as possible, to initialize simgrid
+	 * before working with mutexes etc. */
+	_starpu_mpi_do_initialize(argc_argv);
+#endif
+
+	return _starpu_mpi_progress_init(argc_argv);
+}
+
+#ifdef STARPU_SIMGRID
+/* This is called before application's main, to initialize SMPI before we can
+ * create MSG processes to run application's main */
+int _starpu_mpi_simgrid_init(int argc, char *argv[])
+{
+	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD);
+}
+#endif
+
+int starpu_mpi_init_comm(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED, int initialize_mpi STARPU_ATTRIBUTE_UNUSED, MPI_Comm comm STARPU_ATTRIBUTE_UNUSED)
+{
+#ifdef STARPU_SIMGRID
+	_starpu_mpi_wait_for_initialization();
+	return 0;
+#else
+	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
+#endif
+}
+
+int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
+{
+	return starpu_mpi_init_comm(argc, argv, initialize_mpi, MPI_COMM_WORLD);
+}
+
+int starpu_mpi_initialize(void)
+{
+#ifdef STARPU_SIMGRID
+	return 0;
+#else
+	return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);
+#endif
+}
+
+int starpu_mpi_initialize_extended(int *rank, int *world_size)
+{
+#ifdef STARPU_SIMGRID
+	*world_size = _mpi_world_size;
+	*rank = _mpi_world_rank;
+	return 0;
+#else
+	int ret;
+
+	ret = _starpu_mpi_initialize(NULL, NULL, 1, MPI_COMM_WORLD);
+	if (ret == 0)
+	{
+		_STARPU_DEBUG("Calling MPI_Comm_rank\n");
+		MPI_Comm_rank(MPI_COMM_WORLD, rank);
+		MPI_Comm_size(MPI_COMM_WORLD, world_size);
+	}
+	return ret;
+#endif
+}
+
+int starpu_mpi_shutdown(void)
+{
+	int value;
+	int rank, world_size;
+
+	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
+
+	/* kill the progression thread */
+	_starpu_mpi_progress_shutdown(&value);
+
+	_STARPU_MPI_TRACE_STOP(rank, world_size);
+
+	_starpu_mpi_comm_amounts_display(stderr, rank);
+	_starpu_mpi_comm_amounts_shutdown();
+	_starpu_mpi_cache_shutdown(world_size);
+	_starpu_mpi_tag_shutdown();
+	_starpu_mpi_comm_shutdown();
+
+	return 0;
+}
+
+int starpu_mpi_comm_size(MPI_Comm comm, int *size)
+{
+	if (_starpu_mpi_fake_world_size != -1)
+	{
+		*size = _starpu_mpi_fake_world_size;
+		return 0;
+	}
+#ifdef STARPU_SIMGRID
+	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
+	*size = _mpi_world_size;
+	return 0;
+#else
+	return MPI_Comm_size(comm, size);
+#endif
+}
+
+int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
+{
+	if (_starpu_mpi_fake_world_rank != -1)
+	{
+		*rank = _starpu_mpi_fake_world_rank;
+		return 0;
+	}
+#ifdef STARPU_SIMGRID
+	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
+	*rank = _mpi_world_rank;
+	return 0;
+#else
+	return MPI_Comm_rank(comm, rank);
+#endif
+}
+
+int starpu_mpi_world_size(void)
+{
+	int size;
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+	return size;
+}
+
+int starpu_mpi_world_rank(void)
+{
+	int rank;
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	return rank;
+}
+

+ 34 - 0
mpi/src/starpu_mpi_init.h

@@ -0,0 +1,34 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2012-2015  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_INIT_H__
+#define __STARPU_MPI_INIT_H__
+
+#include <starpu.h>
+#include <starpu_mpi.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_INIT_H__

+ 13 - 6
mpi/src/starpu_mpi_private.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010, 2012-2017  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -21,15 +21,15 @@
 #include <starpu.h>
 #include <common/config.h>
 #include <common/uthash.h>
-#include "starpu_mpi.h"
-#include "starpu_mpi_fxt.h"
+#include <starpu_mpi.h>
+#include <starpu_mpi_fxt.h>
 #include <common/list.h>
 #include <core/simgrid.h>
 
 #ifdef __cplusplus
 extern "C" {
 #endif
-	
+
 #ifdef STARPU_SIMGRID
 starpu_pthread_wait_t wait;
 starpu_pthread_queue_t dontsleep;
@@ -45,7 +45,7 @@ struct _starpu_simgrid_mpi_req
 int _starpu_mpi_simgrid_mpi_test(unsigned *done, int *flag);
 void _starpu_mpi_simgrid_wait_req(MPI_Request *request, 	MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done);
 #endif
-	
+
 extern int _starpu_debug_rank;
 char *_starpu_mpi_get_mpi_error_code(int code);
 extern int _starpu_mpi_comm;
@@ -247,7 +247,7 @@ LIST_TYPE(_starpu_mpi_req,
 	starpu_pthread_queue_t queue;
 	unsigned done;
 #endif
-	  
+
 );
 
 struct _starpu_mpi_argc_argv
@@ -258,8 +258,15 @@ struct _starpu_mpi_argc_argv
 	MPI_Comm comm;
 	int fargc;	// Fortran argc
 	char **fargv;	// Fortran argv
+	int rank;
+	int world_size;
 };
 
+void _starpu_mpi_progress_shutdown(int *value);
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
+#ifdef STARPU_SIMGRID
+void _starpu_mpi_wait_for_initialization();
+#endif
 
 #ifdef __cplusplus
 }

+ 1 - 1
mpi/src/starpu_mpi_stats.c

@@ -42,7 +42,7 @@ void _starpu_mpi_comm_amounts_init(MPI_Comm comm)
 	_STARPU_MPI_CALLOC(comm_amount, world_size, sizeof(size_t));
 }
 
-void _starpu_mpi_comm_amounts_free()
+void _starpu_mpi_comm_amounts_shutdown()
 {
 	if (stats_enabled == 0) return;
 	free(comm_amount);

+ 2 - 2
mpi/src/starpu_mpi_stats.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012, 2017  CNRS
+ * Copyright (C) 2012, 2016, 2017  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,7 +26,7 @@ extern "C" {
 #endif
 
 void _starpu_mpi_comm_amounts_init(MPI_Comm comm);
-void _starpu_mpi_comm_amounts_free();
+void _starpu_mpi_comm_amounts_shutdown();
 void _starpu_mpi_comm_amounts_inc(MPI_Comm comm, unsigned dst, MPI_Datatype datatype, int count);
 void _starpu_mpi_comm_amounts_display(FILE *stream, int node);
 

+ 1 - 1
mpi/src/starpu_mpi_sync_data.c

@@ -39,7 +39,7 @@ void _starpu_mpi_sync_data_init(void)
 	_starpu_mpi_sync_data_handle_hashmap_count = 0;
 }
 
-void _starpu_mpi_sync_data_free(void)
+void _starpu_mpi_sync_data_shutdown(void)
 {
 	struct _starpu_mpi_sync_data_handle_hashlist *current, *tmp;
 	HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap, current, tmp)

+ 2 - 2
mpi/src/starpu_mpi_sync_data.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2015  CNRS
+ * Copyright (C) 2015, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -29,7 +29,7 @@ extern "C" {
 
 void _starpu_mpi_sync_data_init(void);
 void _starpu_mpi_sync_data_check_termination(void);
-void _starpu_mpi_sync_data_free(void);
+void _starpu_mpi_sync_data_shutdown(void);
 
 struct _starpu_mpi_req *_starpu_mpi_sync_data_find(int data_tag, int source, MPI_Comm comm);
 void _starpu_mpi_sync_data_add(struct _starpu_mpi_req *req);

+ 6 - 6
mpi/src/starpu_mpi_tag.c

@@ -40,7 +40,7 @@ void _starpu_mpi_tag_init(void)
 	_starpu_spin_init(&registered_tag_handles_lock);
 }
 
-void _starpu_mpi_tag_free(void)
+void _starpu_mpi_tag_shutdown(void)
 {
      	struct handle_tag_entry *tag_entry, *tag_tmp;
 
@@ -55,7 +55,7 @@ void _starpu_mpi_tag_free(void)
 	registered_tag_handles = NULL;
 }
 
-starpu_data_handle_t _starpu_mpi_data_get_data_handle_from_tag(int tag)
+starpu_data_handle_t _starpu_mpi_tag_get_data_handle_from_tag(int tag)
 {
 	struct handle_tag_entry *ret;
 
@@ -73,13 +73,13 @@ starpu_data_handle_t _starpu_mpi_data_get_data_handle_from_tag(int tag)
 	}
 }
 
-void _starpu_mpi_data_register_tag(starpu_data_handle_t handle, int tag)
+void _starpu_mpi_tag_data_register(starpu_data_handle_t handle, int tag)
 {
 	struct handle_tag_entry *entry;
 	_STARPU_MPI_MALLOC(entry, sizeof(*entry));
 
-	STARPU_ASSERT_MSG(!(_starpu_mpi_data_get_data_handle_from_tag(tag)),
-			  "There is already a data handle %p registered with the tag %d\n", _starpu_mpi_data_get_data_handle_from_tag(tag), tag);
+	STARPU_ASSERT_MSG(!(_starpu_mpi_tag_get_data_handle_from_tag(tag)),
+			  "There is already a data handle %p registered with the tag %d\n", _starpu_mpi_tag_get_data_handle_from_tag(tag), tag);
 
 	_STARPU_MPI_DEBUG(42, "Adding handle %p with tag %d in hashtable\n", handle, tag);
 
@@ -91,7 +91,7 @@ void _starpu_mpi_data_register_tag(starpu_data_handle_t handle, int tag)
 	_starpu_spin_unlock(&registered_tag_handles_lock);
 }
 
-int _starpu_mpi_data_release_tag(starpu_data_handle_t handle)
+int _starpu_mpi_tag_data_release(starpu_data_handle_t handle)
 {
 	int tag = starpu_mpi_data_get_tag(handle);
 

+ 6 - 5
mpi/src/starpu_mpi_tag.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2015  CNRS
+ * Copyright (C) 2015, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,10 +26,11 @@ extern "C" {
 #endif
 
 void _starpu_mpi_tag_init(void);
-void _starpu_mpi_tag_free(void);
-void _starpu_mpi_data_register_tag(starpu_data_handle_t handle, int tag);
-int _starpu_mpi_data_release_tag(starpu_data_handle_t handle);
-starpu_data_handle_t _starpu_mpi_data_get_data_handle_from_tag(int tag);
+void _starpu_mpi_tag_shutdown(void);
+
+void _starpu_mpi_tag_data_register(starpu_data_handle_t handle, int tag);
+int _starpu_mpi_tag_data_release(starpu_data_handle_t handle);
+starpu_data_handle_t _starpu_mpi_tag_get_data_handle_from_tag(int tag);
 
 #ifdef __cplusplus
 }

+ 9 - 1
src/common/fxt.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009-2017  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2016  CNRS
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2016, 2017  CNRS
  * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -234,6 +234,14 @@ extern int _starpu_fxt_started;
 extern starpu_pthread_mutex_t _starpu_fxt_started_mutex;
 extern starpu_pthread_cond_t _starpu_fxt_started_cond;
 
+static inline void _starpu_fxt_wait_initialisation()
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_fxt_started_mutex);
+	while (!_starpu_fxt_started)
+		STARPU_PTHREAD_COND_WAIT(&_starpu_fxt_started_cond, &_starpu_fxt_started_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_fxt_started_mutex);
+}
+
 long _starpu_gettid(void);
 
 /* Initialize the FxT library. */

+ 237 - 106
src/core/simgrid.c

@@ -42,8 +42,22 @@ extern int _starpu_mpi_simgrid_init(int argc, char *argv[]);
 
 static int simgrid_started;
 
+static int runners_running;
 starpu_pthread_queue_t _starpu_simgrid_transfer_queue[STARPU_MAXNODES];
+static struct transfer_runner {
+	struct transfer *first_transfer, *last_transfer;
+	msg_sem_t sem;
+	msg_process_t runner;
+} transfer_runner[STARPU_MAXNODES][STARPU_MAXNODES];
+static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
+
 starpu_pthread_queue_t _starpu_simgrid_task_queue[STARPU_NMAXWORKERS];
+static struct worker_runner {
+	struct task *first_task, *last_task;
+	msg_sem_t sem;
+	msg_process_t runner;
+} worker_runner[STARPU_NMAXWORKERS];
+static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED);
 
 /* In case the MPI application didn't use smpicc to build the file containing
  * main(), try to cope by calling starpu_main */
@@ -272,7 +286,7 @@ static void maestro(void *data STARPU_ATTRIBUTE_UNUSED)
 	MSG_main();
 }
 
-void _starpu_simgrid_init(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
+void _starpu_simgrid_init_early(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef HAVE_MSG_PROCESS_ATTACH
 	if (!simgrid_started && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
@@ -292,7 +306,6 @@ void _starpu_simgrid_init(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU
 	}
 #endif
 
-	unsigned i;
 	if (!simgrid_started && !starpu_main && !(smpi_main && smpi_simulated_main_ != _starpu_smpi_simulated_main_))
 	{
 		_STARPU_ERROR("In simgrid mode, the file containing the main() function of this application needs to be compiled with starpu.h or starpu_simgrid_wrap.h included, to properly rename it into starpu_main\n");
@@ -304,14 +317,67 @@ void _starpu_simgrid_init(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU
 #endif
 		MSG_process_set_data(MSG_process_self(), calloc(MAX_TSD, sizeof(void*)));
 	}
+	unsigned i;
 	for (i = 0; i < STARPU_MAXNODES; i++)
 		starpu_pthread_queue_init(&_starpu_simgrid_transfer_queue[i]);
 	for (i = 0; i < STARPU_NMAXWORKERS; i++)
 		starpu_pthread_queue_init(&_starpu_simgrid_task_queue[i]);
 }
 
+void _starpu_simgrid_init(void)
+{
+	unsigned i;
+	runners_running = 1;
+	for (i = 0; i < starpu_worker_get_count(); i++)
+	{
+		char s[32];
+		snprintf(s, sizeof(s), "worker %u runner", i);
+		void **tsd = calloc(MAX_TSD+1, sizeof(void*));
+		worker_runner[i].sem = MSG_sem_init(0);
+		tsd[0] = (void*)(uintptr_t) i;
+		worker_runner[i].runner = MSG_process_create_with_arguments(s, task_execute, tsd, _starpu_simgrid_get_host_by_worker(_starpu_get_worker_struct(i)), 0, NULL);
+	}
+}
+
 void _starpu_simgrid_deinit(void)
 {
+	unsigned i, j;
+	runners_running = 0;
+	for (i = 0; i < STARPU_MAXNODES; i++)
+	{
+		for (j = 0; j < STARPU_MAXNODES; j++)
+		{
+			struct transfer_runner *t = &transfer_runner[i][j];
+			if (t->runner)
+			{
+				MSG_sem_release(t->sem);
+#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
+				MSG_process_join(t->runner, 1000000);
+#else
+				MSG_process_sleep(1);
+#endif
+				STARPU_ASSERT(t->first_transfer == NULL);
+				STARPU_ASSERT(t->last_transfer == NULL);
+				MSG_sem_destroy(t->sem);
+			}
+		}
+		/* FIXME: queue not empty at this point, needs proper unregistration */
+		/* starpu_pthread_queue_destroy(&_starpu_simgrid_transfer_queue[i]); */
+	}
+	for (i = 0; i < starpu_worker_get_count(); i++)
+	{
+		struct worker_runner *w = &worker_runner[i];
+		MSG_sem_release(w->sem);
+#if SIMGRID_VERSION_MAJOR > 3 || (SIMGRID_VERSION_MAJOR == 3 && SIMGRID_VERSION_MINOR >= 14)
+		MSG_process_join(w->runner, 1000000);
+#else
+		MSG_process_sleep(1);
+#endif
+		STARPU_ASSERT(w->first_task == NULL);
+		STARPU_ASSERT(w->last_task == NULL);
+		MSG_sem_destroy(w->sem);
+		starpu_pthread_queue_destroy(&_starpu_simgrid_task_queue[i]);
+	}
 #ifdef HAVE_MSG_PROCESS_ATTACH
 	if (simgrid_started == 2)
 	{
@@ -329,68 +395,76 @@ void _starpu_simgrid_deinit(void)
 struct task
 {
 	msg_task_t task;
-	int workerid;
 
 	/* communication termination signalization */
 	unsigned *finished;
-	starpu_pthread_mutex_t *mutex;
-	starpu_pthread_cond_t *cond;
 
-	/* Task which waits for this task */
+	/* Next task on this worker */
 	struct task *next;
 };
 
-static struct task *last_task[STARPU_NMAXWORKERS];
-
 /* Actually execute the task.  */
 static int task_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
 {
 	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
 	MSG_process_sleep(0.000001);
 
-	struct task *task = starpu_pthread_getspecific(0);
-	_STARPU_DEBUG("task %p started\n", task);
-	MSG_task_execute(task->task);
-	MSG_task_destroy(task->task);
-	_STARPU_DEBUG("task %p finished\n", task);
-	STARPU_PTHREAD_MUTEX_LOCK(task->mutex);
-	*task->finished = 1;
-	STARPU_PTHREAD_COND_BROADCAST(task->cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(task->mutex);
-
-	/* The worker which started this task may be sleeping out of tasks, wake it  */
-	starpu_wake_worker(task->workerid);
-
-	if (last_task[task->workerid] == task)
-		last_task[task->workerid] = NULL;
-	if (task->next)
-	{
-		void **tsd = calloc(MAX_TSD+1, sizeof(void*));
-		tsd[0] = task->next;
-		MSG_process_create_with_arguments("task", task_execute, tsd, MSG_host_self(), 0, NULL);
+	unsigned workerid = (uintptr_t) starpu_pthread_getspecific(0);
+	struct worker_runner *w = &worker_runner[workerid];
+
+	_STARPU_DEBUG("worker runner %u started\n", workerid);
+	while (1) {
+		struct task *task;
+
+		MSG_sem_acquire(w->sem);
+		if (!runners_running)
+			break;
+
+		task = w->first_task;
+		w->first_task = task->next;
+		if (w->last_task == task)
+			w->last_task = NULL;
+
+		_STARPU_DEBUG("task %p started\n", task);
+		MSG_task_execute(task->task);
+		MSG_task_destroy(task->task);
+		_STARPU_DEBUG("task %p finished\n", task);
+
+		*task->finished = 1;
+		/* The worker which started this task may be sleeping out of tasks, wake it  */
+		starpu_wake_worker(workerid);
+
+		free(task);
 	}
-	/* Task is freed with process context */
+	_STARPU_DEBUG("worker %u stopped\n", workerid);
 	return 0;
 }
 
 /* Wait for completion of all asynchronous tasks for this worker */
 void _starpu_simgrid_wait_tasks(int workerid)
 {
-	struct task *task = last_task[workerid];
+	struct task *task = worker_runner[workerid].last_task;
 	if (!task)
 		return;
 
 	unsigned *finished = task->finished;
-	starpu_pthread_mutex_t *mutex = task->mutex;
-	starpu_pthread_cond_t *cond = task->cond;
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	while (!*finished)
-		STARPU_PTHREAD_COND_WAIT(cond, mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+	starpu_pthread_wait_t wait;
+	starpu_pthread_wait_init(&wait);
+	starpu_pthread_queue_register(&wait, &_starpu_simgrid_task_queue[workerid]);
+
+	while(1)
+	{
+		starpu_pthread_wait_reset(&wait);
+		if (*finished)
+			break;
+		starpu_pthread_wait_wait(&wait);
+	}
+	starpu_pthread_queue_unregister(&wait, &_starpu_simgrid_task_queue[workerid]);
+	starpu_pthread_wait_destroy(&wait);
 }
 
 /* Task execution submitted by StarPU */
-void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond)
+void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished)
 {
 	struct starpu_task *starpu_task = j->task;
 	msg_task_t simgrid_task;
@@ -428,31 +502,28 @@ void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *j, struct star
 	{
 		/* Asynchronous execution */
 		struct task *task;
+		struct worker_runner *w = &worker_runner[workerid];
 		_STARPU_MALLOC(task, sizeof(*task));
 		task->task = simgrid_task;
-		task->workerid = workerid;
 		task->finished = finished;
 		*finished = 0;
-		task->mutex = mutex;
-		task->cond = cond;
 		task->next = NULL;
 		/* Sleep 10µs for the GPU task queueing */
 		if (_starpu_simgrid_queue_malloc_cost())
 			MSG_process_sleep(0.000010);
-		if (last_task[workerid])
+		if (w->last_task)
 		{
-			/* Make this task depend on the previous */
-			last_task[workerid]->next = task;
-			last_task[workerid] = task;
+			/* Already running a task, queue */
+			w->last_task->next = task;
+			w->last_task = task;
 		}
 		else
 		{
-			void **tsd;
-			last_task[workerid] = task;
-			tsd = calloc(MAX_TSD+1, sizeof(void*));
-			tsd[0] = task;
-			MSG_process_create_with_arguments("task", task_execute, tsd, MSG_host_self(), 0, NULL);
+			STARPU_ASSERT(!w->first_task);
+			w->first_task = task;
+			w->last_task = task;
 		}
+		MSG_sem_release(w->sem);
 	}
 }
 
@@ -469,8 +540,6 @@ LIST_TYPE(transfer,
 
 	/* communication termination signalization */
 	unsigned *finished;
-	starpu_pthread_mutex_t *mutex;
-	starpu_pthread_cond_t *cond;
 
 	/* transfers which wait for this transfer */
 	struct transfer **wake;
@@ -478,6 +547,9 @@ LIST_TYPE(transfer,
 
 	/* Number of transfers that this transfer waits for */
 	unsigned nwait;
+
+	/* Next transfer on this stream */
+	struct transfer *next;
 )
 
 struct transfer_list pending;
@@ -533,47 +605,97 @@ static int transfers_are_sequential(struct transfer *new_transfer, struct transf
 	return 0;
 }
 
+static void transfer_queue(struct transfer *transfer)
+{
+	unsigned src = transfer->src_node;
+	unsigned dst = transfer->dst_node;
+	struct transfer_runner *t = &transfer_runner[src][dst];
+
+	if (!t->runner)
+	{
+		/* No runner yet, start it */
+		static starpu_pthread_mutex_t mutex; /* process_create may yield */
+		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		if (!t->runner)
+		{
+			char s[64];
+			snprintf(s, sizeof(s), "transfer %u-%u runner", src, dst);
+			void **tsd = calloc(MAX_TSD+1, sizeof(void*));
+			tsd[0] = (void*)(uintptr_t)((src<<16) + dst);
+			t->runner = MSG_process_create_with_arguments(s, transfer_execute, tsd, _starpu_simgrid_get_memnode_host(src), 0, NULL);
+			t->sem = MSG_sem_init(0);
+		}
+		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	}
+
+	if (t->last_transfer)
+	{
+		/* Already running a transfer, queue */
+		t->last_transfer->next = transfer;
+		t->last_transfer = transfer;
+	}
+	else
+	{
+		STARPU_ASSERT(!t->first_transfer);
+		t->first_transfer = transfer;
+		t->last_transfer = transfer;
+	}
+	MSG_sem_release(t->sem);
+}
+
 /* Actually execute the transfer, and then start transfers waiting for this one.  */
 static int transfer_execute(int argc STARPU_ATTRIBUTE_UNUSED, char *argv[] STARPU_ATTRIBUTE_UNUSED)
 {
 	/* FIXME: Ugly work-around for bug in simgrid: the MPI context is not properly set at MSG process startup */
 	MSG_process_sleep(0.000001);
 
-	struct transfer *transfer = starpu_pthread_getspecific(0);
-	unsigned i;
-	_STARPU_DEBUG("transfer %p started\n", transfer);
-	MSG_task_execute(transfer->task);
-	MSG_task_destroy(transfer->task);
-	_STARPU_DEBUG("transfer %p finished\n", transfer);
-	STARPU_PTHREAD_MUTEX_LOCK(transfer->mutex);
-	*transfer->finished = 1;
-	STARPU_PTHREAD_COND_BROADCAST(transfer->cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(transfer->mutex);
-
-	/* The workers which started this request may be sleeping out of tasks, wake it  */
-	_starpu_wake_all_blocked_workers_on_node(transfer->run_node);
-
-	/* Wake transfers waiting for my termination */
-	/* Note: due to possible preemption inside process_create, the array
-	 * may grow while doing this */
-	for (i = 0; i < transfer->nwake; i++)
-	{
-		struct transfer *wake = transfer->wake[i];
-		STARPU_ASSERT(wake->nwait > 0);
-		wake->nwait--;
-		if (!wake->nwait)
+	unsigned src_dst = (uintptr_t) starpu_pthread_getspecific(0);
+	unsigned src = src_dst >> 16;
+	unsigned dst = src_dst & 0xffff;
+	struct transfer_runner *t = &transfer_runner[src][dst];
+
+	_STARPU_DEBUG("transfer runner %u-%u started\n", src, dst);
+	while (1) {
+		struct transfer *transfer;
+
+		MSG_sem_acquire(t->sem);
+		if (!runners_running)
+			break;
+		transfer = t->first_transfer;
+		t->first_transfer = transfer->next;
+		if (t->last_transfer == transfer)
+			t->last_transfer = NULL;
+
+		_STARPU_DEBUG("transfer %p started\n", transfer);
+		MSG_task_execute(transfer->task);
+		MSG_task_destroy(transfer->task);
+		_STARPU_DEBUG("transfer %p finished\n", transfer);
+
+		*transfer->finished = 1;
+		transfer_list_erase(&pending, transfer);
+
+		/* The workers which started this request may be sleeping out of tasks, wake it  */
+		_starpu_wake_all_blocked_workers_on_node(transfer->run_node);
+
+		unsigned i;
+		/* Wake transfers waiting for my termination */
+		/* Note: due to possible preemption inside process_create, the array
+		 * may grow while doing this */
+		for (i = 0; i < transfer->nwake; i++)
 		{
-			void **tsd;
-			_STARPU_DEBUG("triggering transfer %p\n", wake);
-			tsd = calloc(MAX_TSD+1, sizeof(void*));
-			tsd[0] = wake;
-			MSG_process_create_with_arguments("transfer task", transfer_execute, tsd, _starpu_simgrid_get_host_by_name("MAIN"), 0, NULL);
+			struct transfer *wake = transfer->wake[i];
+			STARPU_ASSERT(wake->nwait > 0);
+			wake->nwait--;
+			if (!wake->nwait)
+			{
+				_STARPU_DEBUG("triggering transfer %p\n", wake);
+				transfer_queue(wake);
+			}
 		}
+		free(transfer->wake);
+		free(transfer);
 	}
 
-	free(transfer->wake);
-	transfer_list_erase(&pending, transfer);
-	/* transfer is freed with process context */
 	return 0;
 }
 
@@ -604,12 +726,33 @@ static void transfer_submit(struct transfer *transfer)
 
 	if (!transfer->nwait)
 	{
-		void **tsd;
 		_STARPU_DEBUG("transfer %p waits for nobody, starting\n", transfer);
-		tsd = calloc(MAX_TSD+1, sizeof(void*));
-		tsd[0] = transfer;
-		MSG_process_create_with_arguments("transfer task", transfer_execute, tsd, _starpu_simgrid_get_host_by_name("MAIN"), 0, NULL);
+		transfer_queue(transfer);
+	}
+}
+
+int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event)
+{
+	/* this is not associated to a request so it's synchronous */
+	starpu_pthread_wait_t wait;
+	starpu_pthread_wait_init(&wait);
+	starpu_pthread_queue_register(&wait, event->queue);
+
+	while(1)
+	{
+		starpu_pthread_wait_reset(&wait);
+		if (event->finished)
+			break;
+		starpu_pthread_wait_wait(&wait);
 	}
+	starpu_pthread_queue_unregister(&wait, event->queue);
+	starpu_pthread_wait_destroy(&wait);
+	return 0;
+}
+
+int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event)
+{
+	return event->finished;
 }
 
 /* Data transfer issued by StarPU */
@@ -623,9 +766,7 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	msg_host_t *hosts;
 	double *computation;
 	double *communication;
-	starpu_pthread_mutex_t mutex;
-	starpu_pthread_cond_t cond;
-	unsigned finished;
+	union _starpu_async_channel_event *event, myevent;
 
 	_STARPU_CALLOC(hosts, 2, sizeof(*hosts));
 	_STARPU_CALLOC(computation, 2, sizeof(*computation));
@@ -648,24 +789,17 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	transfer->run_node = _starpu_memory_node_get_local_key();
 
 	if (req)
-	{
-		transfer->finished = &req->async_channel.event.finished;
-		transfer->mutex = &req->async_channel.event.mutex;
-		transfer->cond = &req->async_channel.event.cond;
-	}
+		event = &req->async_channel.event;
 	else
-	{
-		transfer->finished = &finished;
-		transfer->mutex = &mutex;
-		transfer->cond = &cond;
-	}
+		event = &myevent;
+	event->finished = 0;
+	transfer->finished = &event->finished;
+	event->queue = &_starpu_simgrid_transfer_queue[transfer->run_node];
 
-	*transfer->finished = 0;
-	STARPU_PTHREAD_MUTEX_INIT(transfer->mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(transfer->cond, NULL);
 	transfer->wake = NULL;
 	transfer->nwake = 0;
 	transfer->nwait = 0;
+	transfer->next = NULL;
 
 	if (req)
 		_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
@@ -685,10 +819,7 @@ int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node,
 	else
 	{
 		/* this is not associated to a request so it's synchronous */
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		while (!finished)
-			STARPU_PTHREAD_COND_WAIT(&cond, &mutex);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		_starpu_simgrid_wait_transfer_event(event);
 		return 0;
 	}
 }

+ 6 - 3
src/core/simgrid.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012-2016  Université de Bordeaux
+ * Copyright (C) 2012-2017  Université de Bordeaux
  * Copyright (C) 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -38,11 +38,14 @@ struct _starpu_pthread_args
 #define STARPU_MPI_AS_PREFIX "StarPU-MPI"
 #define _starpu_simgrid_running_smpi() (getenv("SMPI_GLOBAL_SIZE") != NULL)
 
-void _starpu_simgrid_init(int *argc, char ***argv);
+void _starpu_simgrid_init_early(int *argc, char ***argv);
+void _starpu_simgrid_init(void);
 void _starpu_simgrid_deinit(void);
 void _starpu_simgrid_wait_tasks(int workerid);
-void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished, starpu_pthread_mutex_t *mutex, starpu_pthread_cond_t *cond);
+void _starpu_simgrid_submit_job(int workerid, struct _starpu_job *job, struct starpu_perfmodel_arch* perf_arch, double length, unsigned *finished);
 int _starpu_simgrid_transfer(size_t size, unsigned src_node, unsigned dst_node, struct _starpu_data_request *req);
+int _starpu_simgrid_wait_transfer_event(union _starpu_async_channel_event *event);
+int _starpu_simgrid_test_transfer_event(union _starpu_async_channel_event *event);
 /* Return the number of hosts prefixed by PREFIX */
 int _starpu_simgrid_get_nbhosts(const char *prefix);
 unsigned long long _starpu_simgrid_get_memsize(const char *prefix, unsigned devid);

+ 1 - 1
src/core/topology.c

@@ -1994,7 +1994,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 
                                 _starpu_worker_drives_memory_node(&workerarg->set->workers[0], STARPU_MAIN_RAM);
 				if (memory_node != STARPU_MAIN_RAM)
-					_starpu_worker_drives_memory_node(workerarg->workerid, memory_node);
+					_starpu_worker_drives_memory_node(&workerarg->set->workers[0], memory_node);
 				break;
 #endif /* STARPU_USE_MIC */
 

+ 10 - 13
src/core/workers.c

@@ -1192,7 +1192,7 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 
 #ifdef STARPU_SIMGRID
 	/* This initializes the simgrid thread library, thus needs to be early */
-	_starpu_simgrid_init(argc, argv);
+	_starpu_simgrid_init_early(argc, argv);
 #endif
 
 	STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
@@ -1414,6 +1414,9 @@ int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
 #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
 	_starpu_cuda_init();
 #endif
+#ifdef STARPU_SIMGRID
+	_starpu_simgrid_init();
+#endif
 	/* Launch "basic" workers (ie. non-combined workers) */
 	if (!is_a_sink)
 		_starpu_launch_drivers(&_starpu_config);
@@ -1460,10 +1463,10 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 	unsigned workerid;
 	unsigned n;
 
+	starpu_wake_all_blocked_workers();
+
 	for (workerid = 0; workerid < pconfig->topology.nworkers; workerid++)
 	{
-		starpu_wake_all_blocked_workers();
-
 		_STARPU_DEBUG("wait for worker %u\n", workerid);
 
 		struct _starpu_worker_set *set = pconfig->workers[workerid].set;
@@ -1475,14 +1478,10 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 		{
 			if (set->started)
 			{
-#ifdef STARPU_SIMGRID
-				status = starpu_pthread_join(set->worker_thread, NULL);
-#else
+#ifndef STARPU_SIMGRID
 				if (!pthread_equal(pthread_self(), set->worker_thread))
-				{
-					status = starpu_pthread_join(set->worker_thread, NULL);
-				}
 #endif
+					status = starpu_pthread_join(set->worker_thread, NULL);
 				if (status)
 				{
 #ifdef STARPU_VERBOSE
@@ -1497,12 +1496,10 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 			if (!worker->run_by_starpu)
 				goto out;
 
-#ifdef STARPU_SIMGRID
-			status = starpu_pthread_join(worker->worker_thread, NULL);
-#else
+#ifndef STARPU_SIMGRID
 			if (!pthread_equal(pthread_self(), worker->worker_thread))
-				status = starpu_pthread_join(worker->worker_thread, NULL);
 #endif
+				status = starpu_pthread_join(worker->worker_thread, NULL);
 			if (status)
 			{
 #ifdef STARPU_VERBOSE

+ 2 - 9
src/datawizard/copy_driver.c

@@ -803,10 +803,7 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_channel)
 {
 #ifdef STARPU_SIMGRID
-	STARPU_PTHREAD_MUTEX_LOCK(&async_channel->event.mutex);
-	while (!async_channel->event.finished)
-		STARPU_PTHREAD_COND_WAIT(&async_channel->event.cond, &async_channel->event.mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&async_channel->event.mutex);
+	_starpu_simgrid_wait_transfer_event(&async_channel->event);
 #else /* !SIMGRID */
 	enum starpu_node_kind kind = async_channel->type;
 #ifdef STARPU_USE_CUDA
@@ -867,11 +864,7 @@ void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_
 unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *async_channel)
 {
 #ifdef STARPU_SIMGRID
-	unsigned ret;
-	STARPU_PTHREAD_MUTEX_LOCK(&async_channel->event.mutex);
-	ret = async_channel->event.finished;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&async_channel->event.mutex);
-	return ret;
+	return _starpu_simgrid_test_transfer_event(&async_channel->event);
 #else /* !SIMGRID */
 	enum starpu_node_kind kind = async_channel->type;
 	unsigned success = 0;

+ 2 - 3
src/datawizard/copy_driver.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2015  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2015, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013, 2015  CNRS
  * Copyright (C) 2016  INRIA
  *
@@ -86,8 +86,7 @@ union _starpu_async_channel_event
 	struct
 	{
 		unsigned finished;
-		starpu_pthread_mutex_t mutex;
-		starpu_pthread_cond_t cond;
+		starpu_pthread_queue_t *queue;
 	};
 #endif
 #ifdef STARPU_USE_CUDA

+ 8 - 8
src/debug/traces/starpu_fxt.c

@@ -850,12 +850,12 @@ static void handle_new_mem_node(struct fxt_ev_64 *ev, struct starpu_fxt_options
 		{
 #ifdef STARPU_HAVE_POTI
 			poti_SetVariable(get_event_time_stamp(ev, options), new_memmanager_container_alias, "use", 0.0);
-			poti_SetVariable(get_event_time_stamp(ev, options), new_memmanager_container_alias, "bwi", 0.0);
-			poti_SetVariable(get_event_time_stamp(ev, options), new_memmanager_container_alias, "bwo", 0.0);
+			poti_SetVariable(get_event_time_stamp(ev, options), new_memmanager_container_alias, "bwi_mm", 0.0);
+			poti_SetVariable(get_event_time_stamp(ev, options), new_memmanager_container_alias, "bwo_mm", 0.0);
 #else
 			fprintf(out_paje_file, "13	%.9f	%smm%"PRIu64"	use	0.0\n", get_event_time_stamp(ev, options), prefix, ev->param[0]);
-			fprintf(out_paje_file, "13	%.9f	%smm%"PRIu64"	bwi	0.0\n", get_event_time_stamp(ev, options), prefix, ev->param[0]);
-			fprintf(out_paje_file, "13	%.9f	%smm%"PRIu64"	bwo	0.0\n", get_event_time_stamp(ev, options), prefix, ev->param[0]);
+			fprintf(out_paje_file, "13	%.9f	%smm%"PRIu64"	bwi_mm	0.0\n", get_event_time_stamp(ev, options), prefix, ev->param[0]);
+			fprintf(out_paje_file, "13	%.9f	%smm%"PRIu64"	bwo_mm	0.0\n", get_event_time_stamp(ev, options), prefix, ev->param[0]);
 #endif
 		}
 	}
@@ -2488,9 +2488,9 @@ void _starpu_fxt_display_bandwidth(struct starpu_fxt_options *options)
 #ifdef STARPU_HAVE_POTI
 			char src_memnode_container[STARPU_POTI_STR_LEN];
 			memmanager_container_alias(src_memnode_container, STARPU_POTI_STR_LEN, prefix, itor->src_node);
-			poti_SetVariable(itor->comm_start, src_memnode_container, "bwo", current_bandwidth_out_per_node[itor->src_node]);
+			poti_SetVariable(itor->comm_start, src_memnode_container, "bwo_mm", current_bandwidth_out_per_node[itor->src_node]);
 #else
-			fprintf(out_paje_file, "13	%.9f	%smm%u	bwo	%f\n",
+			fprintf(out_paje_file, "13	%.9f	%smm%u	bwo_mm	%f\n",
 				itor->comm_start, prefix, itor->src_node, current_bandwidth_out_per_node[itor->src_node]);
 #endif
 		}
@@ -2501,9 +2501,9 @@ void _starpu_fxt_display_bandwidth(struct starpu_fxt_options *options)
 #ifdef STARPU_HAVE_POTI
 			char dst_memnode_container[STARPU_POTI_STR_LEN];
 			memmanager_container_alias(dst_memnode_container, STARPU_POTI_STR_LEN, prefix, itor->dst_node);
-			poti_SetVariable(itor->comm_start, dst_memnode_container, "bwi", current_bandwidth_in_per_node[itor->dst_node]);
+			poti_SetVariable(itor->comm_start, dst_memnode_container, "bwi_mm", current_bandwidth_in_per_node[itor->dst_node]);
 #else
-			fprintf(out_paje_file, "13	%.9f	%smm%u	bwi	%f\n",
+			fprintf(out_paje_file, "13	%.9f	%smm%u	bwi_mm	%f\n",
 				itor->comm_start, prefix, itor->dst_node, current_bandwidth_in_per_node[itor->dst_node]);
 #endif
 		}

+ 12 - 12
src/debug/traces/starpu_fxt_mpi.c

@@ -229,11 +229,11 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 	{
 #ifdef STARPU_HAVE_POTI
 		snprintf(mpi_container, sizeof(mpi_container), "%u_mpict", node);
-		poti_SetVariable(0., mpi_container, "bwi", 0.);
-		poti_SetVariable(0., mpi_container, "bwo", 0.);
+		poti_SetVariable(0., mpi_container, "bwi_mpi", 0.);
+		poti_SetVariable(0., mpi_container, "bwo_mpi", 0.);
 #else
-		fprintf(out_paje_file, "13	%.9f	%u_mpict	bwi	%f\n", 0., node, 0.);
-		fprintf(out_paje_file, "13	%.9f	%u_mpict	bwo	%f\n", 0., node, 0.);
+		fprintf(out_paje_file, "13	%.9f	%u_mpict	bwi_mpi	%f\n", 0., node, 0.);
+		fprintf(out_paje_file, "13	%.9f	%u_mpict	bwo_mpi	%f\n", 0., node, 0.);
 #endif
 	}
 
@@ -272,12 +272,12 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 			current_in_bandwidth[match->dst] -= match->bandwidth;
 #ifdef STARPU_HAVE_POTI
 			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", match->src);
-			poti_SetVariable(match->date, mpi_container, "bwo", current_out_bandwidth[match->src]);
+			poti_SetVariable(match->date, mpi_container, "bwo_mpi", current_out_bandwidth[match->src]);
 			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", match->dst);
-			poti_SetVariable(match->date, mpi_container, "bwi", current_in_bandwidth[match->dst]);
+			poti_SetVariable(match->date, mpi_container, "bwi_mpi", current_in_bandwidth[match->dst]);
 #else
-			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", match->date, match->src, current_out_bandwidth[match->src]);
-			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwi	%f\n", match->date, match->dst, current_in_bandwidth[match->dst]);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo_mpi	%f\n", match->date, match->src, current_out_bandwidth[match->src]);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwi_mpi	%f\n", match->date, match->dst, current_in_bandwidth[match->dst]);
 #endif
 			continue;
 		}
@@ -324,15 +324,15 @@ static void display_all_transfers_from_trace(FILE *out_paje_file, unsigned n)
 			snprintf(paje_key, STARPU_POTI_STR_LEN, "mpicom_%lu", id);
 			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", src);
 			poti_StartLink(start_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
-			poti_SetVariable(start_date, mpi_container, "bwo", current_out_bandwidth[src]);
+			poti_SetVariable(start_date, mpi_container, "bwo_mpi", current_out_bandwidth[src]);
 			snprintf(mpi_container, sizeof(mpi_container), "%d_mpict", dst);
 			poti_EndLink(end_date, "MPICt", "MPIL", mpi_container, paje_value, paje_key);
-			poti_SetVariable(start_date, mpi_container, "bwo", current_in_bandwidth[dst]);
+			poti_SetVariable(start_date, mpi_container, "bwo_mpi", current_in_bandwidth[dst]);
 #else
 			fprintf(out_paje_file, "18	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", start_date, (unsigned long)size, src, id);
 			fprintf(out_paje_file, "19	%.9f	MPIL	MPIroot	%lu	%d_mpict	mpicom_%lu\n", end_date, (unsigned long)size, dst, id);
-			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo	%f\n", start_date, src, current_out_bandwidth[src]);
-			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwi	%f\n", start_date, dst, current_in_bandwidth[dst]);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwo_mpi	%f\n", start_date, src, current_out_bandwidth[src]);
+			fprintf(out_paje_file, "13	%.9f	%d_mpict	bwi_mpi	%f\n", start_date, dst, current_in_bandwidth[dst]);
 #endif
 		}
 		else

+ 10 - 10
src/debug/traces/starpu_paje.c

@@ -161,8 +161,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 	/* Types for the memory node */
 	poti_DefineEventType("invalidate", "Mm", "data invalidation");
 	poti_DefineVariableType("use", "Mm", "Used (MB)", "0 0 0");
-	poti_DefineVariableType("bwi", "Mm", "Bandwidth In (MB/s)", "0 0 0");
-	poti_DefineVariableType("bwo", "Mm", "Bandwidth Out (MB/s)", "0 0 0");
+	poti_DefineVariableType("bwi_mm", "Mm", "Bandwidth In (MB/s)", "0 0 0");
+	poti_DefineVariableType("bwo_mm", "Mm", "Bandwidth Out (MB/s)", "0 0 0");
 	poti_DefineStateType("MS", "Mm", "Memory Node State");
 	poti_DefineEntityValue("A", "MS", "Allocating", ".4 .1 .0");
 	poti_DefineEntityValue("Ar", "MS", "AllocatingReuse", ".1 .1 .8");
@@ -177,7 +177,7 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 	/* Types for the Worker of the Memory Node */
 	poti_DefineEventType("user_event", "T", "user event type");
 	poti_DefineEventType("thread_event", "T", "thread event type");
-	poti_DefineVariableType("gf", "T", "GFlops", "0 0 0");
+	poti_DefineVariableType("gf", "W", "GFlops", "0 0 0");
 	poti_DefineStateType("S", "T", "Thread State");
 	poti_DefineEntityValue("I", "S", "Idle", ".9 .1 0");
 	poti_DefineEntityValue("In", "S", "Initializing", "0.0 .7 1.0");
@@ -216,8 +216,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 
 	/* Types for the MPI Communication Thread of the Memory Node */
 	poti_DefineEventType("MPIev", "MPICt", "MPI event type");
-	poti_DefineVariableType("bwi", "MPICt", "Bandwidth In (MB/s)", "0 0 0");
-	poti_DefineVariableType("bwo", "MPICt", "Bandwidth Out (MB/s)", "0 0 0");
+	poti_DefineVariableType("bwi_mpi", "MPICt", "Bandwidth In (MB/s)", "0 0 0");
+	poti_DefineVariableType("bwo_mpi", "MPICt", "Bandwidth Out (MB/s)", "0 0 0");
 	poti_DefineStateType("CtS", "MPICt", "Communication Thread State");
 	poti_DefineEntityValue("P", "CtS", "Processing", "0 0 0");
 	poti_DefineEntityValue("Sl", "CtS", "Sleeping", ".9 .1 .0");
@@ -305,11 +305,11 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 4       nsubmitted    Sc       \"Number of Submitted Uncompleted Tasks\"                        \n\
 4       nready    Sc       \"Number of Ready Tasks\"                        \n\
 4       use     Mm       \"Used (MB)\"                        \n\
-4       bwi     Mm       \"Bandwidth In (MB/s)\"                        \n\
-4       bwo     Mm       \"Bandwidth Out (MB/s)\"                        \n\
-4       bwi     MPICt       \"Bandwidth In (MB/s)\"                        \n\
-4       bwo     MPICt       \"Bandwidth Out (MB/s)\"                        \n\
-4       gf      T       \"GFlops\"                        \n\
+4       bwi_mm     Mm       \"Bandwidth In (MB/s)\"                        \n\
+4       bwo_mm     Mm       \"Bandwidth Out (MB/s)\"                        \n\
+4       bwi_mpi     MPICt       \"Bandwidth In (MB/s)\"                        \n\
+4       bwo_mpi     MPICt       \"Bandwidth Out (MB/s)\"                        \n\
+4       gf      W       \"GFlops\"                        \n\
 6       I       S       Idle         \".9 .1 .0\"		\n\
 6       In       S      Initializing       \"0.0 .7 1.0\"            \n\
 6       D       S      Deinitializing       \"0.0 .1 .7\"            \n\

+ 1 - 1
src/drivers/cpu/driver_cpu.c

@@ -106,7 +106,7 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 			if (cl->flags & STARPU_CODELET_SIMGRID_EXECUTE)
 				func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 			else
-				_starpu_simgrid_submit_job(cpu_args->workerid, j, perf_arch, NAN, NULL, NULL, NULL);
+				_starpu_simgrid_submit_job(cpu_args->workerid, j, perf_arch, NAN, NULL);
 #else
 			func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif

+ 2 - 13
src/drivers/cuda/driver_cuda.c

@@ -67,8 +67,6 @@ static cudaEvent_t task_events[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_USE_CUDA */
 #ifdef STARPU_SIMGRID
 static unsigned task_finished[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
-static starpu_pthread_mutex_t task_mutex[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
-static starpu_pthread_cond_t task_cond[STARPU_NMAXWORKERS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_SIMGRID */
 
 static enum initialization cuda_device_init[STARPU_MAXCUDADEVS];
@@ -375,11 +373,7 @@ static void init_worker_context(unsigned workerid)
 	int j;
 #ifdef STARPU_SIMGRID
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
 		task_finished[workerid][j] = 0;
-		STARPU_PTHREAD_MUTEX_INIT(&task_mutex[workerid][j], NULL);
-		STARPU_PTHREAD_COND_INIT(&task_cond[workerid][j], NULL);
-	}
 #else /* !STARPU_SIMGRID */
 	cudaError_t cures;
 
@@ -418,10 +412,7 @@ static void deinit_worker_context(unsigned workerid)
 	unsigned j;
 #ifdef STARPU_SIMGRID
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
-		STARPU_PTHREAD_MUTEX_DESTROY(&task_mutex[workerid][j]);
-		STARPU_PTHREAD_COND_DESTROY(&task_cond[workerid][j]);
-	}
+		task_finished[workerid][j] = 0;
 #else /* STARPU_SIMGRID */
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
 		cudaEventDestroy(task_events[workerid][j]);
@@ -508,9 +499,7 @@ static int start_job_on_cuda(struct _starpu_job *j, struct _starpu_worker *worke
 			func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 		else
 			_starpu_simgrid_submit_job(workerid, j, &worker->perf_arch, NAN,
-				async ? &task_finished[workerid][pipeline_idx] : NULL,
-				async ? &task_mutex[workerid][pipeline_idx] : NULL,
-				async ? &task_cond[workerid][pipeline_idx] : NULL);
+				async ? &task_finished[workerid][pipeline_idx] : NULL);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif

+ 4 - 1
src/drivers/cuda/starpu_cublas.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2012, 2014  Université de Bordeaux
+ * Copyright (C) 2009-2012, 2014, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -37,6 +37,9 @@ static void shutdown_cublas_func(void *args STARPU_ATTRIBUTE_UNUSED)
 }
 #endif
 
+#ifdef STARPU_DEVEL
+#warning FIXME should actually be done once per driver thread only, otherwise shutdown crashes
+#endif
 void starpu_cublas_init(void)
 {
 #ifdef STARPU_USE_CUDA

+ 14 - 10
src/drivers/driver_common/driver_common.c

@@ -321,6 +321,7 @@ static void _starpu_worker_set_status_wakeup(int workerid)
 }
 
 
+#if !defined(STARPU_SIMGRID)
 static void _starpu_exponential_backoff(struct _starpu_worker *worker)
 {
 	int delay = worker->spinning_backoff;
@@ -331,16 +332,17 @@ static void _starpu_exponential_backoff(struct _starpu_worker *worker)
 	while(delay--)
 		STARPU_UYIELD();
 }
+#endif
 
 
 
 /* Workers may block when there is no work to do at all. */
-struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int workerid, unsigned memnode)
+struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int workerid, unsigned memnode STARPU_ATTRIBUTE_UNUSED)
 {
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 	struct starpu_task *task;
 	unsigned needed = 1;
-	unsigned executing = 0;
+	unsigned executing STARPU_ATTRIBUTE_UNUSED = 0;
 
 	_starpu_worker_set_status_scheduling(workerid);
 	while(needed)
@@ -449,9 +451,15 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 	}
 #endif
 
-	_starpu_worker_set_status_scheduling_done(workerid);
-
-	_starpu_worker_set_status_wakeup(workerid);
+	if (task)
+	{
+		_starpu_worker_set_status_scheduling_done(workerid);
+		_starpu_worker_set_status_wakeup(workerid);
+	}
+	else
+	{
+		_starpu_worker_set_status_sleeping(workerid);
+	}
 	worker->spinning_backoff = BACKOFF_MIN;
 
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
@@ -469,9 +477,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 	struct _starpu_job * j;
 	int is_parallel_task;
 	struct _starpu_combined_worker *combined_worker;
-#ifndef STARPU_NON_BLOCKING_DRIVERS
-	int executing = 0;
-#endif
+	int executing STARPU_ATTRIBUTE_UNUSED = 0;
 	/*for each worker*/
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	/* This assumes only 1 worker */
@@ -480,12 +486,10 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 #endif
 	for (i = 0; i < nworkers; i++)
 	{
-#ifndef STARPU_NON_BLOCKING_DRIVERS
 		if ((workers[i].pipeline_length == 0 && workers[i].current_task)
 			|| (workers[i].pipeline_length != 0 && workers[i].ntasks))
 			/* At least this worker is executing something */
 			executing = 1;
-#endif
 		/*if the worker is already executing a task then */
 		if((workers[i].pipeline_length == 0 && workers[i].current_task)
 			|| (workers[i].pipeline_length != 0 &&

+ 2 - 2
src/drivers/mic/driver_mic_common.c

@@ -31,7 +31,7 @@ void _starpu_mic_common_report_scif_error(const char *func, const char *file, co
  * care about it.
  */
 
-void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event)
+void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len)
 {
   if ((scif_send(node->mp_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)
 		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
@@ -56,7 +56,7 @@ int _starpu_mic_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
  * care about it.
  */
 
-void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
+void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len)
 {
 	if ((scif_recv(node->mp_connection.mic_endpoint, msg, len, SCIF_RECV_BLOCK)) < 0)
 		STARPU_MP_COMMON_REPORT_ERROR(node, errno);

+ 2 - 2
src/drivers/mic/driver_mic_common.h

@@ -56,9 +56,9 @@ void _starpu_mic_common_report_scif_error(const char *func, const char *file, in
 
 int _starpu_mic_common_recv_is_ready(const struct _starpu_mp_node *mp_node);
 
-void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event);
+void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len);
 
-void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event);
+void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len);
 
 void _starpu_mic_common_dt_send(const struct _starpu_mp_node *node, void *msg, int len, void * event);
 

+ 3 - 3
src/drivers/mic/driver_mic_source.c

@@ -85,7 +85,7 @@ struct _starpu_mp_node *_starpu_mic_src_get_actual_thread_mp_node()
 	return mic_nodes[devid];
 }
 
-const struct _starpu_mp_node *_starpu_mic_src_get_mp_node_from_memory_node(int memory_node)
+struct _starpu_mp_node *_starpu_mic_src_get_mp_node_from_memory_node(int memory_node)
 {
 	int devid = _starpu_memory_node_get_devid(memory_node);
 	STARPU_ASSERT_MSG(devid >= 0 && devid < STARPU_MAXMICDEVS, "bogus devid %d for memory node %d\n", devid, memory_node);
@@ -405,7 +405,7 @@ void _starpu_mic_free_memory(void *addr, size_t size, unsigned memory_node)
  */
 int _starpu_mic_copy_ram_to_mic(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size)
 {
-	const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(dst_node);
+	struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(dst_node);
 
 	return _starpu_src_common_copy_host_to_sink_sync(mp_node, src, dst, size);
 }
@@ -415,7 +415,7 @@ int _starpu_mic_copy_ram_to_mic(void *src, unsigned src_node STARPU_ATTRIBUTE_UN
  */
 int _starpu_mic_copy_mic_to_ram(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size)
 {
-	const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(src_node);
+	struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(src_node);
 
 	return _starpu_src_common_copy_sink_to_host_sync(mp_node, src, dst, size);
 }

+ 1 - 1
src/drivers/mic/driver_mic_source.h

@@ -43,7 +43,7 @@ struct _starpu_mic_async_event *event;
 	_starpu_mic_src_report_scif_error(__starpu_func__, __FILE__, __LINE__, status)
 
 struct _starpu_mp_node *_starpu_mic_src_get_actual_thread_mp_node();
-const struct _starpu_mp_node *_starpu_mic_src_get_mp_node_from_memory_node(int memory_node);
+struct _starpu_mp_node *_starpu_mic_src_get_mp_node_from_memory_node(int memory_node);
 
 void(* _starpu_mic_src_get_kernel_from_job(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *j))(void);
 int _starpu_mic_src_register_kernel(starpu_mic_func_symbol_t *symbol, const char *func_name);

+ 1 - 13
src/drivers/opencl/driver_opencl.c

@@ -57,8 +57,6 @@ static cl_event task_events[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
 #endif
 #ifdef STARPU_SIMGRID
 static unsigned task_finished[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
-static starpu_pthread_mutex_t task_mutex[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
-static starpu_pthread_cond_t task_cond[STARPU_MAXOPENCLDEVS][STARPU_MAX_PIPELINE];
 #endif /* STARPU_SIMGRID */
 
 void
@@ -153,11 +151,7 @@ int _starpu_opencl_init_context(int devid)
 #ifdef STARPU_SIMGRID
 	int j;
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
 		task_finished[devid][j] = 0;
-		STARPU_PTHREAD_MUTEX_INIT(&task_mutex[devid][j], NULL);
-		STARPU_PTHREAD_COND_INIT(&task_cond[devid][j], NULL);
-	}
 #else /* !STARPU_SIMGRID */
 	cl_int err;
 	cl_uint uint;
@@ -206,11 +200,7 @@ int _starpu_opencl_deinit_context(int devid)
 #ifdef STARPU_SIMGRID
 	int j;
 	for (j = 0; j < STARPU_MAX_PIPELINE; j++)
-	{
 		task_finished[devid][j] = 0;
-		STARPU_PTHREAD_MUTEX_DESTROY(&task_mutex[devid][j]);
-		STARPU_PTHREAD_COND_DESTROY(&task_cond[devid][j]);
-	}
 #else /* !STARPU_SIMGRID */
         cl_int err;
 
@@ -978,9 +968,7 @@ static int _starpu_opencl_start_job(struct _starpu_job *j, struct _starpu_worker
 		}
 		if (simulate)
 			_starpu_simgrid_submit_job(worker->workerid, j, &worker->perf_arch, length,
-						   async ? &task_finished[worker->devid][pipeline_idx] : NULL,
-						   async ? &task_mutex[worker->devid][pipeline_idx] : NULL,
-						   async ? &task_cond[worker->devid][pipeline_idx] : NULL);
+						   async ? &task_finished[worker->devid][pipeline_idx] : NULL);
 #else
 		func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
 #endif

+ 11 - 1
tests/Makefile.am

@@ -111,7 +111,17 @@ LOADER_BIN		=	$(top_builddir)/tests/loader-cross.sh
 endif
 
 if STARPU_USE_MPI_MASTER_SLAVE
-MPI 			= $(MPIEXEC) $(MPIEXEC_ARGS) -np 4
+if STARPU_QUICK_CHECK
+export MPIEXEC_TIMEOUT=60
+else 
+if STARPU_LONG_CHECK
+export MPIEXEC_TIMEOUT=1800
+else
+export MPIEXEC_TIMEOUT=300
+endif
+endif
+
+MPI 			= $(MPIEXEC)  $(MPIEXEC_ARGS) -np 4
 LOADER_BIN2		= $(MPI) $(LOADER_BIN)
 else
 LOADER_BIN2		= $(LOADER_BIN)

+ 1 - 0
tests/datawizard/interfaces/multiformat/advanced/multiformat_data_release.c

@@ -151,6 +151,7 @@ main(int argc, char **argv)
 	conf.ncuda = 1;
 	conf.nopencl = 1;
 	conf.nmic = 1;
+	conf.nmpi_ms = 0;
 	memset(&global_stats, 0, sizeof(global_stats));
 	ret = starpu_initialize(&conf, &argc, &argv);
 	if (ret == -ENODEV || starpu_cpu_worker_get_count() == 0) return STARPU_TEST_SKIPPED;

+ 10 - 0
tests/loader.c

@@ -260,6 +260,16 @@ int main(int argc, char *argv[])
 	if (timeout <= 0)
 		timeout = DEFAULT_TIMEOUT;
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+	/* compare values between the 2 values of timeout */
+	if (getenv("MPIEXEC_TIMEOUT"))
+	{
+		int mpiexec_timeout = strtol(getenv("MPIEXEC_TIMEOUT"), NULL, 10);	
+		if (mpiexec_timeout != timeout)
+			fprintf(stderr, "[warning] MPIEXEC_TIMEOUT and STARPU_TIMEOUT_ENV values are different (%d and %d). The behavior may be different than expected !\n", mpiexec_timeout, timeout);
+	}
+#endif
+
 	if (strstr(test_name, "tasks_size_overhead_scheds.sh"))
 		/* This extensively tests various schedulers, let it run longer */
 		timeout *= 10;

+ 5 - 5
tools/cppcheck/suppressions.txt

@@ -44,7 +44,7 @@ redundantAssignment:tests/main/driver_api/init_run_deinit.c
 redundantAssignment:tests/main/driver_api/run_driver.c
 
 uselessAssignmentPtrArg:mpi/src/starpu_mpi.c:171
-unreadVariable:mpi/src/starpu_mpi.c:943
+unreadVariable:mpi/src/starpu_mpi.c:950
 redundantAssignment:src/core/workers.c
 
 invalidPointerCast:src/core/perfmodel/perfmodel_nan.c:74
@@ -57,7 +57,7 @@ unusedStructMember:src/core/perfmodel/perfmodel_bus.c:65
 unusedStructMember:src/core/perfmodel/perfmodel_bus.c:66
 unusedStructMember:src/core/simgrid.c:225
 unusedStructMember:src/core/simgrid.c:226
-wrongPrintfScanfArgNum:src/core/simgrid.c:731
+wrongPrintfScanfArgNum:src/core/simgrid.c:862
 duplicateExpression:src/util/starpu_task_insert.c:52
 
 // TODO: this could be an error?
@@ -65,10 +65,10 @@ redundantCopy:src/core/disk_ops/disk_leveldb.cpp:194
 
 nullPointerRedundantCheck:src/common/rbtree.c
 unreadVariable:src/datawizard/interfaces/*
-unreadVariable:src/drivers/driver_common/driver_common.c:487
-clarifyCondition:src/drivers/opencl/driver_opencl.c:955
+unreadVariable:src/drivers/driver_common/driver_common.c:492
+clarifyCondition:src/drivers/opencl/driver_opencl.c:945
 unreadVariable:src/drivers/opencl/driver_opencl.c:767
-clarifyCondition:src/drivers/cuda/driver_cuda.c:507
+clarifyCondition:src/drivers/cuda/driver_cuda.c:498
 arithOperationsOnVoidPointer:src/drivers/scc/*
 nullPointerRedundantCheck:src/sched_policies/deque_modeling_policy_data_aware.c:198
 sizeofDereferencedVoidPointer:src/util/fstarpu.c