Selaa lähdekoodia

add asynchronous MPI master-slave transfers

Corentin Salingue 8 vuotta sitten
vanhempi
commit
28d9ec43f7

+ 13 - 0
configure.ac

@@ -1008,6 +1008,19 @@ if test x$disable_asynchronous_mic_copy = xyes ; then
    AC_DEFINE([STARPU_DISABLE_ASYNCHRONOUS_MIC_COPY], [1], [Define to 1 to disable asynchronous copy between CPU and MIC devices])
 fi
 
+AC_MSG_CHECKING(whether asynchronous MPI Master Slave copy should be disabled)
+AC_ARG_ENABLE(asynchronous-mpi-master-slave-copy, [AS_HELP_STRING([--disable-asynchronous-mpi-master-slave-copy],
+			[disable asynchronous copy between MPI Master and MPI Slave devices])],
+			enable_asynchronous_mpi_master_slave_copy=$enableval, enable_asynchronous_mpi_master_slave_copy=yes)
+disable_asynchronous_mpi_master_slave_copy=no
+if test x$enable_asynchronous_mpi_master_slave_copy = xno ; then
+   disable_asynchronous_mpi_master_slave_copy=yes
+fi
+AC_MSG_RESULT($disable_asynchronous_mpi_master_slave_copy)
+if test x$disable_asynchronous_mpi_master_slave_copy = xyes ; then
+   AC_DEFINE([STARPU_DISABLE_ASYNCHRONOUS_MPI_MS_COPY], [1], [Define to 1 to disable asynchronous copy between MPI Master and MPI Slave devices])
+fi
+
 ###############################################################################
 #                                                                             #
 #                                 Drivers                                     #

+ 2 - 0
include/starpu.h

@@ -121,6 +121,7 @@ struct starpu_conf
 	int disable_asynchronous_cuda_copy;
 	int disable_asynchronous_opencl_copy;
 	int disable_asynchronous_mic_copy;
+	int disable_asynchronous_mpi_ms_copy;
 
 	unsigned *cuda_opengl_interoperability;
 	unsigned n_cuda_opengl_interoperability;
@@ -150,6 +151,7 @@ int starpu_asynchronous_copy_disabled(void);
 int starpu_asynchronous_cuda_copy_disabled(void);
 int starpu_asynchronous_opencl_copy_disabled(void);
 int starpu_asynchronous_mic_copy_disabled(void);
+int starpu_asynchronous_mpi_ms_copy_disabled(void);
 
 void starpu_display_stats();
 

+ 8 - 0
include/starpu_data_interfaces.h

@@ -59,6 +59,10 @@ struct starpu_data_copy_methods
 	int (*scc_sink_to_src)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node);
 	int (*scc_sink_to_sink)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node);
 
+	int (*ram_to_mpi_ms)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node);
+	int (*mpi_ms_to_ram)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node);
+	int (*mpi_ms_to_mpi_ms)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node);
+
 #ifdef STARPU_USE_CUDA
 	int (*ram_to_cuda_async)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, starpu_cudaStream_t stream);
 	int (*cuda_to_ram_async)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, starpu_cudaStream_t stream);
@@ -79,6 +83,10 @@ struct starpu_data_copy_methods
 	int (*opencl_to_opencl_async)();
 #endif
 
+	int (*ram_to_mpi_ms_async)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, void * event);
+	int (*mpi_ms_to_ram_async)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, void * event);
+	int (*mpi_ms_to_mpi_ms_async)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, void * event);
+
 	int (*ram_to_mic_async)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node);
 	int (*mic_to_ram_async)(void *src_interface, unsigned srd_node, void *dst_interface, unsigned dst_node);
 

+ 1 - 1
src/core/topology.c

@@ -874,7 +874,7 @@ _starpu_init_mpi_config (struct _starpu_machine_config *config,
 	_starpu_init_mpi_topology (config, mpi_idx);
 
 	int nmpicores;
-	nmpicores = starpu_get_env_number("STARPU_NMPITHREADS");
+	nmpicores = starpu_get_env_number("STARPU_NMPIMSTHREADS");
 
 	if (nmpicores == -1)
 	{

+ 14 - 0
src/core/workers.c

@@ -1049,6 +1049,14 @@ int starpu_conf_init(struct starpu_conf *conf)
 		conf->disable_asynchronous_mic_copy = 0;
 #endif
 
+#if defined(STARPU_DISABLE_ASYNCHRONOUS_MPI_MS_COPY)
+    conf->disable_asynchronous_mpi_ms_copy = 1;
+#else
+    conf->disable_asynchronous_mpi_ms_copy = starpu_get_env_number("STARPU_DISABLE_ASYNCHRONOUS_MPI_MS_COPY");
+    if(conf->disable_asynchronous_mpi_ms_copy == -1)
+        conf->disable_asynchronous_mpi_ms_copy = 0;
+#endif
+
 	/* 64MiB by default */
 	conf->trace_buffer_size = starpu_get_env_number_default("STARPU_TRACE_BUFFER_SIZE", 64) << 20;
 	return 0;
@@ -1093,6 +1101,7 @@ void _starpu_conf_check_environment(struct starpu_conf *conf)
 	_starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_CUDA_COPY", &conf->disable_asynchronous_cuda_copy);
 	_starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_OPENCL_COPY", &conf->disable_asynchronous_opencl_copy);
 	_starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_MIC_COPY", &conf->disable_asynchronous_mic_copy);
+	_starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_MPI_MS_COPY", &conf->disable_asynchronous_mpi_ms_copy);
 }
 
 struct starpu_tree* starpu_workers_get_tree(void)
@@ -1775,6 +1784,11 @@ int starpu_asynchronous_mic_copy_disabled(void)
 	return _starpu_config.conf.disable_asynchronous_mic_copy;
 }
 
+int starpu_asynchronous_mpi_ms_copy_disabled(void)
+{
+    return _starpu_config.conf.disable_asynchronous_mpi_ms_copy;
+}
+
 unsigned starpu_mic_worker_get_count(void)
 {
 	int i = 0, count = 0;

+ 105 - 28
src/datawizard/copy_driver.c

@@ -24,6 +24,7 @@
 #include <drivers/disk/driver_disk.h>
 #include <drivers/mpi/driver_mpi_sink.h>
 #include <drivers/mpi/driver_mpi_source.h>
+#include <drivers/mpi/driver_mpi_common.h>
 #include <common/fxt.h>
 #include "copy_driver.h"
 #include "memalloc.h"
@@ -424,24 +425,75 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 #endif
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_MPI_MS_RAM):
-	//	if (copy_methods->mpi_ms_src_to_sink)
-	//		copy_methods->mpi_ms_src_to_sink(src_interface, src_node, dst_interface, dst_node);
-	//	else
-			copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+        if (!req || starpu_asynchronous_copy_disabled() || starpu_asynchronous_mpi_ms_copy_disabled() ||
+                !(copy_methods->ram_to_mpi_ms_async || copy_methods->any_to_any))
+        {
+            /* this is not associated to a request so it's synchronous */
+            STARPU_ASSERT(copy_methods->ram_to_mpi_ms || copy_methods->any_to_any);
+            if (copy_methods->ram_to_mpi_ms)
+                copy_methods->ram_to_mpi_ms(src_interface, src_node, dst_interface, dst_node);
+            else
+                copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+        }
+        else
+        {
+            req->async_channel.type = STARPU_MPI_MS_RAM;
+            if(copy_methods->ram_to_mpi_ms_async)
+                ret = copy_methods->ram_to_mpi_ms_async(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
+            else
+            {
+                STARPU_ASSERT(copy_methods->any_to_any);
+                ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
+            }
+        }
 		break;
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_MPI_MS_RAM,STARPU_CPU_RAM):
-	//	if (copy_methods->mpi_ms_sink_to_src)
-	//		copy_methods->mpi_ms_sink_to_src(src_interface, src_node, dst_interface, dst_node);
-	//	else
-			copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+        if (!req || starpu_asynchronous_copy_disabled() || starpu_asynchronous_mpi_ms_copy_disabled() ||
+                !(copy_methods->mpi_ms_to_ram_async || copy_methods->any_to_any))
+        {
+            /* this is not associated to a request so it's synchronous */
+            STARPU_ASSERT(copy_methods->mpi_ms_to_ram || copy_methods->any_to_any);
+            if (copy_methods->mpi_ms_to_ram)
+                copy_methods->mpi_ms_to_ram(src_interface, src_node, dst_interface, dst_node);
+            else
+                copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+        }
+        else
+        {
+            req->async_channel.type = STARPU_MPI_MS_RAM;
+            if(copy_methods->mpi_ms_to_ram_async)
+                ret = copy_methods->mpi_ms_to_ram_async(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
+            else
+            {
+                STARPU_ASSERT(copy_methods->any_to_any);
+                ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
+            }
+        }
 		break;
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_MPI_MS_RAM,STARPU_MPI_MS_RAM):
-	//	if (copy_methods->mpi_ms_sink_to_sink)
-	//		copy_methods->mpi_ms_sink_to_sink(src_interface, src_node, dst_interface, dst_node);
-	//	else
-			copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+        if (!req || starpu_asynchronous_copy_disabled() || starpu_asynchronous_mpi_ms_copy_disabled() ||
+                !(copy_methods->mpi_ms_to_mpi_ms_async || copy_methods->any_to_any))
+        {
+            /* this is not associated to a request so it's synchronous */
+            STARPU_ASSERT(copy_methods->mpi_ms_to_mpi_ms || copy_methods->any_to_any);
+            if (copy_methods->mpi_ms_to_mpi_ms)
+                copy_methods->mpi_ms_to_mpi_ms(src_interface, src_node, dst_interface, dst_node);
+            else
+                copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+        }
+        else
+        {
+            req->async_channel.type = STARPU_MPI_MS_RAM;
+            if(copy_methods->mpi_ms_to_mpi_ms_async)
+                ret = copy_methods->mpi_ms_to_mpi_ms_async(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
+            else
+            {
+                STARPU_ASSERT(copy_methods->any_to_any);
+                ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
+            }
+        }
 		break;
 #endif
 #ifdef STARPU_USE_SCC
@@ -688,26 +740,41 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 #endif
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
     case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM, STARPU_MPI_MS_RAM):
