Преглед на файлове

New StarPU-MPI initialization function (starpu_mpi_init_with_driver)
which allows the user to give a StarPU driver to the StarPU-MPI
communication layer, so that the MPI progression thread started by
StarPU-MPI can also execute tasks and not only take care of
communications.

(originally cherry picked from commit 591b28d37423fa7e76647d32e5c364692da08091)

Nathalie Furmento преди 8 години
родител
ревизия
fc900ba4ac

+ 5 - 0
ChangeLog

@@ -41,6 +41,11 @@ New features:
     more, StarPU will make the appropriate calls as needed.
   * Add starpu_task_notify_ready_soon_register to be notified when it is
     determined when a task will be ready an estimated amount of time from now.
+  * New StarPU-MPI initialization function (starpu_mpi_init_with_driver)
+    which allows the user to give a StarPU driver to the StarPU-MPI
+    communication layer, so that the MPI progression thread started by
+    StarPU-MPI can also execute tasks and not only take care of
+    communications.
 
 Small features:
   * Scheduling contexts may now be associated a user data pointer at creation

+ 41 - 1
doc/doxygen/chapters/410_mpi_support.doxy

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010-2018                                CNRS
- * Copyright (C) 2011-2013,2017                           Inria
+ * Copyright (C) 2011-2013,2016,2017                      Inria
  * Copyright (C) 2009-2011,2013-2018                      Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -846,6 +846,46 @@ starpu_mpi_gather_detached(data_handles, nblocks, 0, MPI_COMM_WORLD);
 Other collective operations would be easy to define, just ask starpu-devel for
 them!
 
+\section MPIDriver Make StarPU-MPI progression thread execute tasks
+
+The default behaviour of StarPU-MPI is to spawn an MPI thread to take care only
+of MPI communications in an active fashion (i.e the StarPU-MPI thread sleeps
+only when there is no active request submitted by the application), with the
+goal of being as reactive as possible to communications. Knowing that, users
+usually leave one free core for the MPI thread when starting a distributed
+execution with StarPU-MPI.  However, this could result in a loss of performance
+for applications that does not require an extreme reactivity to MPI
+communications.
+
+The starpu_mpi_init_with_driver() routine allows the user to give the
+starpu_conf configuration structure of StarPU (usually given to the
+starpu_init() routine) to StarPU-MPI, so that StarPU-MPI reserves for its own
+use one of the CPU drivers of the current computing node, and then calls
+starpu_init() internally.
+
+This allows the MPI communication thread to call a StarPU CPU driver to run
+tasks when there is no active requests to take care of, and thus recover the
+computational power of the "lost" core. Since there is a trade-off between
+executing tasks and polling MPI requests, which is how much the application
+wants to lose in reactivity to MPI communications to get back the computing
+power of the core dedicated to the StarPU-MPI thread, there are two environment
+variables to pilot the behaviour of the MPI thread so that users can tune
+this trade-off depending of the behaviour of the application.
+
+The \ref STARPU_MPI_DRIVER_CALL_FREQUENCY environment variable sets how many times
+the MPI progression thread goes through the MPI_Test() loop on each active communication request
+(and thus try to make communications progress by going into the MPI layer)
+before executing tasks. The default value for this environment variable is 0,
+which means that the support for interleaving task execution and communication
+polling is deactivated, thus returning the MPI progression thread to its
+original behaviour.
+
+The \ref STARPU_MPI_DRIVER_TASK_FREQUENCY environment variable sets how many tasks
+are executed by the MPI communication thread before checking all active
+requests again. While this environment variable allows a better use of the core
+dedicated to StarPU-MPI for computations, it also decreases the reactivity of
+the MPI communication thread as much.
+
 \section MPIDebug Debugging MPI
 
 Communication trace will be enabled when the environment variable

+ 24 - 0
doc/doxygen/chapters/501_environment_variables.doxy

@@ -615,6 +615,30 @@ of one of the nodes of a big cluster without actually running the rest.
 It of course does not provide computation results and timing.
 </dd>
 
