Bläddra i källkod

* fix bug in async mode when using MPI Master-Slave
Use a list to store MPI request when an async event is used more than once
Update copyright for modified files in r19323

Corentin Salingue 8 år sedan
förälder
incheckning
f7acc7fe76

+ 1 - 1
include/starpu_data_interfaces.h

@@ -2,7 +2,7 @@
  *
  * Copyright (C) 2010-2016  Université de Bordeaux
  * Copyright (C) 2010-2014  CNRS
- * Copyright (C) 2011-2012  INRIA
+ * Copyright (C) 2011-2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 1 - 0
src/datawizard/copy_driver.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013  CNRS
+ * Copyright (C) 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 9 - 3
src/datawizard/copy_driver.h

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010, 2012-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013, 2015  CNRS
+ * Copyright (C) 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -23,6 +24,7 @@
 #endif
 
 #include <common/config.h>
+#include <common/list.h>
 
 #ifdef STARPU_USE_CUDA
 #include <cuda.h>
@@ -59,12 +61,14 @@ struct _starpu_mic_async_event
 #endif
 
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
+LIST_TYPE(_starpu_mpi_ms_event_request,
+        MPI_Request request;
+);
+
 struct _starpu_mpi_ms_async_event
 {
-    /* to know if request is finished and already handled */
-    unsigned finished;
     int is_sender;
-    MPI_Request request;
+    struct _starpu_mpi_ms_event_request_list * requests;
 };
 #endif
 