-        /* TODO ASYNC */
-        return _starpu_mpi_copy_ram_to_mpi(
-                (void*) (src + src_offset), src_node,
-                (void*) (dst + dst_offset), dst_node,
-                size);
-
+        if (async_data)
+            return _starpu_mpi_copy_ram_to_mpi_async(
+                    (void*) (src + src_offset), src_node,
+                    (void*) (dst + dst_offset), dst_node,
+                    size, async_data);
+        else
+            return _starpu_mpi_copy_ram_to_mpi_sync(
+                    (void*) (src + src_offset), src_node,
+                    (void*) (dst + dst_offset), dst_node,
+                    size);
     case _STARPU_MEMORY_NODE_TUPLE(STARPU_MPI_MS_RAM, STARPU_CPU_RAM):
-        /* TODO ASYNC */
-        return _starpu_mpi_copy_mpi_to_ram(
-                (void*) (src + src_offset), src_node,
-                (void*) (dst + dst_offset), dst_node,
-                size);
+        if (async_data)
+            return _starpu_mpi_copy_mpi_to_ram_async(
+                    (void*) (src + src_offset), src_node,
+                    (void*) (dst + dst_offset), dst_node,
+                    size, async_data);
+        else
+            return _starpu_mpi_copy_mpi_to_ram_sync(
+                    (void*) (src + src_offset), src_node,
+                    (void*) (dst + dst_offset), dst_node,
+                    size);
 
     case _STARPU_MEMORY_NODE_TUPLE(STARPU_MPI_MS_RAM, STARPU_MPI_MS_RAM):
-        /* TODO : not used now + ASYNC */
+        /* TODO : not used now */
         STARPU_ABORT();
-        return _starpu_mpi_copy_sink_to_sink(
-                (void*) (src + src_offset), src_node,
-                (void*) (dst + dst_offset), dst_node,
-                size);
+        if (async_data)
+            return _starpu_mpi_copy_sink_to_sink_async(
+                    (void*) (src + src_offset), src_node,
+                    (void*) (dst + dst_offset), dst_node,
+                    size, async_data);
+        else
+            return _starpu_mpi_copy_sink_to_sink_sync(
+                    (void*) (src + src_offset), src_node,
+                    (void*) (dst + dst_offset), dst_node,
+                    size);
 #endif
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM, STARPU_DISK_RAM):
@@ -786,6 +853,11 @@ void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_
 		_starpu_mic_wait_request_completion(&(async_channel->event.mic_event));
 		break;
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    case STARPU_MPI_MS_RAM:
+        _starpu_mpi_src_wait_event(async_channel);
+        break;
+#endif
 	case STARPU_MAIN_RAM:
 		starpu_disk_wait_request(async_channel);
 		break;
@@ -850,6 +922,11 @@ unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *as
 		success = _starpu_mic_request_is_complete(&(async_channel->event.mic_event));
 		break;
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    case STARPU_MPI_MS_RAM:
+        success = _starpu_mpi_common_test_event(async_channel);
+        break;
+#endif
 	case STARPU_DISK_RAM:
 		success = starpu_disk_test_request(async_channel);
 		break;

+ 21 - 1
src/datawizard/copy_driver.h

@@ -34,6 +34,10 @@
 #include <starpu_opencl.h>
 #endif
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+#include <mpi.h>
+#endif
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -54,6 +58,16 @@ struct _starpu_mic_async_event
 };
 #endif
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+struct _starpu_mpi_ms_async_event
+{
+    /* to know if request is finished and already handled */
+    unsigned finished;
+    int is_sender;
+    MPI_Request request;
+};
+#endif
+
 struct _starpu_disk_async_event
 {
 	unsigned memory_node;
@@ -76,7 +90,10 @@ union _starpu_async_channel_event
 	cudaEvent_t cuda_event;
 #endif
 #ifdef STARPU_USE_OPENCL
-        cl_event opencl_event;
+    cl_event opencl_event;
+#endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    struct _starpu_mpi_ms_async_event mpi_ms_event;
 #endif
 #ifdef STARPU_USE_MIC
 	struct _starpu_mic_async_event mic_event;
@@ -88,6 +105,9 @@ struct _starpu_async_channel
 {
 	union _starpu_async_channel_event event;
 	enum starpu_node_kind type;
+    /* Used to know if the acknowlegdment msg is arrived from sinks */
+    volatile int starpu_mp_common_finished_sender; 
+    volatile int starpu_mp_common_finished_receiver; 
 };
 
 void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid);

+ 4 - 4
src/drivers/mic/driver_mic_common.c