+<dt>STARPU_MPI_DRIVER_CALL_FREQUENCY</dt>
+<dd>
+\anchor STARPU_MPI_DRIVER_CALL_FREQUENCY
+\addindex __env__STARPU_MPI_DRIVER_CALL_FREQUENCY
+When set to a positive value, activates the interleaving of the execution of
+tasks with the progression of MPI communications (\ref MPISupport). The
+starpu_mpi_init_with_driver() function must have been called by the application
+for that environment variable to be used. When set to 0, the MPI progression
+thread does not use at all the driver given by the user, and only focuses on
+making MPI communications progress.
+</dd>
+
+<dt>STARPU_MPI_DRIVER_TASK_FREQUENCY</dt>
+<dd>
+\anchor STARPU_MPI_DRIVER_TASK_FREQUENCY
+\addindex __env__STARPU_MPI_DRIVER_TASK_FREQUENCY
+When set to a positive value, the interleaving of the execution of tasks with
+the progression of MPI communications mechanism to execute several tasks before
+checking communication requests again (\ref MPISupport). The
+starpu_mpi_init_with_driver() function must have been called by the application
+for that environment variable to be used, and the
+STARPU_MPI_DRIVER_CALL_FREQUENCY environment variable set to a positive value.
+</dd>
+
 <dt>STARPU_SIMGRID_CUDA_MALLOC_COST</dt>
 <dd>
 \anchor STARPU_SIMGRID_CUDA_MALLOC_COST

+ 9 - 2
doc/doxygen/chapters/api/mpi.doxy

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2017                                CNRS
- * Copyright (C) 2011-2012,2017                           Inria
+ * Copyright (C) 2010-2018                                CNRS
+ * Copyright (C) 2011-2012,2016,2017                      Inria
  * Copyright (C) 2009-2011,2014-2018                      Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -38,6 +38,13 @@ starpu_init() must be called before starpu_mpi_init_comm().
 \ingroup API_MPI_Support
 Call starpu_mpi_init_comm() with the MPI communicator \c MPI_COMM_WORLD.
 
+\fn int starpu_mpi_init_with_driver(int *argc, char ***argv, int initialize_mpi, struct starpu_conf *conf)
+\ingroup API_MPI_Support
+Call starpu_mpi_init_comm() with the MPI communicator MPI_COMM_WORLD,
+and keeps the CPU driver 0 for the MPI thread. This driver will be run
+and so execute tasks some times when the MPI thread has no requests to
+handle. starpu_mpi_init_with_driver() also calls starpu_init() internally.
+
 \fn int starpu_mpi_initialize(void)
 \deprecated
 \ingroup API_MPI_Support

+ 1 - 0
mpi/include/starpu_mpi.h

@@ -56,6 +56,7 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 
 int starpu_mpi_init_comm(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm);
 int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi);
+int starpu_mpi_init_with_driver(int *argc, char ***argv, int initialize_mpi, struct starpu_conf *conf);
 int starpu_mpi_initialize(void) STARPU_DEPRECATED;
 int starpu_mpi_initialize_extended(int *rank, int *world_size) STARPU_DEPRECATED;
 int starpu_mpi_shutdown(void);

+ 2 - 1
mpi/src/Makefile.am

@@ -1,7 +1,7 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
 # Copyright (C) 2012                                     Inria
-# Copyright (C) 2010-2017                                CNRS
+# Copyright (C) 2010-2018                                CNRS
 # Copyright (C) 2009-2014, 2018                                Université de Bordeaux
 #
 # StarPU is free software; you can redistribute it and/or modify
@@ -72,6 +72,7 @@ noinst_HEADERS =					\
 	mpi/starpu_mpi_sync_data.h			\
 	mpi/starpu_mpi_comm.h				\
 	mpi/starpu_mpi_tag.h				\
+	mpi/starpu_mpi_driver.h				\
 	load_balancer/policy/data_movements_interface.h	\
 	load_balancer/policy/load_data_interface.h	\
 	load_balancer/policy/load_balancer_policy.h

+ 37 - 0
mpi/src/mpi/starpu_mpi_driver.h