@@ -105,6 +109,8 @@ struct _starpu_async_channel
 {
 	union _starpu_async_channel_event event;
 	enum starpu_node_kind type;
+    /* Which node to polling when needing ACK msg */
+    struct _starpu_mp_node *polling_node;
     /* Used to know if the acknowlegdment msg is arrived from sinks */
     volatile int starpu_mp_common_finished_sender; 
     volatile int starpu_mp_common_finished_receiver; 

+ 6 - 0
src/datawizard/data_request.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2009-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
+ * Copyright (C) 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -152,6 +153,11 @@ struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t ha
 	r->dst_replicate = dst_replicate;
 	r->mode = mode;
 	r->async_channel.type = STARPU_UNUSED;
+    r->async_channel.starpu_mp_common_finished_sender = 0;
+    r->async_channel.starpu_mp_common_finished_receiver = 0;
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+    r->async_channel.event.mpi_ms_event.requests = NULL;
+#endif
 	if (handling_node == -1)
 		handling_node = STARPU_MAIN_RAM;
 	r->handling_node = handling_node;

+ 1 - 1
src/drivers/mic/driver_mic_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 1 - 1
src/drivers/mic/driver_mic_common.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 1 - 1
src/drivers/mic/driver_mic_source.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 13 - 5
src/drivers/mp_common/sink_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -133,8 +133,10 @@ static void _starpu_sink_common_copy_from_host_async(struct _starpu_mp_node *mp_
 
     /* Set the sender (host) ready because we don't want to wait its ack */
     struct _starpu_async_channel * async_channel = &sink_event->event;
-    async_channel->starpu_mp_common_finished_sender = 1;
+    async_channel->type = STARPU_UNUSED;
+    async_channel->starpu_mp_common_finished_sender = -1;
     async_channel->starpu_mp_common_finished_receiver = 0;
+    async_channel->polling_node = NULL;
 
     mp_node->dt_recv(mp_node, cmd->addr, cmd->size, &sink_event->event);
     /* Push event on the list */
@@ -176,8 +178,10 @@ static void _starpu_sink_common_copy_to_host_async(struct _starpu_mp_node *mp_no
     
     /* Set the receiver (host) ready because we don't want to wait its ack */
     struct _starpu_async_channel * async_channel = &sink_event->event;
+    async_channel->type = STARPU_UNUSED;
     async_channel->starpu_mp_common_finished_sender = 0;
-    async_channel->starpu_mp_common_finished_receiver = 1;
+    async_channel->starpu_mp_common_finished_receiver = -1;
+    async_channel->polling_node = NULL;
 
     mp_node->dt_send(mp_node, cmd->addr, cmd->size, &sink_event->event);
     /* Push event on the list */
@@ -213,8 +217,10 @@ static void _starpu_sink_common_copy_from_sink_async(struct _starpu_mp_node *mp_
 
     /* Set the sender ready because we don't want to wait its ack */
     struct _starpu_async_channel * async_channel = &sink_event->event;
-    async_channel->starpu_mp_common_finished_sender = 1;
+    async_channel->type = STARPU_UNUSED;
+    async_channel->starpu_mp_common_finished_sender = -1;
     async_channel->starpu_mp_common_finished_receiver = 0;
+    async_channel->polling_node = NULL;
 
     mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size, &sink_event->event);
     /* Push event on the list */
@@ -250,8 +256,10 @@ static void _starpu_sink_common_copy_to_sink_async(struct _starpu_mp_node *mp_no
 
     /* Set the receiver ready because we don't want to wait its ack */
     struct _starpu_async_channel * async_channel = &sink_event->event;
+    async_channel->type = STARPU_UNUSED;
     async_channel->starpu_mp_common_finished_sender = 0;
-    async_channel->starpu_mp_common_finished_receiver = 1;
+    async_channel->starpu_mp_common_finished_receiver = -1;
+    async_channel->polling_node = NULL;
 
     mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size, &sink_event->event);
 

+ 17 - 19
src/drivers/mp_common/source_common.c

@@ -140,15 +140,15 @@ static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node ST
         case STARPU_RECV_FROM_HOST_ASYNC_COMPLETED:
         case STARPU_RECV_FROM_SINK_ASYNC_COMPLETED:
         {
-            struct _starpu_async_channel * event = arg;
-            event->starpu_mp_common_finished_receiver = 1;
+            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
+            event->starpu_mp_common_finished_receiver--;
             break;
         }
         case STARPU_SEND_TO_HOST_ASYNC_COMPLETED:
         case STARPU_SEND_TO_SINK_ASYNC_COMPLETED:
         {
-            struct _starpu_async_channel * event = arg;
-            event->starpu_mp_common_finished_sender = 1;
+            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
+            event->starpu_mp_common_finished_sender--;
             break;
         }
 		default:
@@ -205,16 +205,16 @@ int _starpu_src_common_store_message(struct _starpu_mp_node *node,
         case STARPU_RECV_FROM_HOST_ASYNC_COMPLETED:
         case STARPU_RECV_FROM_SINK_ASYNC_COMPLETED:
         {
-            struct _starpu_async_channel * event = arg;
-            event->starpu_mp_common_finished_receiver = 1;
+            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
+            event->starpu_mp_common_finished_receiver--;
             return 1;
             break;
         }
         case STARPU_SEND_TO_HOST_ASYNC_COMPLETED:
         case STARPU_SEND_TO_SINK_ASYNC_COMPLETED:
         {
-            struct _starpu_async_channel * event = arg;
-            event->starpu_mp_common_finished_sender = 1;
+            struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
+            event->starpu_mp_common_finished_sender--;
             return 1;
             break;
         }
@@ -548,7 +548,7 @@ int _starpu_src_common_copy_host_to_sink_sync(const struct _starpu_mp_node *mp_n
 /* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE with an
  * asynchronous mode.
  */
-int _starpu_src_common_copy_host_to_sink_async(const struct _starpu_mp_node *mp_node,
+int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node,
 		void *src, void *dst, size_t size, void * event)
 {
 	struct _starpu_mp_transfer_command cmd = {size, dst, event};
@@ -557,14 +557,13 @@ int _starpu_src_common_copy_host_to_sink_async(const struct _starpu_mp_node *mp_
      * to test is they are finished
      */
     struct _starpu_async_channel * async_channel = event;
-    async_channel->starpu_mp_common_finished_sender = 0;
-    async_channel->starpu_mp_common_finished_receiver = 0;
+    async_channel->polling_node = mp_node;
 
 	_starpu_mp_common_send_command(mp_node, STARPU_RECV_FROM_HOST_ASYNC, &cmd, sizeof(cmd));
 
 	mp_node->dt_send(mp_node, src, size, event);
 
-	return 0;
+	return -EAGAIN;
 }
 
 /* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST
@@ -601,14 +600,13 @@ int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
      * to test is they are finished
      */
     struct _starpu_async_channel * async_channel = event;
-    async_channel->starpu_mp_common_finished_sender = 0;
-    async_channel->starpu_mp_common_finished_receiver = 0;
+    async_channel->polling_node = mp_node;
 
 	_starpu_mp_common_send_command(mp_node, STARPU_SEND_TO_HOST_ASYNC, &cmd, sizeof(cmd));
 
 	mp_node->dt_recv(mp_node, dst, size, event);
-    
-	return 0;
+
+	return -EAGAIN;
 }
 
 /* Tell the sink linked to SRC_NODE to send SIZE bytes of data pointed by SRC
@@ -655,8 +653,7 @@ int _starpu_src_common_copy_sink_to_sink_async(const struct _starpu_mp_node *src
      * to test is they are finished
      */
     struct _starpu_async_channel * async_channel = event;
-    async_channel->starpu_mp_common_finished_sender = 0;
-    async_channel->starpu_mp_common_finished_receiver = 0;
+    async_channel->polling_node = NULL; /* TODO which node ? */
 
 	/* Tell source to send data to dest. */
 	_starpu_mp_common_send_command(src_node, STARPU_SEND_TO_SINK_ASYNC, &cmd, sizeof(cmd));
@@ -668,7 +665,8 @@ int _starpu_src_common_copy_sink_to_sink_async(const struct _starpu_mp_node *src
 	/* Tell dest to receive data from source. */
 	_starpu_mp_common_send_command(dst_node, STARPU_RECV_FROM_SINK_ASYNC, &cmd, sizeof(cmd));
 
-	return 0;
+
+	return -EAGAIN;
 }
 
 /* 5 functions to determine the executable to run on the device (MIC, SCC,

+ 0 - 3
src/drivers/mp_common/source_common.h

@@ -28,9 +28,6 @@
 
 enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node, 
 							     void ** arg, int* arg_size);
-void _starpu_src_common_recv_async(struct _starpu_worker_set *worker_set, 
-				   struct _starpu_mp_node * baseworker_node);
-
 int _starpu_src_common_store_message(struct _starpu_mp_node *node, 
 		void * arg, int arg_size, enum _starpu_mp_command answer);
 

+ 129 - 23
src/drivers/mpi/driver_mpi_common.c

@@ -18,6 +18,7 @@
 #include <mpi.h>
 #include <core/workers.h>
 #include <core/perfmodel/perfmodel.h>
+#include <drivers/mp_common/source_common.h>
 #include "driver_mpi_common.h"
 
 #define NITER 32
@@ -167,9 +168,27 @@ void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int
     {
         /* Asynchronous send */
         struct _starpu_async_channel * channel = event;
-        channel->event.mpi_ms_event.finished = 0;
         channel->event.mpi_ms_event.is_sender = 1;
-        res = MPI_Isend(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+
+        /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
+        if (channel->type == STARPU_UNUSED)
+            channel->event.mpi_ms_event.requests = NULL;
+
+        /* Initialize the list */
+        if (channel->event.mpi_ms_event.requests == NULL)
+        {
+            channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();            
+            _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
+        }
+
+        struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
+
+        res = MPI_Isend(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
+
+        channel->starpu_mp_common_finished_receiver++;
+        channel->starpu_mp_common_finished_sender++;
+
+        _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
     } 
     else
     {
@@ -199,9 +218,27 @@ void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int
     {
         /* Asynchronous recv */
         struct _starpu_async_channel * channel = event;
-        channel->event.mpi_ms_event.finished = 0;
         channel->event.mpi_ms_event.is_sender = 0;
-        res = MPI_Irecv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+
+        /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
+        if (channel->type == STARPU_UNUSED)
+            channel->event.mpi_ms_event.requests = NULL;
+
+        /* Initialize the list */
+        if (channel->event.mpi_ms_event.requests == NULL)
+        {
+            channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();            
+            _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
+        }
+
+        struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
+
+        res = MPI_Irecv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
+
+        channel->starpu_mp_common_finished_receiver++;
+        channel->starpu_mp_common_finished_sender++;
+
+        _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
     } 
     else
     {
@@ -233,9 +270,27 @@ void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int d
     {
         /* Asynchronous send */
         struct _starpu_async_channel * channel = event;
-        channel->event.mpi_ms_event.finished = 0;
         channel->event.mpi_ms_event.is_sender = 1;
-        res = MPI_Isend(msg, len, MPI_BYTE, dst_devid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+
+        /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
+        if (channel->type == STARPU_UNUSED)
+            channel->event.mpi_ms_event.requests = NULL;
+
+        /* Initialize the list */
+        if (channel->event.mpi_ms_event.requests == NULL)
+        {
+            channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();            
+            _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
+        }
+
+        struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
+
+        res = MPI_Isend(msg, len, MPI_BYTE, dst_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
+
+        channel->starpu_mp_common_finished_receiver++;
+        channel->starpu_mp_common_finished_sender++;
+
+        _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
     } 
     else
     {
@@ -259,9 +314,27 @@ void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int
     {
         /* Asynchronous recv */
         struct _starpu_async_channel * channel = event;
-        channel->event.mpi_ms_event.finished = 0;
         channel->event.mpi_ms_event.is_sender = 0;
-        res = MPI_Irecv(msg, len, MPI_BYTE, src_devid, ASYNC_TAG, MPI_COMM_WORLD, &channel->event.mpi_ms_event.request);
+
+        /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
+        if (channel->type == STARPU_UNUSED)
+            channel->event.mpi_ms_event.requests = NULL;
+
+        /* Initialize the list */
+        if (channel->event.mpi_ms_event.requests == NULL)
+        {
+            channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();            
+            _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
+        }
+
+        struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
+
+        res = MPI_Irecv(msg, len, MPI_BYTE, src_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
+
+        channel->starpu_mp_common_finished_receiver++;
+        channel->starpu_mp_common_finished_sender++;
+
+        _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
     } 
     else
     {
@@ -271,30 +344,63 @@ void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int
     }
 }
 
-/* - In MPI Master-Slave communications between host and device,
- * host is always considered as the sender and the device, the receiver.
- * - In device to device communications, the first ack received by host
+ /* - In device to device communications, the first ack received by host
  * is considered as the sender (but it cannot be, in fact, the sender)
  */
 int _starpu_mpi_common_test_event(struct _starpu_async_channel * event)
 {
-    //if the event is not finished, maybe it's a host-device communication
-    //or host has already finished its work
-    if (!event->event.mpi_ms_event.finished)
+    if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
+    {
+        struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
+        struct _starpu_mpi_ms_event_request * req_next;
+
+        while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
+        {
+            req_next = _starpu_mpi_ms_event_request_list_next(req);
+
+            int flag = 0;
+            MPI_Test(&req->request, &flag, MPI_STATUS_IGNORE);
+            if (flag)
+            {
+                _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
+                _starpu_mpi_ms_event_request_delete(req);
+
+                if (event->event.mpi_ms_event.is_sender)
+                    event->starpu_mp_common_finished_sender--;
+                else
+                    event->starpu_mp_common_finished_receiver--;
+
+            }
+            req = req_next;
+        }
+
+        /* When the list is empty, we finished to wait each request */
+        if (_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
+        {
+            /* Destroy the list */
+            _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
+            event->event.mpi_ms_event.requests = NULL;
+        }
+    }
+
+    /* poll the asynchronous messages.*/
+    if (event->polling_node != NULL)
     {
-        int flag = 0;
-        MPI_Test(&event->event.mpi_ms_event.request, &flag, MPI_STATUS_IGNORE);
-        if (flag)
+        while(event->polling_node->mp_recv_is_ready(event->polling_node))
         {
-            event->event.mpi_ms_event.finished = 1;
-            if (event->event.mpi_ms_event.is_sender)
-                event->starpu_mp_common_finished_sender = 1;
-            else
-                event->starpu_mp_common_finished_receiver = 1;
+            enum _starpu_mp_command answer;
+            void *arg;
+            int arg_size;
+            answer = _starpu_mp_common_recv_command(event->polling_node, &arg, &arg_size);
+            if(!_starpu_src_common_store_message(event->polling_node,arg,arg_size,answer))
+            {
+                printf("incorrect commande: unknown command or sync command");
+                STARPU_ASSERT(0);
+            }
         }
     }
 
-    return event->starpu_mp_common_finished_sender && event->starpu_mp_common_finished_receiver;
+    return !event->starpu_mp_common_finished_sender && !event->starpu_mp_common_finished_receiver;
 }
 
 

+ 43 - 21
src/drivers/mpi/driver_mpi_source.c

@@ -107,30 +107,18 @@ int _starpu_mpi_copy_sink_to_sink_sync(void *src, unsigned src_node, void *dst,
 
 int _starpu_mpi_copy_mpi_to_ram_async(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size, void * event)
 {
-    /* By default, init the request with MPI_REQUEST_NULL */
-    struct _starpu_async_channel * channel = event;
-    channel->event.mpi_ms_event.request = MPI_REQUEST_NULL;
-
     struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(src_node);
     return _starpu_src_common_copy_sink_to_host_async(mp_node, src, dst, size, event);
 }
 
 int _starpu_mpi_copy_ram_to_mpi_async(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size, void * event)
 {
-    /* By default, init the request with MPI_REQUEST_NULL */
-    struct _starpu_async_channel * channel = event;
-    channel->event.mpi_ms_event.request = MPI_REQUEST_NULL;
-
     struct _starpu_mp_node *mp_node = _starpu_mpi_src_get_mp_node_from_memory_node(dst_node);
     return _starpu_src_common_copy_host_to_sink_async(mp_node, src, dst, size, event);
 }
 
 int _starpu_mpi_copy_sink_to_sink_async(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size, void * event)
 {
-    /* By default, init the request with MPI_REQUEST_NULL */
-    struct _starpu_async_channel * channel = event;
-    channel->event.mpi_ms_event.request = MPI_REQUEST_NULL;
-
     return _starpu_src_common_copy_sink_to_sink_async(_starpu_mpi_src_get_mp_node_from_memory_node(src_node),
             _starpu_mpi_src_get_mp_node_from_memory_node(dst_node),
             src, dst, size, event);
@@ -141,20 +129,54 @@ int _starpu_mpi_copy_sink_to_sink_async(void *src, unsigned src_node, void *dst,
  */
 void _starpu_mpi_src_wait_event(struct _starpu_async_channel * event)
 {
-    if (!event->event.mpi_ms_event.finished)
+    if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
     {
-        MPI_Wait(&event->event.mpi_ms_event.request, MPI_STATUS_IGNORE);
-        event->event.mpi_ms_event.finished = 1;
-        if (event->event.mpi_ms_event.is_sender)
-            event->starpu_mp_common_finished_sender = 1;
-        else
-            event->starpu_mp_common_finished_receiver = 1;
+        struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
+        struct _starpu_mpi_ms_event_request * req_next;
+
+        while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
+        {
+            req_next = _starpu_mpi_ms_event_request_list_next(req);
+
+            MPI_Wait(&req->request, MPI_STATUS_IGNORE);
+            _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
+
+            _starpu_mpi_ms_event_request_delete(req);
+            req = req_next;
+
+            if (event->event.mpi_ms_event.is_sender)
+                event->starpu_mp_common_finished_sender--;
+            else
+                event->starpu_mp_common_finished_receiver--;
+
+        }
+
+        STARPU_ASSERT_MSG(_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests), "MPI Request list is not empty after a wait_event !");
+
+        /* Destroy the list */
+        _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
+        event->event.mpi_ms_event.requests = NULL;
     }
 
     //XXX: Maybe cause deadlock when the same thread is waiting here and cannot handle
     //incoming ack from devices
-    while(!event->starpu_mp_common_finished_sender || !event->starpu_mp_common_finished_receiver)
-        ;
+    while(event->starpu_mp_common_finished_sender > 0 || event->starpu_mp_common_finished_receiver > 0)
+        /* poll the asynchronous messages.*/
+        if (event->polling_node != NULL)
+        {
+            while(event->polling_node->mp_recv_is_ready(event->polling_node))
+            {
+                enum _starpu_mp_command answer;
+                void *arg;
+                int arg_size;
+                answer = _starpu_mp_common_recv_command(event->polling_node, &arg, &arg_size);
+                if(!_starpu_src_common_store_message(event->polling_node,arg,arg_size,answer))
+                {
+                    printf("incorrect commande: unknown command or sync command");
+                    STARPU_ASSERT(0);
+                }
+            }
+        }
 }
 
 

+ 1 - 1
src/drivers/scc/driver_scc_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 1 - 1
src/drivers/scc/driver_scc_common.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 1 - 1
src/drivers/scc/driver_scc_sink.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 1 - 1
src/drivers/scc/driver_scc_sink.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 1 - 1
src/drivers/scc/driver_scc_source.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by

+ 2 - 1
tests/datawizard/copy.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010-2011, 2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  CNRS
+ * Copyright (C) 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -64,7 +65,7 @@ int main(int argc, char **argv)
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 
 	if (starpu_worker_get_count_by_type(STARPU_CUDA_WORKER) == 0 && starpu_worker_get_count_by_type(STARPU_OPENCL_WORKER) == 0 &&
-		starpu_worker_get_count_by_type(STARPU_MIC_WORKER) == 0)
+		starpu_worker_get_count_by_type(STARPU_MIC_WORKER) == 0 && starpu_worker_get_count_by_type(STARPU_MPI_WORKER) == 0)
 	{
 		FPRINTF(stderr, "This application requires a CUDA , OpenCL or MIC Worker\n");
 		starpu_shutdown();