@@ -31,7 +31,7 @@ void _starpu_mic_common_report_scif_error(const char *func, const char *file, co
  * care about it.
  */
 
-void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len)
+void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event)
 {
   if ((scif_send(node->mp_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)
 		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
@@ -56,7 +56,7 @@ int _starpu_mic_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
  * care about it.
  */
 
-void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len)
+void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
 {
 	if ((scif_recv(node->mp_connection.mic_endpoint, msg, len, SCIF_RECV_BLOCK)) < 0)
 		STARPU_MP_COMMON_REPORT_ERROR(node, errno);
@@ -65,7 +65,7 @@ void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int
 /* Handles the error so the caller (which must be generic) doesn't have to
  * care about it.
  */
-void _starpu_mic_common_dt_send(const struct _starpu_mp_node *mp_node, void *msg, int len)
+void _starpu_mic_common_dt_send(const struct _starpu_mp_node *mp_node, void *msg, int len, void * event)
 {
 	if ((scif_send(mp_node->host_sink_dt_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)
 		STARPU_MP_COMMON_REPORT_ERROR(mp_node, errno);
@@ -74,7 +74,7 @@ void _starpu_mic_common_dt_send(const struct _starpu_mp_node *mp_node, void *msg
 /* Handles the error so the caller (which must be generic) doesn't have to
  * care about it.
  */
-void _starpu_mic_common_dt_recv(const struct _starpu_mp_node *mp_node, void *msg, int len)
+void _starpu_mic_common_dt_recv(const struct _starpu_mp_node *mp_node, void *msg, int len, void * event)
 {
 	if ((scif_recv(mp_node->host_sink_dt_connection.mic_endpoint, msg, len, SCIF_SEND_BLOCK)) < 0)
 		STARPU_MP_COMMON_REPORT_ERROR(mp_node, errno);

+ 4 - 4
src/drivers/mic/driver_mic_common.h

@@ -56,13 +56,13 @@ void _starpu_mic_common_report_scif_error(const char *func, const char *file, in
 
 int _starpu_mic_common_recv_is_ready(const struct _starpu_mp_node *mp_node);
 
-void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event);
 
-void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_mic_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event);
 
-void _starpu_mic_common_dt_send(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_mic_common_dt_send(const struct _starpu_mp_node *node, void *msg, int len, void * event);
 
-void _starpu_mic_common_dt_recv(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_mic_common_dt_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event);
 
 void _starpu_mic_common_connect(scif_epd_t *endpoint, uint16_t remote_node, COIPROCESS process,
 				uint16_t local_port_number, uint16_t remote_port_number);

+ 2 - 2
src/drivers/mic/driver_mic_source.c

@@ -407,7 +407,7 @@ int _starpu_mic_copy_ram_to_mic(void *src, unsigned src_node STARPU_ATTRIBUTE_UN
 {
 	const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(dst_node);
 
-	return _starpu_src_common_copy_host_to_sink(mp_node, src, dst, size);
+	return _starpu_src_common_copy_host_to_sink_sync(mp_node, src, dst, size);
 }
 
 /* Transfert SIZE bytes from the address pointed by SRC in the SRC_NODE memory
@@ -417,7 +417,7 @@ int _starpu_mic_copy_mic_to_ram(void *src, unsigned src_node, void *dst, unsigne
 {
 	const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(src_node);
 
-	return _starpu_src_common_copy_sink_to_host(mp_node, src, dst, size);
+	return _starpu_src_common_copy_sink_to_host_sync(mp_node, src, dst, size);
 }
 
 /* Asynchronous transfers */

+ 12 - 4
src/drivers/mp_common/mp_common.c

@@ -162,6 +162,8 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		node->dt_send = _starpu_mic_common_dt_send;
 		node->dt_recv = _starpu_mic_common_dt_recv;
 
+        node->dt_test = NULL; /* Not used now */
+
 		node->get_kernel_from_job = NULL;
 		node->lookup = _starpu_mic_sink_lookup;
 		node->bind_thread = _starpu_mic_sink_bind_thread;
@@ -212,6 +214,8 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		node->dt_send_to_device = _starpu_scc_sink_send_to_device;
 		node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
 
+        node->dt_test = NULL /* not used now */
+
 		node->get_kernel_from_job = NULL;
 		node->lookup = _starpu_scc_sink_lookup;
 		node->bind_thread = _starpu_scc_sink_bind_thread;
@@ -237,8 +241,8 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
    /*     node->report_error = */
 
 	 	node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
-		node->mp_send = _starpu_mpi_common_send;
-		node->mp_recv = _starpu_mpi_common_recv;
+		node->mp_send = _starpu_mpi_common_mp_send;
+		node->mp_recv = _starpu_mpi_common_mp_recv;
 		node->dt_send = _starpu_mpi_common_send;
 		node->dt_recv = _starpu_mpi_common_recv;
         node->dt_send_to_device = _starpu_mpi_common_send_to_device;
@@ -267,13 +271,15 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
     /*    node->report_error =  */
 
     	node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
-        node->mp_send = _starpu_mpi_common_send;
-		node->mp_recv = _starpu_mpi_common_recv;
+        node->mp_send = _starpu_mpi_common_mp_send;
+		node->mp_recv = _starpu_mpi_common_mp_recv;
 		node->dt_send = _starpu_mpi_common_send;
 		node->dt_recv = _starpu_mpi_common_recv;
         node->dt_send_to_device = _starpu_mpi_common_send_to_device;
         node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
 
+        node->dt_test = _starpu_mpi_common_test_event;
+
 		node->get_kernel_from_job = NULL;
 		node->lookup = _starpu_mpi_sink_lookup;
 		node->bind_thread = _starpu_mpi_sink_bind_thread;
@@ -300,6 +306,8 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 	mp_message_list_init(&node->message_queue);
 	STARPU_PTHREAD_MUTEX_INIT(&node->message_queue_mutex,NULL);
 
+    _starpu_mp_event_list_init(&node->event_list);
+
 	/* If the node is a sink then we must initialize some field */
 	if(node->kind == STARPU_MIC_SINK || node->kind == STARPU_SCC_SINK || node->kind == STARPU_MPI_SINK)
 	{

+ 46 - 18
src/drivers/mp_common/mp_common.h

@@ -26,6 +26,7 @@
 #include <common/barrier.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
+#include <datawizard/copy_driver.h>
 
 #ifdef STARPU_USE_MP
 
@@ -52,10 +53,21 @@ enum _starpu_mp_command
 	STARPU_ANSWER_ALLOCATE,
 	STARPU_ERROR_ALLOCATE,
 	STARPU_FREE,
+    /* Synchronous send */
 	STARPU_RECV_FROM_HOST,
 	STARPU_SEND_TO_HOST,
 	STARPU_RECV_FROM_SINK,
 	STARPU_SEND_TO_SINK,
+    /* Asynchronous send */
+	STARPU_RECV_FROM_HOST_ASYNC,
+    STARPU_RECV_FROM_HOST_ASYNC_COMPLETED,
+	STARPU_SEND_TO_HOST_ASYNC,
+	STARPU_SEND_TO_HOST_ASYNC_COMPLETED,
+	STARPU_RECV_FROM_SINK_ASYNC,
+	STARPU_RECV_FROM_SINK_ASYNC_COMPLETED,
+	STARPU_SEND_TO_SINK_ASYNC,
+	STARPU_SEND_TO_SINK_ASYNC_COMPLETED,
+
 	STARPU_TRANSFER_COMPLETE,
 	STARPU_SINK_NBCORES,
 	STARPU_ANSWER_SINK_NBCORES,
@@ -97,6 +109,7 @@ struct _starpu_mp_transfer_command
 {
 	size_t size;
 	void *addr;
+    void *event;
 };
 
 struct _starpu_mp_transfer_command_to_device
@@ -104,6 +117,7 @@ struct _starpu_mp_transfer_command_to_device
 	int devid;
 	size_t size;
 	void *addr;
+    void *event;
 };
 
 LIST_TYPE(mp_barrier,
@@ -131,6 +145,12 @@ struct mp_task
  	struct mp_barrier* mp_barrier;
 };
 
+LIST_TYPE(_starpu_mp_event,
+    struct _starpu_async_channel event;
+    void * remote_event;
+    enum _starpu_mp_command answer_cmd;
+);
+
 
 /* Message-passing working node, whether source
  * or sink */
@@ -183,6 +203,11 @@ struct _starpu_mp_node
 	 *  - sink_sink_dt_connections[j] is not initialized for the sink number j. */
 	union _starpu_mp_connection *sink_sink_dt_connections;
 
+    /* This list contains events
+     * about asynchronous request
+     */
+    struct _starpu_mp_event_list event_list;
+
 	/* */
 	starpu_pthread_barrier_t init_completed_barrier; 
 	
@@ -202,28 +227,31 @@ struct _starpu_mp_node
 	sem_t * sem_run_table;
 
 	/* Node general functions */
-	void (*init)(struct _starpu_mp_node *node);
-	void (*launch_workers)(struct _starpu_mp_node *node);
-	void (*deinit)(struct _starpu_mp_node *node);
-	void (*report_error)(const char *, const char *, const int, const int);
+	void (*init)            (struct _starpu_mp_node *node);
+	void (*launch_workers)  (struct _starpu_mp_node *node);
+	void (*deinit)          (struct _starpu_mp_node *node);
+	void (*report_error)    (const char *, const char *, const int, const int);
 
 	/* Message passing */
-	int (*mp_recv_is_ready)(const struct _starpu_mp_node *);
-	void (*mp_send)(const struct _starpu_mp_node *, void *, int);
-	void (*mp_recv)(const struct _starpu_mp_node *, void *, int);
+	int (*mp_recv_is_ready) (const struct _starpu_mp_node *);
+	void (*mp_send)         (const struct _starpu_mp_node *, void *, int);
+	void (*mp_recv)         (const struct _starpu_mp_node *, void *, int);
 
 	/* Data transfers */
-	void (*dt_send)(const struct _starpu_mp_node *, void *, int);
-	void (*dt_recv)(const struct _starpu_mp_node *, void *, int);
-	void (*dt_send_to_device)(const struct _starpu_mp_node *, int, void *, int);
-	void (*dt_recv_from_device)(const struct _starpu_mp_node *, int, void *, int);
-
-	void (*(*get_kernel_from_job)(const struct _starpu_mp_node *,struct _starpu_job *))(void);
-	void (*(*lookup)(const struct _starpu_mp_node *, char* ))(void);
-	void (*bind_thread)(const struct _starpu_mp_node *, int,int *,int);
-	void (*execute)(struct _starpu_mp_node *, void *, int);
-	void (*allocate)(const struct _starpu_mp_node *, void *, int);
-	void (*free)(const struct _starpu_mp_node *, void *, int);
+	void (*dt_send)             (const struct _starpu_mp_node *, void *, int, void *);
+	void (*dt_recv)             (const struct _starpu_mp_node *, void *, int, void *);
+	void (*dt_send_to_device)   (const struct _starpu_mp_node *, int, void *, int, void *);
+	void (*dt_recv_from_device) (const struct _starpu_mp_node *, int, void *, int, void *);
+
+    /* Test async transfers */
+    int (*dt_test) (struct _starpu_async_channel *);
+
+	void (*(*get_kernel_from_job)   (const struct _starpu_mp_node *,struct _starpu_job *))(void);
+	void (*(*lookup)                (const struct _starpu_mp_node *, char* ))(void);
+	void (*bind_thread)             (const struct _starpu_mp_node *, int,int *,int);
+	void (*execute)                 (struct _starpu_mp_node *, void *, int);
+	void (*allocate)                (const struct _starpu_mp_node *, void *, int);
+	void (*free)                    (const struct _starpu_mp_node *, void *, int);
 };
 
 struct _starpu_mp_node * _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind, int peer_devid) STARPU_ATTRIBUTE_MALLOC;

+ 157 - 18
src/drivers/mp_common/sink_common.c

@@ -30,7 +30,6 @@
 
 #include "sink_common.h"
 
-
 /* Return the sink kind of the running process, based on the value of the
  * STARPU_SINK environment variable.
  * If there is no valid value retrieved, return STARPU_INVALID_KIND
@@ -108,51 +107,156 @@ void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRI
 	free(*(void **)(arg));
 }
 
-static void _starpu_sink_common_copy_from_host(const struct _starpu_mp_node *mp_node,
+static void _starpu_sink_common_copy_from_host_sync(const struct _starpu_mp_node *mp_node,
 					       void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
-	struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
+    struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
 
-	mp_node->dt_recv(mp_node, cmd->addr, cmd->size);
+    mp_node->dt_recv(mp_node, cmd->addr, cmd->size, NULL);
 }
 
-static void _starpu_sink_common_copy_to_host(const struct _starpu_mp_node *mp_node,
+
+static void _starpu_sink_common_copy_from_host_async(struct _starpu_mp_node *mp_node,
+					       void *arg, int arg_size)
+{
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
+
+    struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
+
+    /* For asynchronous transfers, we store events to test them later when they are finished */
+    struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
+    /* Save the command to send */
+    sink_event->answer_cmd = STARPU_RECV_FROM_HOST_ASYNC_COMPLETED;
+    sink_event->remote_event = cmd->event;
+
+    /* Set the sender (host) ready because we don't want to wait its ack */
+    struct _starpu_async_channel * async_channel = &sink_event->event;
+    async_channel->starpu_mp_common_finished_sender = 1;
+    async_channel->starpu_mp_common_finished_receiver = 0;
+
+    mp_node->dt_recv(mp_node, cmd->addr, cmd->size, &sink_event->event);
+    /* Push event on the list */
+    _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
+}
+
+
+static void _starpu_sink_common_copy_to_host_sync(const struct _starpu_mp_node *mp_node,
 					     void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
 	struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
+
     /* Save values before sending command to prevent the overwriting */
     size_t size = cmd->size;
     void * addr = cmd->addr;
 
-	_starpu_mp_common_send_command(mp_node, STARPU_SEND_TO_HOST, NULL, 0);
+    _starpu_mp_common_send_command(mp_node, STARPU_SEND_TO_HOST, NULL, 0);
+
+    mp_node->dt_send(mp_node, addr, size, NULL);
+}
+
+
+static void _starpu_sink_common_copy_to_host_async(struct _starpu_mp_node *mp_node,
+					     void *arg, int arg_size)
+{
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
+
+	struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
+
+    /* For asynchronous transfers, we need to say dt_send that we are in async mode 
+     * but we don't push event on list because we don't need to know if it's finished
+     */
+    struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
+    /* Save the command to send */
+    sink_event->answer_cmd = STARPU_SEND_TO_HOST_ASYNC_COMPLETED;
+    sink_event->remote_event = cmd->event;
     
-	mp_node->dt_send(mp_node, addr, size);
+    /* Set the receiver (host) ready because we don't want to wait its ack */
+    struct _starpu_async_channel * async_channel = &sink_event->event;
+    async_channel->starpu_mp_common_finished_sender = 0;
+    async_channel->starpu_mp_common_finished_receiver = 1;
+
+    mp_node->dt_send(mp_node, cmd->addr, cmd->size, &sink_event->event);
+    /* Push event on the list */
+    _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
 }
 
-static void _starpu_sink_common_copy_from_sink(const struct _starpu_mp_node *mp_node,
+
+static void _starpu_sink_common_copy_from_sink_sync(const struct _starpu_mp_node *mp_node,
 					       void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
 	struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
 
-	mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size);
+    mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size, NULL);
+    _starpu_mp_common_send_command(mp_node, STARPU_TRANSFER_COMPLETE, NULL, 0);
+}
 
-	_starpu_mp_common_send_command(mp_node, STARPU_TRANSFER_COMPLETE, NULL, 0);
+
+static void _starpu_sink_common_copy_from_sink_async(struct _starpu_mp_node *mp_node,
+					       void *arg, int arg_size)
+{
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
+
+	struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
+
+    /* For asynchronous transfers, we store events to test them later when they are finished
+     */
+    struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
+    /* Save the command to send */
+    sink_event->answer_cmd = STARPU_RECV_FROM_SINK_ASYNC_COMPLETED;
+    sink_event->remote_event = cmd->event;
+
+    /* Set the sender ready because we don't want to wait its ack */
+    struct _starpu_async_channel * async_channel = &sink_event->event;
+    async_channel->starpu_mp_common_finished_sender = 1;
+    async_channel->starpu_mp_common_finished_receiver = 0;
+
+    mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size, &sink_event->event);
+    /* Push event on the list */
+    _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
 }
 
-static void _starpu_sink_common_copy_to_sink(const struct _starpu_mp_node *mp_node,
+
+static void _starpu_sink_common_copy_to_sink_sync(const struct _starpu_mp_node *mp_node,
 					     void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
 	struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
 
-	mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size);
+    mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size, NULL);
+}
+
+
+static void _starpu_sink_common_copy_to_sink_async(struct _starpu_mp_node *mp_node,
+					     void *arg, int arg_size)
+{
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
+
+	struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
+
+    /* For asynchronous transfers, we need to say dt_send that we are in async mode 
+     * but we don't push event on list because we don't need to know if it's finished
+     */
+    struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
+    /* Save the command to send */
+    sink_event->answer_cmd = STARPU_SEND_TO_SINK_ASYNC_COMPLETED;
+    sink_event->remote_event = cmd->event;
+
+    /* Set the receiver ready because we don't want to wait its ack */
+    struct _starpu_async_channel * async_channel = &sink_event->event;
+    async_channel->starpu_mp_common_finished_sender = 0;
+    async_channel->starpu_mp_common_finished_receiver = 1;
+
+    mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size, &sink_event->event);
+
+    /* Push event on the list */
+    _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
 }
 
 
@@ -183,7 +287,7 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
 
 	/* Retrieve workers */
 	struct _starpu_worker * workers = &config->workers[baseworkerid];
-	node->dt_recv(node,workers,worker_size);
+	node->dt_recv(node,workers,worker_size, NULL);
 
 	/* Update workers to have coherent field */
 	for(i=0; i<nworkers; i++)
@@ -210,7 +314,7 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
 
 	/* Retrieve combined workers */
 	struct _starpu_combined_worker * combined_workers = config->combined_workers;
-	node->dt_recv(node, combined_workers, combined_worker_size);
+	node->dt_recv(node, combined_workers, combined_worker_size, NULL);
 
 	node->baseworkerid = baseworkerid;
 	STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
@@ -272,19 +376,35 @@ void _starpu_sink_common_worker(void)
 					break;
 
 				case STARPU_RECV_FROM_HOST:
-					_starpu_sink_common_copy_from_host(node, arg, arg_size);
+					_starpu_sink_common_copy_from_host_sync(node, arg, arg_size);
 					break;
 
 				case STARPU_SEND_TO_HOST:
-					_starpu_sink_common_copy_to_host(node, arg, arg_size);
+					_starpu_sink_common_copy_to_host_sync(node, arg, arg_size);
 					break;
 
 				case STARPU_RECV_FROM_SINK:
-					_starpu_sink_common_copy_from_sink(node, arg, arg_size);
+					_starpu_sink_common_copy_from_sink_sync(node, arg, arg_size);
 					break;
 
 				case STARPU_SEND_TO_SINK:
-					_starpu_sink_common_copy_to_sink(node, arg, arg_size);
+					_starpu_sink_common_copy_to_sink_sync(node, arg, arg_size);
+					break;
+
+                case STARPU_RECV_FROM_HOST_ASYNC:
+					_starpu_sink_common_copy_from_host_async(node, arg, arg_size);
+                    break;
+                    
+				case STARPU_SEND_TO_HOST_ASYNC:
+					_starpu_sink_common_copy_to_host_async(node, arg, arg_size);
+                    break;
+
+				case STARPU_RECV_FROM_SINK_ASYNC:
+					_starpu_sink_common_copy_from_sink_async(node, arg, arg_size);
+                    break;
+
+                case STARPU_SEND_TO_SINK_ASYNC:
+					_starpu_sink_common_copy_to_sink_async(node, arg, arg_size);
 					break;
 
 				case STARPU_SYNC_WORKERS:
@@ -312,6 +432,25 @@ void _starpu_sink_common_worker(void)
 		{
 			STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
 		}
+
+        //XXX: Need mutex here ?
+        if(!_starpu_mp_event_list_empty(&node->event_list))
+        {
+            struct _starpu_mp_event * sink_event = _starpu_mp_event_list_pop_front(&node->event_list);
+            if (node->dt_test(&sink_event->event))
+            {
+                /* send ACK to host */
+                _starpu_mp_common_send_command(node, sink_event->answer_cmd , &sink_event->remote_event, sizeof(sink_event->remote_event));
+                _starpu_mp_event_delete(sink_event);
+            }
+            else
+            {
+                /* try later */
+                 _starpu_mp_event_list_push_back(&node->event_list, sink_event);
+            }
+            
+
+        }
 	}
 
 	STARPU_PTHREAD_KEY_DELETE(worker_key);

+ 123 - 17
src/drivers/mp_common/source_common.c

@@ -127,7 +127,7 @@ static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node ST
 		void * arg, int arg_size,
 		enum _starpu_mp_command answer)
 {
-	struct _starpu_worker_set * worker_set=NULL;
+	struct _starpu_worker_set * worker_set = NULL;
 	switch(answer)
 	{
 		case STARPU_EXECUTION_COMPLETED:
@@ -137,6 +137,20 @@ static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node ST
 		case STARPU_PRE_EXECUTION:
 			_starpu_src_common_pre_exec(arg,arg_size);
 			break;
+        case STARPU_RECV_FROM_HOST_ASYNC_COMPLETED:
+        case STARPU_RECV_FROM_SINK_ASYNC_COMPLETED:
+        {
+            struct _starpu_async_channel * event = arg;
+            event->starpu_mp_common_finished_receiver = 1;
+            break;
+        }
+        case STARPU_SEND_TO_HOST_ASYNC_COMPLETED:
+        case STARPU_SEND_TO_SINK_ASYNC_COMPLETED:
+        {
+            struct _starpu_async_channel * event = arg;
+            event->starpu_mp_common_finished_sender = 1;
+            break;
+        }
 		default:
 			return 0;
 			break;
@@ -187,6 +201,23 @@ int _starpu_src_common_store_message(struct _starpu_mp_node *node,
 			STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
 			return 1;
 			break;
+        /* For ASYNC commands don't store them, update event */
+        case STARPU_RECV_FROM_HOST_ASYNC_COMPLETED:
+        case STARPU_RECV_FROM_SINK_ASYNC_COMPLETED:
+        {
+            struct _starpu_async_channel * event = arg;
+            event->starpu_mp_common_finished_receiver = 1;
+            return 1;
+            break;
+        }
+        case STARPU_SEND_TO_HOST_ASYNC_COMPLETED:
+        case STARPU_SEND_TO_SINK_ASYNC_COMPLETED:
+        {
+            struct _starpu_async_channel * event = arg;
+            event->starpu_mp_common_finished_sender = 1;
+            return 1;
+            break;
+        }
 		default:
 			return 0;
 			break;
@@ -499,29 +530,53 @@ void _starpu_src_common_free(const struct _starpu_mp_node *mp_node,
 	_starpu_mp_common_send_command(mp_node, STARPU_FREE, &addr, sizeof(addr));
 }
 
-/* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE.
-*/
-int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
+/* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE with a
+ * synchronous mode.
+ */
+int _starpu_src_common_copy_host_to_sink_sync(const struct _starpu_mp_node *mp_node,
 		void *src, void *dst, size_t size)
 {
-	struct _starpu_mp_transfer_command cmd = {size, dst};
+	struct _starpu_mp_transfer_command cmd = {size, dst, NULL};
 
 	_starpu_mp_common_send_command(mp_node, STARPU_RECV_FROM_HOST, &cmd, sizeof(cmd));
 
-	mp_node->dt_send(mp_node, src, size);
+	mp_node->dt_send(mp_node, src, size, NULL);
 
 	return 0;
 }
 
-/* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST.
-*/
-int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
+/* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE with an
+ * asynchronous mode.
+ */
+int _starpu_src_common_copy_host_to_sink_async(const struct _starpu_mp_node *mp_node,
+		void *src, void *dst, size_t size, void * event)
+{
+	struct _starpu_mp_transfer_command cmd = {size, dst, event};
+
+    /* For asynchronous transfers, we save informations
+     * to test is they are finished
+     */
+    struct _starpu_async_channel * async_channel = event;
+    async_channel->starpu_mp_common_finished_sender = 0;
+    async_channel->starpu_mp_common_finished_receiver = 0;
+
+	_starpu_mp_common_send_command(mp_node, STARPU_RECV_FROM_HOST_ASYNC, &cmd, sizeof(cmd));
+
+	mp_node->dt_send(mp_node, src, size, event);
+
+	return 0;
+}
+
+/* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST
+ * with a synchronous mode.
+ */
+int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
 		void *src, void *dst, size_t size)
 {
     enum _starpu_mp_command answer;
 	void *arg;
 	int arg_size;
-	struct _starpu_mp_transfer_command cmd = {size, src};
+	struct _starpu_mp_transfer_command cmd = {size, src, NULL};
 
 	_starpu_mp_common_send_command(mp_node, STARPU_SEND_TO_HOST, &cmd, sizeof(cmd));
 
@@ -529,22 +584,45 @@ int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
      
     STARPU_ASSERT(answer == STARPU_SEND_TO_HOST);
 
-	mp_node->dt_recv(mp_node, dst, size);
+	mp_node->dt_recv(mp_node, dst, size, NULL);
 
 	return 0;
 }
 
+/* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST
+ * with an asynchronous mode.
+ */
+int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
+		void *src, void *dst, size_t size, void * event)
+{
+	struct _starpu_mp_transfer_command cmd = {size, src, event};
+
+    /* For asynchronous transfers, we save informations
+     * to test is they are finished
+     */
+    struct _starpu_async_channel * async_channel = event;
+    async_channel->starpu_mp_common_finished_sender = 0;
+    async_channel->starpu_mp_common_finished_receiver = 0;
+
+	_starpu_mp_common_send_command(mp_node, STARPU_SEND_TO_HOST_ASYNC, &cmd, sizeof(cmd));
+
+	mp_node->dt_recv(mp_node, dst, size, event);
+    
+	return 0;
+}
+
 /* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed by SRC
- * to the sink linked to DST_NODE. The latter store them in DST.
+ * to the sink linked to DST_NODE. The latter store them in DST with a synchronous
+ * mode.
  */
-int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
+int _starpu_src_common_copy_sink_to_sink_sync(const struct _starpu_mp_node *src_node,
 		const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
 	int arg_size;
 
-	struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src};
+	struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, NULL};
 
 	/* Tell source to send data to dest. */
 	_starpu_mp_common_send_command(src_node, STARPU_SEND_TO_SINK, &cmd, sizeof(cmd));