@@ -0,0 +1,37 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015-2018                                CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_DRIVER_H__
+#define __STARPU_MPI_DRIVER_H__
+
+#include <starpu.h>
+
+#ifdef STARPU_USE_MPI_MPI
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+void _starpu_mpi_driver_init(struct starpu_conf *conf);
+void _starpu_mpi_driver_shutdown();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // STARPU_USE_MPI_MPI
+#endif // __STARPU_MPI_DRIVER_H__

+ 75 - 1
mpi/src/mpi/starpu_mpi_mpi.c

@@ -3,7 +3,7 @@
  * Copyright (C) 2012-2013,2016-2017                      Inria
  * Copyright (C) 2009-2018                                Université de Bordeaux
  * Copyright (C) 2017                                     Guillaume Beauchamp
- * Copyright (C) 2010-2017                                CNRS
+ * Copyright (C) 2010-2018                                CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -78,6 +78,12 @@ static starpu_pthread_t progress_thread;
 #endif
 static int running = 0;
 
+/* Driver taken by StarPU-MPI to process tasks when there is no requests to
+ * handle instead of polling endlessly */
+static struct starpu_driver *mpi_driver = NULL;
+static int mpi_driver_call_freq = 0;
+static int mpi_driver_task_freq = 0;
+
 #ifdef STARPU_SIMGRID
 static int wait_counter;
 static starpu_pthread_cond_t wait_counter_cond;
@@ -962,6 +968,7 @@ static void _starpu_mpi_test_detached_requests(void)
 		}
 		else
 		{
+			_STARPU_MPI_TRACE_POLLING_END();
 		     	struct _starpu_mpi_req *next_req;
 			next_req = _starpu_mpi_req_list_next(req);
 
@@ -993,6 +1000,7 @@ static void _starpu_mpi_test_detached_requests(void)
 			}
 
 			req = next_req;
+			_STARPU_MPI_TRACE_POLLING_BEGIN();
 		}
 
 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
@@ -1149,6 +1157,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_sync_data_init();
 	_starpu_mpi_datatype_init();
 
+	if (mpi_driver)
+		starpu_driver_init(mpi_driver);
+
 #ifdef STARPU_SIMGRID
 	starpu_pthread_wait_init(&wait);
 	starpu_pthread_queue_init(&dontsleep);
@@ -1172,6 +1183,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
  	int envelope_request_submitted = 0;
+	int mpi_driver_loop_counter = 0;
+	int mpi_driver_task_counter = 0;
+	_STARPU_MPI_TRACE_POLLING_BEGIN();
 
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(&ready_recv_requests)) || !(_starpu_mpi_req_prio_list_empty(&ready_send_requests)) || !(_starpu_mpi_req_list_empty(&detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
 	{
@@ -1198,6 +1212,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		unsigned n = 0;
 		while (!_starpu_mpi_req_list_empty(&ready_recv_requests))
 		{
+			_STARPU_MPI_TRACE_POLLING_END();
 			struct _starpu_mpi_req *req;
 
 			if (n++ == nready_process)
@@ -1238,6 +1253,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 
+		_STARPU_MPI_TRACE_POLLING_BEGIN();
+
 		/* If there is no currently submitted envelope_request submitted to
                  * catch envelopes from senders, and there is some pending
                  * receive requests on our side, we resubmit a header request. */
@@ -1264,6 +1281,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (flag)
 			{
+				_STARPU_MPI_TRACE_POLLING_END();
 				_STARPU_MPI_COMM_FROM_DEBUG(envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, envelope_status.MPI_SOURCE, _STARPU_MPI_TAG_ENVELOPE, envelope->data_tag, envelope_comm);
 				_STARPU_MPI_DEBUG(4, "Envelope received with mode %d\n", envelope->mode);
 				if (envelope->mode == _STARPU_MPI_ENVELOPE_SYNC_READY)
@@ -1355,9 +1373,34 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					}
 				}
 				envelope_request_submitted = 0;
+				_STARPU_MPI_TRACE_POLLING_BEGIN();
 			}
 			else
 			{
+				/* A call is made to driver_run_once only when
+				 * the progression thread have gone through the
+				 * communication progression loop
+				 * mpi_driver_call_freq times. It is
+				 * interesting to tune the
+				 * STARPU_MPI_DRIVER_CALL_FREQUENCY
+				 * depending on whether the user wants
+				 * reactivity or computing power from the MPI
+				 * progression thread. */
+				if ( mpi_driver && ( ++mpi_driver_loop_counter == mpi_driver_call_freq ))
+				{
+					mpi_driver_loop_counter = 0;
+					mpi_driver_task_counter = 0;
+					while (mpi_driver_task_counter++ < mpi_driver_task_freq)
+					{
+						_STARPU_MPI_TRACE_DRIVER_RUN_BEGIN();
+						STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+						fprintf(stderr, "running once mpi driver\n");
+						starpu_driver_run_once(mpi_driver);
+						STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+						_STARPU_MPI_TRACE_DRIVER_RUN_END();
+					}
+				}
+
 				//_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
 			}
 		}
@@ -1368,6 +1411,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 #endif
 	}
 