@@ -564,6 +642,35 @@ int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
 	return 0;
 }
 
+/* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed by SRC
+ * to the sink linked to DST_NODE. The latter store them in DST with an asynchronous
+ * mode.
+ */
+int _starpu_src_common_copy_sink_to_sink_async(const struct _starpu_mp_node *src_node,
+		const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void * event)
+{
+	struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, event};
+
+    /* For asynchronous transfers, we save informations
+     * to test is they are finished
+     */
+    struct _starpu_async_channel * async_channel = event;
+    async_channel->starpu_mp_common_finished_sender = 0;
+    async_channel->starpu_mp_common_finished_receiver = 0;
+
+	/* Tell source to send data to dest. */
+	_starpu_mp_common_send_command(src_node, STARPU_SEND_TO_SINK_ASYNC, &cmd, sizeof(cmd));
+
+	cmd.devid = src_node->peer_id;
+	cmd.size = size;
+	cmd.addr = dst;
+
+	/* Tell dest to receive data from source. */
+	_starpu_mp_common_send_command(dst_node, STARPU_RECV_FROM_SINK_ASYNC, &cmd, sizeof(cmd));
+
+	return 0;
+}
+
 /* 5 functions to determine the executable to run on the device (MIC, SCC,
  * MPI).
  */
@@ -677,7 +784,6 @@ int _starpu_src_common_locate_file(char *located_file_name,
 
 
 #if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
-
 void _starpu_src_common_init_switch_env(unsigned this)
 {
     save_thread_env[this].current_task = starpu_task_get_current();
@@ -733,10 +839,10 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
 			&msg, sizeof(msg));
 
 	/* Send all worker to the sink node */
-	node->dt_send(node,&config->workers[baseworkerid],worker_size);
+	node->dt_send(node,&config->workers[baseworkerid],worker_size, NULL);
 
 	/* Send all combined workers to the sink node */
-	node->dt_send(node, &config->combined_workers,combined_worker_size);
+	node->dt_send(node, &config->combined_workers,combined_worker_size, NULL);
 }
 
 

+ 12 - 3
src/drivers/mp_common/source_common.h

@@ -57,15 +57,24 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 				      void *cl_arg, size_t cl_arg_size);
 
 
-int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
+int _starpu_src_common_copy_host_to_sink_sync(const struct _starpu_mp_node *mp_node,
 					 void *src, void *dst, size_t size);
 
-int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
+int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
 					 void *src, void *dst, size_t size);
 
-int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
+int _starpu_src_common_copy_sink_to_sink_sync(const struct _starpu_mp_node *src_node,
 					 const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size);
 