+	_STARPU_MPI_TRACE_POLLING_END();
 	if (envelope_request_submitted)
 	{
 		_starpu_mpi_comm_cancel_recv();
@@ -1539,4 +1583,34 @@ int starpu_mpi_comm_get_attr(MPI_Comm comm, int keyval, void *attribute_val, int
 	return 0;
 }
 
+void _starpu_mpi_driver_init(struct starpu_conf *conf)
+{
+	/* We only initialize the driver if the environment variable
+	 * STARPU_MPI_DRIVER_CALL_FREQUENCY is defined by the user. If this environment
+	 * variable is not defined or defined at a value lower than or equal to zero,
+	 * StarPU-MPI will not use a driver. */
+	char *driver_env = starpu_getenv("STARPU_MPI_DRIVER_CALL_FREQUENCY");
+	if (driver_env && atoi(driver_env) > 0)
+	{
+		mpi_driver_call_freq = atoi(driver_env);
+
+		_STARPU_MALLOC(mpi_driver, sizeof(struct starpu_driver));
+		mpi_driver->type = STARPU_CPU_WORKER;
+		mpi_driver->id.cpu_id = 0;
+
+		conf->not_launched_drivers = mpi_driver;
+		conf->n_not_launched_drivers = 1;
+
+		char *tasks_freq_env = starpu_getenv("STARPU_MPI_DRIVER_TASK_FREQUENCY");
+		if (tasks_freq_env)
+			mpi_driver_task_freq = atoi(tasks_freq_env);
+	}
+}
+
+void _starpu_mpi_driver_shutdown()
+{
+	if (mpi_driver)
+		starpu_driver_deinit(mpi_driver);
+}
+
 #endif /* STARPU_USE_MPI_MPI */

+ 40 - 12
mpi/src/starpu_mpi_fxt.h

@@ -1,8 +1,8 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012-2013                                Inria
+ * Copyright (C) 2012-2013,2016                           Inria
  * Copyright (C) 2010-2011,2014,2017                      Université de Bordeaux
- * Copyright (C) 2010,2012,2014-2017                      CNRS
+ * Copyright (C) 2010,2012,2014-2018                      CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -37,16 +37,6 @@ extern "C"
 #define _STARPU_MPI_FUT_IRECV_SUBMIT_END		0x5207
 #define _STARPU_MPI_FUT_ISEND_COMPLETE_BEGIN		0x5208
 #define _STARPU_MPI_FUT_ISEND_COMPLETE_END		0x5209
-#define _STARPU_MPI_FUT_IRECV_COMPLETE_BEGIN		0x5210
-#define _STARPU_MPI_FUT_IRECV_COMPLETE_END		0x5211
-#define _STARPU_MPI_FUT_SLEEP_BEGIN			0x5212
-#define _STARPU_MPI_FUT_SLEEP_END			0x5213
-#define _STARPU_MPI_FUT_DTESTING_BEGIN			0x5214
-#define _STARPU_MPI_FUT_DTESTING_END			0x5215
-#define _STARPU_MPI_FUT_UTESTING_BEGIN			0x5216
-#define _STARPU_MPI_FUT_UTESTING_END			0x5217
-#define _STARPU_MPI_FUT_UWAIT_BEGIN			0x5218
-#define _STARPU_MPI_FUT_UWAIT_END			0x5219
 #define _STARPU_MPI_FUT_DATA_SET_RANK			0x521a
 #define _STARPU_MPI_FUT_IRECV_TERMINATED		0x521b
 #define _STARPU_MPI_FUT_ISEND_TERMINATED		0x521c
@@ -54,8 +44,24 @@ extern "C"
 #define _STARPU_MPI_FUT_TESTING_DETACHED_END		0x521e
 #define _STARPU_MPI_FUT_TEST_BEGIN			0x521f
 #define _STARPU_MPI_FUT_TEST_END			0x5220
+#define _STARPU_MPI_FUT_IRECV_COMPLETE_BEGIN		0x520a
+#define _STARPU_MPI_FUT_IRECV_COMPLETE_END		0x520b
+#define _STARPU_MPI_FUT_SLEEP_BEGIN			0x520c
+#define _STARPU_MPI_FUT_SLEEP_END			0x520d
+#define _STARPU_MPI_FUT_DTESTING_BEGIN			0x520e
+#define _STARPU_MPI_FUT_DTESTING_END			0x520f
+#define _STARPU_MPI_FUT_UTESTING_BEGIN			0x5210
+#define _STARPU_MPI_FUT_UTESTING_END			0x5211
+#define _STARPU_MPI_FUT_UWAIT_BEGIN			0x5212
+#define _STARPU_MPI_FUT_UWAIT_END			0x5213
+#define _STARPU_MPI_FUT_POLLING_BEGIN			0x5214
+#define _STARPU_MPI_FUT_POLLING_END			0x5215
+#define _STARPU_MPI_FUT_DRIVER_RUN_BEGIN		0x5216
+#define _STARPU_MPI_FUT_DRIVER_RUN_END			0x5217
 
 #ifdef STARPU_USE_FXT
+static int trace_loop = 0;
+
 #define _STARPU_MPI_TRACE_START(rank, worldsize)	\
 	FUT_DO_PROBE3(_STARPU_MPI_FUT_START, (rank), (worldsize), _starpu_gettid());
 #define _STARPU_MPI_TRACE_STOP(rank, worldsize)	\
@@ -119,6 +125,24 @@ extern "C"
 #define _STARPU_MPI_TRACE_TEST_BEGIN(peer, data_tag)		do {} while(0)
 #define _STARPU_MPI_TRACE_TEST_END(peer, data_tag)		do {} while(0)
 #endif
+#define _STARPU_MPI_TRACE_POLLING_BEGIN()					\
+	if(!trace_loop) {						\
+		trace_loop = 1;							\
+		FUT_DO_PROBE1(_STARPU_MPI_FUT_POLLING_BEGIN, _starpu_gettid()); \
+	}
+#define _STARPU_MPI_TRACE_POLLING_END()	\
+	if(trace_loop) {							\
+		trace_loop = 0;							\
+		FUT_DO_PROBE1(_STARPU_MPI_FUT_POLLING_END, _starpu_gettid());	\
+	}
+#define _STARPU_MPI_TRACE_DRIVER_RUN_BEGIN()	\
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_DRIVER_RUN_BEGIN,  _starpu_gettid());
+#define _STARPU_MPI_TRACE_DRIVER_RUN_END()	\
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_DRIVER_RUN_END, _starpu_gettid());
+#define _STARPU_MPI_TRACE_DRIVER_RUN_BEGIN()	\
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_DRIVER_RUN_BEGIN,  _starpu_gettid());
+#define _STARPU_MPI_TRACE_DRIVER_RUN_END()	\
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_DRIVER_RUN_END, _starpu_gettid());
 #define TRACE
 #else
 #define _STARPU_MPI_TRACE_START(a, b)				do {} while(0);