+int _starpu_src_common_copy_host_to_sink_async(const struct _starpu_mp_node *mp_node,
+					 void *src, void *dst, size_t size, void *event);
+
+int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
+					 void *src, void *dst, size_t size, void *event);
+
+int _starpu_src_common_copy_sink_to_sink_async(const struct _starpu_mp_node *src_node,
+					 const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void *event);
+
 int _starpu_src_common_locate_file(char *located_file_name,
 				   const char *env_file_name, const char *env_mic_path,
 				   const char *config_file_name, const char *actual_file_name,

+ 119 - 29
src/drivers/mpi/driver_mpi_common.c

@@ -23,7 +23,8 @@
 #define NITER 32
 #define SIZE_BANDWIDTH (1024*1024)
 
-#define SYNC_TAG 42
+#define SYNC_TAG 44
+#define ASYNC_TAG 45
 
 #define DRIVER_MPI_MASTER_NODE_DEFAULT 0
 
@@ -80,23 +81,23 @@ int _starpu_mpi_common_mp_init()
         int required = MPI_THREAD_FUNNELED;
 #endif
 
-        int thread_support;
-        STARPU_ASSERT(MPI_Init_thread(_starpu_get_argc(), _starpu_get_argv(), required, &thread_support) == MPI_SUCCESS);
+            int thread_support;
+            STARPU_ASSERT(MPI_Init_thread(_starpu_get_argc(), _starpu_get_argv(), required, &thread_support) == MPI_SUCCESS);
 
-        if (thread_support != required)
-        {
-            if (required == MPI_THREAD_MULTIPLE)
-                fprintf(stderr, "MPI doesn't support MPI_THREAD_MULTIPLE option. MPI Master-Slave can have problems if multiple slaves are launched. \n");
-            if (required == MPI_THREAD_FUNNELED)
-                fprintf(stderr, "MPI doesn't support MPI_THREAD_FUNNELED option. Many errors can occur. \n");
+            if (thread_support != required)
+            {
+                if (required == MPI_THREAD_MULTIPLE)
+                    fprintf(stderr, "MPI doesn't support MPI_THREAD_MULTIPLE option. MPI Master-Slave can have problems if multiple slaves are launched. \n");
+                if (required == MPI_THREAD_FUNNELED)
+                    fprintf(stderr, "MPI doesn't support MPI_THREAD_FUNNELED option. Many errors can occur. \n");
+            }
         }
-    }
-    
-    /* Find which node is the master */
-    _starpu_mpi_set_src_node_id();
+        
+        /* Find which node is the master */
+        _starpu_mpi_set_src_node_id();
 
-    return 1;
-}
+        return 1;
+    }
 
 void _starpu_mpi_common_mp_deinit()
 {
@@ -118,7 +119,7 @@ int _starpu_mpi_common_get_src_node()
 
 int _starpu_mpi_common_is_mp_initialized()
 {
-	return mpi_initialized;
+    return mpi_initialized;
 }
 
 /* common parts to initialize a source or a sink node */
@@ -146,7 +147,7 @@ int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
         /* Sink can have sink to sink message */
         source = MPI_ANY_SOURCE;
     }
-        
+
     res = MPI_Iprobe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
     STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot test if we received a message !");
 
@@ -154,7 +155,7 @@ int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
 }
 
 /* SEND to source node */
-void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len)
+void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event)
 {
     int res;
     int id_proc;
@@ -162,12 +163,30 @@ void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int
 
     printf("envoi %d B to %d\n", len, node->mp_connection.mpi_remote_nodeid);
 
-    res = MPI_Send(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD);
+    if (event)
+    {
+        /* Asynchronous send */
+        struct _starpu_async_channel * channel = event;
+        channel->event.mpi_ms_event.finished = 0;
+        channel->event.mpi_ms_event.is_sender = 1;
+        res = MPI_Isend(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+    } 
+    else
+    {
+        /* Synchronous send */
+        res = MPI_Send(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD);
+    }
     STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
 }
 
+void _starpu_mpi_common_mp_send(const struct _starpu_mp_node *node, void *msg, int len)
+{
+    _starpu_mpi_common_send(node, msg, len, NULL);
+}
+
+
 /* RECV to source node */
-void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len)
+void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
 {
     int res;
     int id_proc;
@@ -176,16 +195,33 @@ void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int
 
     printf("recv %d B from %d in %p\n", len, node->mp_connection.mpi_remote_nodeid, msg);
 
-    res = MPI_Recv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD, &s);
-    int num_expected;
-    MPI_Get_count(&s, MPI_BYTE, &num_expected);
+    if (event)
+    {
+        /* Asynchronous recv */
+        struct _starpu_async_channel * channel = event;
+        channel->event.mpi_ms_event.finished = 0;
+        channel->event.mpi_ms_event.is_sender = 0;
+        res = MPI_Irecv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+    } 
+    else
+    {
+        /* Synchronous recv */
+        res = MPI_Recv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD, &s);
+        int num_expected;
+        MPI_Get_count(&s, MPI_BYTE, &num_expected);
 