@@ -148,6 +172,10 @@ extern "C"
 #define _STARPU_MPI_TRACE_TESTING_DETACHED_END()		do {} while(0)
 #define _STARPU_MPI_TRACE_TEST_BEGIN(peer, data_tag)		do {} while(0)
 #define _STARPU_MPI_TRACE_TEST_END(peer, data_tag)		do {} while(0)
+#define _STARPU_MPI_TRACE_POLLING_BEGIN()			do {} while(0);
+#define _STARPU_MPI_TRACE_POLLING_END()				do {} while(0);
+#define _STARPU_MPI_TRACE_DRIVER_RUN_BEGIN()			do {} while(0);
+#define _STARPU_MPI_TRACE_DRIVER_RUN_END()			do {} while(0);
 #endif
 
 #ifdef __cplusplus

+ 15 - 1
mpi/src/starpu_mpi_init.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2017                                CNRS
+ * Copyright (C) 2010-2018                                CNRS
  * Copyright (C) 2009-2018                                Université de Bordeaux
  * Copyright (C) 2016                                     Inria
  *
@@ -35,6 +35,7 @@
 #if defined(STARPU_USE_MPI_MPI)
 #include <mpi/starpu_mpi_comm.h>
 #include <mpi/starpu_mpi_tag.h>