-    STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
+        STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
+    }
     STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
 }
 
+void _starpu_mpi_common_mp_recv(const struct _starpu_mp_node *node, void *msg, int len)
+{
+    _starpu_mpi_common_recv(node, msg, len, NULL);
+}
+
 /* SEND to any node */
-void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len)
+void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len, void * event)
 {   
     int res;
     int id_proc;
@@ -193,12 +229,25 @@ void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int d
 
     printf("send %d bytes from %d from %p\n", len, dst_devid, msg);
 
-    res = MPI_Send(msg, len, MPI_BYTE, dst_devid, SYNC_TAG, MPI_COMM_WORLD);
+    if (event)
+    {
+        /* Asynchronous send */
+        struct _starpu_async_channel * channel = event;
+        channel->event.mpi_ms_event.finished = 0;
+        channel->event.mpi_ms_event.is_sender = 1;
+        res = MPI_Isend(msg, len, MPI_BYTE, dst_devid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+    } 
+    else
+    {
+        /* Synchronous send */
+        res = MPI_Send(msg, len, MPI_BYTE, dst_devid, SYNC_TAG, MPI_COMM_WORLD);
+    }    
+
     STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
 }
 
 /* RECV to any node */
-void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len)
+void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len, void * event)
 {
     int res;
     int id_proc;
@@ -206,10 +255,51 @@ void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int
 
     printf("nop recv %d bytes from %d\n", len, src_devid);
 
-    res = MPI_Recv(msg, len, MPI_BYTE, src_devid, SYNC_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-    STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
+    if (event)
+    {
+        /* Asynchronous recv */
+        struct _starpu_async_channel * channel = event;
+        channel->event.mpi_ms_event.finished = 0;
+        channel->event.mpi_ms_event.is_sender = 0;
+        res = MPI_Irecv(msg, len, MPI_BYTE, src_devid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+    } 
+    else
+    {
+        /* Synchronous recv */
+        res = MPI_Recv(msg, len, MPI_BYTE, src_devid, SYNC_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+        STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
+    }
+}
+
+/* - In MPI Master-Slave communications between host and device,
+ * host is always considered as the sender and the device, the receiver.
+ * - In device to device communications, the first ack received by host
+ * is considered as the sender (but it cannot be, in fact, the sender)
+ */
+int _starpu_mpi_common_test_event(struct _starpu_async_channel * event)
+{
+    //if the event is not finished, maybe it's a host-device communication
+    //or host has already finished its work
+    if (!event->event.mpi_ms_event.finished)
+    {
+        int flag = 0;
+        MPI_Test(&event->event.mpi_ms_event.request, &flag, MPI_STATUS_IGNORE);
+        if (flag)
+        {
+            event->event.mpi_ms_event.finished = 1;
+            if (event->event.mpi_ms_event.is_sender)
+                event->starpu_mp_common_finished_sender = 1;
+            else
+                event->starpu_mp_common_finished_receiver = 1;
+        }
+    }
+
+    return event->starpu_mp_common_finished_sender && event->starpu_mp_common_finished_receiver;
 }
 
+
+
+
 void _starpu_mpi_common_barrier(void)
 {
     MPI_Barrier(MPI_COMM_WORLD);

+ 9 - 4
src/drivers/mpi/driver_mpi_common.h

@@ -34,11 +34,16 @@ int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node);
 
 void _starpu_mpi_common_mp_initialize_src_sink(struct _starpu_mp_node *node);
 
-void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len);
-void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event);
+void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event);
 
-void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len);
-void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len);
+void _starpu_mpi_common_mp_send(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_mpi_common_mp_recv(const struct _starpu_mp_node *node, void *msg, int len);
+
+void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len, void * event);
+void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len, void * event);
+
+int _starpu_mpi_common_test_event(struct _starpu_async_channel * event);
 
 void _starpu_mpi_common_barrier(void);
 

+ 60 - 17
src/drivers/mpi/driver_mpi_source.c

@@ -83,28 +83,80 @@ void _starpu_mpi_source_free_memory(void *addr, unsigned memory_node)
  /* Transfert SIZE bytes from the address pointed by SRC in the SRC_NODE memory
   * node to the address pointed by DST in the DST_NODE memory node
   */
-int _starpu_mpi_copy_ram_to_mpi(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size)
+int _starpu_mpi_copy_ram_to_mpi_sync(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size)
 {
-    const struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(dst_node);
-    return _starpu_src_common_copy_host_to_sink(mp_node, src, dst, size);
+    struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(dst_node);
+    return _starpu_src_common_copy_host_to_sink_sync(mp_node, src, dst, size);
 }   
  
  /* Transfert SIZE bytes from the address pointed by SRC in the SRC_NODE memory
   * node to the address pointed by DST in the DST_NODE memory node
   */    
-int _starpu_mpi_copy_mpi_to_ram(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size)
+int _starpu_mpi_copy_mpi_to_ram_sync(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size)
 {
-    const struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(src_node);
-    return _starpu_src_common_copy_sink_to_host(mp_node, src, dst, size);
+    struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(src_node);
+    return _starpu_src_common_copy_sink_to_host_sync(mp_node, src, dst, size);
 }   
 