+#include <mpi/starpu_mpi_driver.h>
 #endif
 
 #ifdef STARPU_SIMGRID
@@ -177,6 +178,18 @@ int starpu_mpi_initialize_extended(int *rank, int *world_size)
 #endif
 }
 
+int starpu_mpi_init_with_driver(int *argc, char ***argv, int initialize_mpi, struct starpu_conf *conf)
+{
+#if defined(STARPU_USE_MPI_MPI)
+	_starpu_mpi_driver_init(conf);
+#endif
+
+	int ret = starpu_init(conf);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	return starpu_mpi_init(argc, argv, initialize_mpi);
+}
+
 int starpu_mpi_shutdown(void)
 {
 	void *value;
@@ -197,6 +210,7 @@ int starpu_mpi_shutdown(void)
 #if defined(STARPU_USE_MPI_MPI)
 	_starpu_mpi_tag_shutdown();
 	_starpu_mpi_comm_shutdown();
+	_starpu_mpi_driver_shutdown();
 #endif
 
 	return 0;

+ 5 - 3
mpi/tests/Makefile.am

@@ -1,6 +1,6 @@
 # StarPU --- Runtime system for heterogeneous multicore architectures.
 #
-# Copyright (C) 2010-2017                                CNRS
+# Copyright (C) 2010-2018                                CNRS
 # Copyright (C) 2009-2018                                Université de Bordeaux
 # Copyright (C) 2013                                     Thibaut Lambert
 #
@@ -127,7 +127,8 @@ starpu_mpi_TESTS +=				\
 	policy_selection			\
 	policy_selection2			\
 	ring_async_implicit			\
-	temporary
+	temporary				\
+	driver
 
 if !STARPU_SIMGRID
 starpu_mpi_TESTS +=				\
@@ -228,7 +229,8 @@ noinst_PROGRAMS =				\
 	policy_selection2			\
 	early_request				\
 	starpu_redefine				\
-	load_balancer
+	load_balancer				\
+	driver
 
 XFAIL_TESTS=					\
 	policy_register_toomany			\

+ 135 - 0
mpi/tests/driver.c

@@ -0,0 +1,135 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017, 2018 CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include <math.h>
+#include "helper.h"
+
+int main(int argc, char **argv)
+{
+	struct starpu_conf conf;
+	int ret, rank, size, i;
+	starpu_data_handle_t tab_handle[4];
+	int values[4];
+	starpu_mpi_req request[2] = {NULL, NULL};
+	int mpi_init;
+
+	setenv("STARPU_MPI_DRIVER_CALL_FREQUENCY", "1", 1);
+	setenv("STARPU_MPI_DRIVER_TASK_FREQUENCY", "10", 1);
+
+	MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
+
+	starpu_conf_init(&conf);
+	ret = starpu_mpi_init_with_driver(&argc, &argv, mpi_init, &conf);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_with_driver");
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+
+	if (size%2 != 0)
+	{
+		FPRINTF_MPI(stderr, "We need a even number of processes.\n");
+		starpu_mpi_shutdown();
+		starpu_shutdown();
+		if (!mpi_init)
+			MPI_Finalize();
+		return STARPU_TEST_SKIPPED;
+	}
+
+	for(i=0 ; i<4 ; i++)
+	{
+		if (i<3 || rank%2)
+		{
+			// all data are registered on all nodes, but the 4th data which is not registered on the receiving node
+			values[i] = (rank+1) * (i+1);
+			starpu_variable_data_register(&tab_handle[i], STARPU_MAIN_RAM, (uintptr_t)&values[i], sizeof(values[i]));
+			starpu_mpi_data_register(tab_handle[i], i, rank);
+		}
+	}
+
+	int other_rank = rank%2 == 0 ? rank+1 : rank-1;
+
+	FPRINTF_MPI(stderr, "rank %d exchanging with rank %d\n", rank, other_rank);
+
+	if (rank%2)
+	{
+		FPRINTF_MPI(stderr, "Sending values %d and %d to node %d\n", values[0], values[3], other_rank);
+		// this data will be received as an early registered data
+		starpu_mpi_isend(tab_handle[0], &request[0], other_rank, 0, MPI_COMM_WORLD);
+		// this data will be received as an early UNregistered data
+		starpu_mpi_isend(tab_handle[3], &request[1], other_rank, 3, MPI_COMM_WORLD);
+
+		starpu_mpi_send(tab_handle[1], other_rank, 1, MPI_COMM_WORLD);
+		starpu_mpi_recv(tab_handle[2], other_rank, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+	}
+	else
+	{
+		starpu_mpi_recv(tab_handle[1], other_rank, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+		starpu_mpi_send(tab_handle[2], other_rank, 2, MPI_COMM_WORLD);
+
+		// we register the data
+		starpu_variable_data_register(&tab_handle[3], -1, (uintptr_t)NULL, sizeof(int));
+		starpu_mpi_data_register(tab_handle[3], 3, rank);
+		starpu_mpi_irecv(tab_handle[3], &request[1], other_rank, 3, MPI_COMM_WORLD);
+		starpu_mpi_irecv(tab_handle[0], &request[0], other_rank, 0, MPI_COMM_WORLD);
+	}
+
+	int finished=0;
+	while (!finished)
+	{
+		for(i=0 ; i<2 ; i++)
+		{
+			if (request[i])
+			{
+				int flag;
+				MPI_Status status;
+				starpu_mpi_test(&request[i], &flag, &status);
+				if (flag)
+					FPRINTF_MPI(stderr, "request[%d] = %d %p\n", i, flag, request[i]);
+			}
+		}
+		finished = request[0] == NULL && request[1] == NULL;
+	}
+
+	if (rank%2 == 0)
+	{
+		void *ptr0;
+		void *ptr3;
+
+		starpu_data_acquire(tab_handle[0], STARPU_RW);
+		ptr0 = starpu_data_get_local_ptr(tab_handle[0]);
+		starpu_data_release(tab_handle[0]);
+
+		starpu_data_acquire(tab_handle[3], STARPU_RW);
+		ptr3 = starpu_data_get_local_ptr(tab_handle[3]);
+		starpu_data_release(tab_handle[3]);
+
+		ret = (*((int *)ptr0) == (other_rank+1)*1) && (*((int *)ptr3) == (other_rank+1)*4);
+		ret = !ret;
+		FPRINTF_MPI(stderr, "[%s] Received values %d and %d from node %d\n", ret?"FAILURE":"SUCCESS", *((int *)ptr0), *((int *)ptr3), other_rank);
+	}
+
+	for(i=0 ; i<4 ; i++)
+		starpu_data_unregister(tab_handle[i]);
+
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	if (!mpi_init)
+		MPI_Finalize();
+
+	return 0;
+}

+ 49 - 2
src/debug/traces/starpu_fxt.c

@@ -2928,9 +2928,9 @@ static void handle_mpi_sleep_end(struct fxt_ev_64 *ev, struct starpu_fxt_options
 	double date = get_event_time_stamp(ev, options);
 
 	if (out_paje_file)
-		mpicommthread_set_state(date, options->file_prefix, "P");
+		mpicommthread_set_state(date, options->file_prefix, "Pl");
 	if (trace_file)
-		recfmt_mpicommthread_set_state(date, "P");
+		recfmt_mpicommthread_set_state(date, "Pl");
 }
 
 static void handle_mpi_dtesting_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
@@ -3033,6 +3033,38 @@ static void handle_mpi_test_end(struct fxt_ev_64 *ev, struct starpu_fxt_options
 		recfmt_mpicommthread_pop_state(date);
 }
 
+static void handle_mpi_polling_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "Pl");
+}
+
+static void handle_mpi_polling_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "P");
+}
+
+static void handle_mpi_driver_run_begin(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "Dr");
+}
+
+static void handle_mpi_driver_run_end(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
+{
+	double date = get_event_time_stamp(ev, options);
+
+	if (out_paje_file)
+		mpicommthread_set_state(date, options->file_prefix, "Pl");
+}
+
 static void handle_set_profiling(struct fxt_ev_64 *ev, struct starpu_fxt_options *options)
 {
 	int status = ev->param[0];
@@ -3666,6 +3698,21 @@ void _starpu_fxt_parse_new_file(char *filename_in, struct starpu_fxt_options *op
 
 			case _STARPU_MPI_FUT_TEST_END:
 				handle_mpi_test_end(&ev, options);
+
+			case _STARPU_MPI_FUT_POLLING_BEGIN:
+				handle_mpi_polling_begin(&ev, options);
+				break;
+
+			case _STARPU_MPI_FUT_POLLING_END:
+				handle_mpi_polling_end(&ev, options);
+				break;
+
+			case _STARPU_MPI_FUT_DRIVER_RUN_BEGIN:
+				handle_mpi_driver_run_begin(&ev, options);
+				break;
+
+			case _STARPU_MPI_FUT_DRIVER_RUN_END:
+				handle_mpi_driver_run_end(&ev, options);
 				break;
 
 			case _STARPU_FUT_SET_PROFILING:

+ 5 - 1
src/debug/traces/starpu_paje.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2014,2016-2017                      Inria
- * Copyright (C) 2012,2014,2017                           CNRS
+ * Copyright (C) 2012,2014,2017,2018                      CNRS
  * Copyright (C) 2017                                     Universidade Federal do Rio Grande do Sul (UFRGS)
  * Copyright (C) 2010-2017                                Université de Bordeaux
  *
@@ -277,6 +277,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 	poti_DefineVariableType("bwo_mpi", "MPICt", "Bandwidth Out (MB/s)", "0 0 0");
 	poti_DefineStateType("CtS", "MPICt", "Communication Thread State");
 	poti_DefineEntityValue("P", "CtS", "Processing", "0 0 0");
+	poti_DefineEntityValue("Pl", "CtS", "Polling", "1.0 .5 0");
+	poti_DefineEntityValue("Dr", "CtS", "DriverRun", ".1 .1 1.0");
 	poti_DefineEntityValue("Sl", "CtS", "Sleeping", ".9 .1 .0");
 	poti_DefineEntityValue("UT", "CtS", "UserTesting", ".2 .1 .6");
 	poti_DefineEntityValue("UW", "CtS", "UserWaiting", ".4 .1 .3");
@@ -426,6 +428,8 @@ void _starpu_fxt_write_paje_header(FILE *file STARPU_ATTRIBUTE_UNUSED)
 ");
 	fprintf(file, "\
 6       P       CtS       Processing         \"0 0 0\"		\n\
+6       Pl       CtS      Polling	   \"1.0 .5 0\"		\n\
+6       Dr       CtS      DriverRun	   \".1 .1 1.0\"	\n\
 6       Sl       CtS      Sleeping         \".9 .1 .0\"		\n\
 6       UT       CtS      UserTesting        \".2 .1 .6\"	\n\
 6       UW       CtS      UserWaiting        \".4 .1 .3\"	\n\