-int _starpu_mpi_copy_sink_to_sink(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size)
+int _starpu_mpi_copy_sink_to_sink_sync(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size)
 {
-    return _starpu_src_common_copy_sink_to_sink(_starpu_mpi_src_get_mp_node_from_memory_node(src_node),
+    return _starpu_src_common_copy_sink_to_sink_sync(_starpu_mpi_src_get_mp_node_from_memory_node(src_node),
             _starpu_mpi_src_get_mp_node_from_memory_node(dst_node),
             src, dst, size);
 }
 
+int _starpu_mpi_copy_mpi_to_ram_async(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size, void * event)
+{
+    /* By default, init the request with MPI_REQUEST_NULL */
+    struct _starpu_async_channel * channel = event;
+    channel->event.mpi_ms_event.request = MPI_REQUEST_NULL;
+
+    struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(src_node);
+    return _starpu_src_common_copy_sink_to_host_async(mp_node, src, dst, size, event);
+}
+
+int _starpu_mpi_copy_ram_to_mpi_async(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size, void * event)
+{
+    /* By default, init the request with MPI_REQUEST_NULL */
+    struct _starpu_async_channel * channel = event;
+    channel->event.mpi_ms_event.request = MPI_REQUEST_NULL;
+
+    struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(dst_node);
+    return _starpu_src_common_copy_host_to_sink_async(mp_node, src, dst, size, event);
+}
+
+int _starpu_mpi_copy_sink_to_sink_async(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size, void * event)
+{
+    /* By default, init the request with MPI_REQUEST_NULL */
+    struct _starpu_async_channel * channel = event;
+    channel->event.mpi_ms_event.request = MPI_REQUEST_NULL;
+
+    return _starpu_src_common_copy_sink_to_sink_async(_starpu_mpi_src_get_mp_node_from_memory_node(src_node),
+            _starpu_mpi_src_get_mp_node_from_memory_node(dst_node),
+            src, dst, size, event);
+}
+
+/* - In device to device communications, the first ack received by host
+ * is considered as the sender (but it cannot be, in fact, the sender)
+ */
+void _starpu_mpi_src_wait_event(struct _starpu_async_channel * event)
+{
+    if (!event->event.mpi_ms_event.finished)
+    {
+        MPI_Wait(&event->event.mpi_ms_event.request, MPI_STATUS_IGNORE);
+        event->event.mpi_ms_event.finished = 1;
+        if (event->event.mpi_ms_event.is_sender)
+            event->starpu_mp_common_finished_sender = 1;
+        else
+            event->starpu_mp_common_finished_receiver = 1;
+    }
+
+    //XXX: Maybe cause deadlock when the same thread is waiting here and cannot handle
+    //incoming ack from devices
+    while(!event->starpu_mp_common_finished_sender || !event->starpu_mp_common_finished_receiver)
+        ;
+}
+
 
 int _starpu_mpi_ms_src_register_kernel(starpu_mpi_ms_func_symbol_t *symbol, const char *func_name)
 {
@@ -230,15 +282,6 @@ unsigned _starpu_mpi_src_get_device_count()
 
 }
 
- void _starpu_mpi_exit_useless_node(int devid)
-{   
-    struct _starpu_mp_node *node = _starpu_mp_common_node_create(STARPU_MPI_SOURCE, devid);
-
-    _starpu_mp_common_send_command(node, STARPU_EXIT, NULL, 0);
-
-    _starpu_mp_common_node_destroy(node);
-}  
-
 void *_starpu_mpi_src_worker(void *arg)
 {
 #ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD

+ 9 - 4
src/drivers/mpi/driver_mpi_source.h

@@ -30,7 +30,6 @@ struct _starpu_mp_node *_starpu_mpi_src_get_mp_node_from_memory_node(int memory_
 
 unsigned _starpu_mpi_src_get_device_count();
 void *_starpu_mpi_src_worker(void *arg);
-void _starpu_mpi_exit_useless_node(int devid);
 
 void _starpu_mpi_source_init(struct _starpu_mp_node *node);
 void _starpu_mpi_source_deinit(struct _starpu_mp_node *node);
@@ -38,9 +37,15 @@ void _starpu_mpi_source_deinit(struct _starpu_mp_node *node);
 int _starpu_mpi_src_allocate_memory(void ** addr, size_t size, unsigned memory_node);
 void _starpu_mpi_source_free_memory(void *addr, unsigned memory_node);
 
-int _starpu_mpi_copy_mpi_to_ram(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size);
-int _starpu_mpi_copy_ram_to_mpi(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size);
-int _starpu_mpi_copy_sink_to_sink(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size);
+int _starpu_mpi_copy_mpi_to_ram_sync(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size);
+int _starpu_mpi_copy_ram_to_mpi_sync(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size);
+int _starpu_mpi_copy_sink_to_sink_sync(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size);
+
+int _starpu_mpi_copy_mpi_to_ram_async(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size, void * event);
+int _starpu_mpi_copy_ram_to_mpi_async(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size, void * event);
+int _starpu_mpi_copy_sink_to_sink_async(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size, void * event);
+
+void _starpu_mpi_src_wait_event(struct _starpu_async_channel * event);
 
 void(* _starpu_mpi_ms_src_get_kernel_from_job(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *j))(void);
 

+ 5 - 1
src/drivers/scc/driver_scc_common.c

@@ -141,6 +141,8 @@ void _starpu_scc_common_send(const struct _starpu_mp_node *node, void *msg, int
 {
 	int ret;
 
+    STARPU_ASSERT_MSG(!event, "Asynchronous msg is not used here");
+
 	/* There are potentially 48 threads running on the master core and RCCE_send write
 	 * data in the MPB associated to this core. It's not thread safe, so we have to protect it.
 	 * RCCE_acquire_lock uses a test&set register on SCC. */
@@ -155,8 +157,10 @@ void _starpu_scc_common_send(const struct _starpu_mp_node *node, void *msg, int
 	RCCE_release_lock(RCCE_ue());
 }
 
-void _starpu_scc_common_recv(const struct _starpu_mp_node *node, void *msg, int len)
+void _starpu_scc_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
 {
+    STARPU_ASSERT_MSG(!event, "Asynchronous msg is not used here");
+
 	int ret;
 	if ((ret = RCCE_recv(msg, len, node->mp_connection.scc_nodeid)) != RCCE_SUCCESS)
 		STARPU_MP_COMMON_REPORT_ERROR(node, ret);

+ 2 - 2
src/drivers/scc/driver_scc_common.h

@@ -39,8 +39,8 @@ int _starpu_scc_common_is_mp_initialized();
 int _starpu_scc_common_get_src_node_id();
 int _starpu_scc_common_is_src_node();
 
-void _starpu_scc_common_send(const struct _starpu_mp_node *node, void *msg, int len);
-void _starpu_scc_common_recv(const struct _starpu_mp_node *node, void *msg, int len);
+void _starpu_scc_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event);
+void _starpu_scc_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event);
 
 void _starpu_scc_common_report_rcce_error(const char *func, const char *file, const int line, const int err_no);
 

+ 6 - 0
src/drivers/scc/driver_scc_sink.c

@@ -58,6 +58,9 @@ void _starpu_scc_sink_deinit(struct _starpu_mp_node *node)
 void _starpu_scc_sink_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len)
 {
 	int ret;
+
+    STARPU_ASSERT_MSG(!event, "Asynchronous msg is not used here");
+
 	if ((ret = RCCE_send(msg, len, STARPU_TO_SCC_SINK_ID(dst_devid))) != RCCE_SUCCESS)
 		STARPU_MP_COMMON_REPORT_ERROR(node, ret);
 }
@@ -65,6 +68,9 @@ void _starpu_scc_sink_send_to_device(const struct _starpu_mp_node *node, int dst
 void _starpu_scc_sink_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len)
 {
 	int ret;
+
+    STARPU_ASSERT_MSG(!event, "Asynchronous msg is not used here");
+
 	if ((ret = RCCE_recv(msg, len, STARPU_TO_SCC_SINK_ID(src_devid))) != RCCE_SUCCESS)
 		STARPU_MP_COMMON_REPORT_ERROR(node, ret);
 }

+ 2 - 2
src/drivers/scc/driver_scc_sink.h

@@ -28,8 +28,8 @@ void _starpu_scc_sink_init(struct _starpu_mp_node *node);
 void _starpu_scc_sink_launch_workers(struct _starpu_mp_node *node);
 void _starpu_scc_sink_deinit(struct _starpu_mp_node *node);
 
-void _starpu_scc_sink_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len);
-void _starpu_scc_sink_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len);
+void _starpu_scc_sink_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len, void * event);
+void _starpu_scc_sink_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len, void * event);
 
 void _starpu_scc_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, starpu_pthread_t *thread);
 

+ 3 - 3
src/drivers/scc/driver_scc_source.c

@@ -259,7 +259,7 @@ void _starpu_scc_set_offset_in_shared_memory(void *ptr, void **dev_handle, size_
  */
 int _starpu_scc_copy_src_to_sink(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size)
 {
-	return _starpu_src_common_copy_host_to_sink(_starpu_scc_src_memory_node_to_mp_node(dst_node),
+	return _starpu_src_common_copy_host_to_sink_sync(_starpu_scc_src_memory_node_to_mp_node(dst_node),
 			src, dst, size);
 }
 
@@ -268,13 +268,13 @@ int _starpu_scc_copy_src_to_sink(void *src, unsigned src_node STARPU_ATTRIBUTE_U
  */
 int _starpu_scc_copy_sink_to_src(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size)
 {
-	return _starpu_src_common_copy_sink_to_host(_starpu_scc_src_memory_node_to_mp_node(src_node),
+	return _starpu_src_common_copy_sink_to_host_sync(_starpu_scc_src_memory_node_to_mp_node(src_node),
 			src, dst, size);
 }
 
 int _starpu_scc_copy_sink_to_sink(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size)
 {
-	return _starpu_src_common_copy_sink_to_sink(_starpu_scc_src_memory_node_to_mp_node(src_node),
+	return _starpu_src_common_copy_sink_to_sink_sync(_starpu_scc_src_memory_node_to_mp_node(src_node),
 			_starpu_scc_src_memory_node_to_mp_node(dst_node),
 			src, dst, size);
 }