瀏覽代碼

mpi: rewrite code to allow several MPI communicators within a same application

Nathalie Furmento 10 年之前
父節點
當前提交
3345c92b36

+ 2 - 0
mpi/src/Makefile.am

@@ -43,6 +43,7 @@ noinst_HEADERS =					\
 	starpu_mpi_early_data.h				\
 	starpu_mpi_early_request.h			\
 	starpu_mpi_sync_data.h				\
+	starpu_mpi_comm.h				\
 	starpu_mpi_tag.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
@@ -59,6 +60,7 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_early_data.c				\
 	starpu_mpi_early_request.c			\
 	starpu_mpi_sync_data.c				\
+	starpu_mpi_comm.c				\
 	starpu_mpi_tag.c
 
 showcheck:

+ 98 - 104
mpi/src/starpu_mpi.c

@@ -28,6 +28,7 @@
 #include <starpu_mpi_early_request.h>
 #include <starpu_mpi_select_node.h>
 #include <starpu_mpi_tag.h>
+#include <starpu_mpi_comm.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -95,9 +96,9 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	(*req)->count = -1;
 	(*req)->user_datatype = -1;
 
-	(*req)->srcdst = -1;
-	(*req)->data_tag = -1;
-	(*req)->comm = 0;
+	(*req)->node_tag.rank = -1;
+	(*req)->node_tag.data_tag = -1;
+	(*req)->node_tag.comm = NULL;
 
 	(*req)->func = NULL;
 
@@ -152,7 +153,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
-	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->srcdst, req->data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
+	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
@@ -180,7 +181,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			}
 
 			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-					  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr,
+					  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
 					  _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 			_starpu_mpi_req_list_push_front(ready_requests, req);
 
@@ -195,7 +196,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 		else
 		{
 			/* test whether some data with the given tag and source have already been received by StarPU-MPI*/
-			struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->data_tag, req->srcdst);
+			struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(&req->node_tag);
 
 			/* Case: a receive request for a data with the given tag and source has already been
 			 * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
@@ -210,7 +211,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 				STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
 				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
-				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->data_tag);
+				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
 				STARPU_ASSERT(req->data_handle != early_data_handle->handle);
 
 				req->internal_req = early_data_handle->req;
@@ -227,9 +228,9 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			/* Case: no matching data has been received. Store the receive request as an early_request. */
 			else
 			{
-				struct _starpu_mpi_sync_data_handle *sync_data_handle = _starpu_mpi_sync_data_find(req->data_tag, req->srcdst);
-				_STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %d and src %d = %p\n", req->data_tag, req->srcdst, sync_data_handle);
-				if (sync_data_handle)
+				struct _starpu_mpi_req *sync_req = _starpu_mpi_sync_data_find(req->node_tag.data_tag, req->node_tag.rank, req->node_tag.comm);
+				_STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %d and src %d = %p\n", req->node_tag.data_tag, req->node_tag.rank, sync_req);
+				if (sync_req)
 				{
 					req->sync = 1;
 					_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
@@ -240,7 +241,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 					}
 					else
 					{
-						req->count = sync_data_handle->req->count;
+						req->count = sync_req->count;
 						STARPU_ASSERT(req->count);
 						req->ptr = malloc(req->count);
 						STARPU_MPI_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
@@ -249,7 +250,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 				}
 				else
 				{
-					_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->data_tag);
+					_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->node_tag.rank, req->node_tag.data_tag);
 					_starpu_mpi_early_request_add(req);
 				}
 			}
@@ -259,7 +260,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 	{
 		_starpu_mpi_req_list_push_front(ready_requests, req);
 		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-				  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+				  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 	}
 
 	newer_requests = 1;
@@ -286,9 +287,9 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	_starpu_mpi_request_init(&req);
 	req->request_type = request_type;
 	req->data_handle = data_handle;
-	req->srcdst = srcdst;
-	req->data_tag = data_tag;
-	req->comm = comm;
+	req->node_tag.rank = srcdst;
+	req->node_tag.data_tag = data_tag;
+	req->node_tag.comm = comm;
 	req->detached = detached;
 	req->sync = sync;
 	req->callback = callback;
@@ -317,26 +318,26 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
-	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->sync);
+	_STARPU_MPI_DEBUG(20, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->sync);
 
-	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
+	_starpu_mpi_comm_amounts_inc(req->node_tag.comm, req->node_tag.rank, req->datatype, req->count);
 
-	_STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->data_tag, 0);
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
 
 	if (req->sync == 0)
 	{
-		_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->data_tag);
-		req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
+		_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag);
+		req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->request);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_code(req->ret));
 	}
 	else
 	{
-		_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->data_tag);
-		req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
+		_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag);
+		req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->request);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_code(req->ret));
 	}
 
-	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->data_tag, 0);
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, 0);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -355,7 +356,7 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 
 	req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
 	req->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
-	req->envelope->data_tag = req->data_tag;
+	req->envelope->data_tag = req->node_tag.data_tag;
 	req->envelope->sync = req->sync;
 
 	if (req->user_datatype == 0)
@@ -366,9 +367,9 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 
 		MPI_Type_size(req->datatype, &size);
 		req->envelope->size = (starpu_ssize_t)req->count * size;
-		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst);
-		_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
-		MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
+		_STARPU_MPI_DEBUG(20, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle), req->node_tag.rank);
+		_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
+		MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
 	}
 	else
 	{
@@ -380,10 +381,10 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		if (req->envelope->size != -1)
  		{
  			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->srcdst);
+			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->node_tag.rank);
 			req->count = req->envelope->size;
-			_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
-			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
+			_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
 			STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_code(ret));
  		}
 
@@ -392,9 +393,9 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 		if (req->envelope->size == -1)
  		{
  			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->srcdst);
-			_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
-			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->node_tag.rank);
+			_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
 			STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_code(ret));
  		}
  		else
@@ -408,8 +409,7 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 	if (req->sync)
 	{
 		// If the data is to be sent in synchronous mode, we need to wait for the receiver ready message
-		struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_create(req);
-		_starpu_mpi_sync_data_add(_sync_data);
+		_starpu_mpi_sync_data_add(req);
 	}
 	else
 	{
@@ -502,34 +502,34 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
-	_STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
-	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->srcdst, req->data_tag);
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
 
 	if (req->sync)
 	{
 		struct _starpu_mpi_envelope *_envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
 		_envelope->mode = _STARPU_MPI_ENVELOPE_SYNC_READY;
-		_envelope->data_tag = req->data_tag;
-		_STARPU_MPI_DEBUG(20, "Telling node %d it can send the data and waiting for the data back ...\n", req->srcdst);
-		_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
-		req->ret = MPI_Send(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm);
+		_envelope->data_tag = req->node_tag.data_tag;
+		_STARPU_MPI_DEBUG(20, "Telling node %d it can send the data and waiting for the data back ...\n", req->node_tag.rank);
+		_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
+		req->ret = MPI_Send(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Send returning %s", _starpu_mpi_get_mpi_code(req->ret));
 	}
 
 	if (req->sync)
 	{
-		_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->data_tag);
-		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
+		_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag);
+		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->request);
 	}
 	else
 	{
-		_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->data_tag);
-		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
+		_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag);
+		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->request);
 	}
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_code(req->ret));
 
-	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->srcdst, req->data_tag);
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -634,12 +634,12 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	/* Which is the mpi request we are waiting for ? */
 	struct _starpu_mpi_req *req = waiting_req->other_request;
 
-	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->srcdst, req->data_tag);
+	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
 
 	req->ret = MPI_Wait(&req->request, waiting_req->status);
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_code(req->ret));
 
-	_STARPU_MPI_TRACE_UWAIT_END(req->srcdst, req->data_tag);
+	_STARPU_MPI_TRACE_UWAIT_END(req->node_tag.rank, req->node_tag.data_tag);
 
 	_starpu_mpi_handle_request_termination(req);
 	_STARPU_MPI_LOG_OUT();
@@ -704,14 +704,14 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	struct _starpu_mpi_req *req = testing_req->other_request;
 
 	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
-	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->srcdst, req->data_tag);
+	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
 
 	req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_code(req->ret));
 
-	_STARPU_MPI_TRACE_UTESTING_END(req->srcdst, req->data_tag);
+	_STARPU_MPI_TRACE_UTESTING_END(req->node_tag.rank, req->node_tag.data_tag);
 
 	if (*testing_req->flag)
 	{
@@ -801,7 +801,7 @@ static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
 {
 	_STARPU_MPI_LOG_IN();
 
-	barrier_req->ret = MPI_Barrier(barrier_req->comm);
+	barrier_req->ret = MPI_Barrier(barrier_req->node_tag.comm);
 	STARPU_MPI_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_code(barrier_req->ret));
 
 	_starpu_mpi_handle_request_termination(barrier_req);
@@ -845,7 +845,7 @@ int starpu_mpi_barrier(MPI_Comm comm)
 	STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
 	barrier_req->func = _starpu_mpi_barrier_func;
 	barrier_req->request_type = BARRIER_REQ;
-	barrier_req->comm = comm;
+	barrier_req->node_tag.comm = comm;
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
 	_starpu_mpi_submit_ready_request(barrier_req);
@@ -890,13 +890,13 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_IN();
 
 	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d internal_req %p\n",
-			  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr,
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
 			  _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->internal_req);
 
 	if (req->internal_req)
 	{
-		struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->data_tag, req->srcdst);
-		STARPU_MPI_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->data_tag, req->srcdst);
+		struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(&req->node_tag);
+		STARPU_MPI_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->node_tag.data_tag, req->node_tag.rank);
 		_STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
 		_starpu_mpi_early_data_delete(early_data_handle);
 		free(early_data_handle);
@@ -1044,7 +1044,7 @@ static void _starpu_mpi_test_detached_requests(void)
 
 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
-		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->data_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
 		req->ret = MPI_Test(&req->request, &flag, &status);
 
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_code(req->ret));
@@ -1053,22 +1053,22 @@ static void _starpu_mpi_test_detached_requests(void)
 		{
 			if (req->request_type == RECV_REQ)
 			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->srcdst, req->data_tag);
+				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
 			}
 			else if (req->request_type == SEND_REQ)
 			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->srcdst, req->data_tag, 0);
+				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
 			}
 
 			_starpu_mpi_handle_request_termination(req);
 
 			if (req->request_type == RECV_REQ)
 			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->srcdst, req->data_tag);
+				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag);
 			}
 			else if (req->request_type == SEND_REQ)
 			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->srcdst, req->data_tag, 0);
+				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag, 0);
 			}
 		}
 
@@ -1115,7 +1115,7 @@ static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
 
 	/* submit the request to MPI */
 	_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 	req->func(req);
 
 	_STARPU_MPI_LOG_OUT();
@@ -1156,7 +1156,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	_STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
 	_STARPU_MPI_DEBUG(20, "Request sync %d\n", envelope->sync);
 
-	struct _starpu_mpi_early_data_handle* early_data_handle = _starpu_mpi_early_data_create(envelope, status.MPI_SOURCE);
+	struct _starpu_mpi_early_data_handle* early_data_handle = _starpu_mpi_early_data_create(envelope, status.MPI_SOURCE, comm);
 
 	starpu_data_handle_t data_handle = NULL;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
@@ -1182,10 +1182,10 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 		_starpu_mpi_early_data_add(early_data_handle);
 	}
 
-	_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->data_tag, status.MPI_SOURCE);
+	_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from comm %p src %d ..\n", early_data_handle->node_tag.data_tag, comm, status.MPI_SOURCE);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
-							  early_data_handle->data_tag, comm, 1, 0,
+							  early_data_handle->node_tag.data_tag, comm, 1, 0,
 							  NULL, NULL, 1, 1, envelope->size);
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
@@ -1252,10 +1252,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_cache_init(argc_argv->comm);
 	_starpu_mpi_select_node_init();
 	_starpu_mpi_tag_init();
+	_starpu_mpi_comm_init(argc_argv->comm);
 
-	_starpu_mpi_early_request_init(worldsize);
-	_starpu_mpi_early_data_init(worldsize);
-	_starpu_mpi_sync_data_init(worldsize);
+	_starpu_mpi_early_request_init();
+	_starpu_mpi_early_data_init();
+	_starpu_mpi_sync_data_init();
 
 	/* notify the main thread that the progression thread is ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
@@ -1265,9 +1266,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
- 	struct _starpu_mpi_envelope *envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
-
-	MPI_Request envelope_request;
  	int envelope_request_submitted = 0;
 
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
@@ -1315,9 +1313,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
                  * receive requests on our side, we resubmit a header request. */
 		if (((_starpu_mpi_early_request_count() > 0) || (_starpu_mpi_sync_data_count() > 0)) && (envelope_request_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
 		{
-			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
-			_STARPU_MPI_COMM_FROM_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
-			MPI_Irecv(envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, argc_argv->comm, &envelope_request);
+			_starpu_mpi_comm_post_recv();
 			envelope_request_submitted = 1;
 		}
 
@@ -1328,31 +1324,31 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 		if (envelope_request_submitted == 1)
 		{
-			int flag,res;
-			MPI_Status status;
-			//_STARPU_MPI_DEBUG(4, "Test of envelope_request\n");
+			int flag;
+			struct _starpu_mpi_envelope *envelope;
+			MPI_Status envelope_status;
+			MPI_Comm envelope_comm;
 
 			/* test whether an envelope has arrived. */
-			res = MPI_Test(&envelope_request, &flag, &status);
-			STARPU_ASSERT(res == MPI_SUCCESS);
+			flag = _starpu_mpi_comm_test_recv(&envelope_status, &envelope, &envelope_comm);
 
 			if (flag)
 			{
 				_STARPU_MPI_DEBUG(4, "Envelope received with mode %d\n", envelope->mode);
 				if (envelope->mode == _STARPU_MPI_ENVELOPE_SYNC_READY)
 				{
-					struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_find(envelope->data_tag, status.MPI_SOURCE);
-					_STARPU_MPI_DEBUG(2000, "Sending data with tag %d to node %d\n", _sync_data->req->data_tag, status.MPI_SOURCE);
-					STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_data->req->data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_data->req->data_tag);
+					struct _starpu_mpi_req *_sync_req = _starpu_mpi_sync_data_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
+					_STARPU_MPI_DEBUG(20, "Sending data with tag %d to node %d\n", _sync_req->node_tag.data_tag, envelope_status.MPI_SOURCE);
+					STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_req->node_tag.data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_req->node_tag.data_tag);
 					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-					_starpu_mpi_isend_data_func(_sync_data->req);
+					_starpu_mpi_isend_data_func(_sync_req);
 					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 				}
 				else
 				{
-					_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, status.MPI_SOURCE, envelope->size);
+					_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, envelope_status.MPI_SOURCE, envelope->size);
 
-					struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->data_tag, status.MPI_SOURCE);
+					struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
 
 					/* Case: a data will arrive before a matching receive is
 					 * posted by the application. Create a temporary handle to
@@ -1372,9 +1368,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 							_starpu_mpi_request_init(&new_req);
 							new_req->request_type = RECV_REQ;
 							new_req->data_handle = NULL;
-							new_req->srcdst = status.MPI_SOURCE;
-							new_req->data_tag = envelope->data_tag;
-							new_req->comm = argc_argv->comm;
+							new_req->node_tag.rank = envelope_status.MPI_SOURCE;
+							new_req->node_tag.data_tag = envelope->data_tag;
+							new_req->node_tag.comm = envelope_comm;
 							new_req->detached = 1;
 							new_req->sync = 1;
 							new_req->callback = NULL;
@@ -1383,12 +1379,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 							new_req->sequential_consistency = 1;
 							new_req->is_internal_req = 0; // ????
 							new_req->count = envelope->size;
-							struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_create(new_req);
-							_starpu_mpi_sync_data_add(_sync_data);
+							_starpu_mpi_sync_data_add(new_req);
 						}
 						else
 						{
-							_starpu_mpi_receive_early_data(envelope, status, argc_argv->comm);
+							_starpu_mpi_receive_early_data(envelope, envelope_status, envelope_comm);
 						}
 					}
 					/* Case: a matching application request has been found for
@@ -1439,9 +1434,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	if (envelope_request_submitted)
 	{
-		MPI_Status status;
-		MPI_Cancel(&envelope_request);
-		MPI_Wait(&envelope_request, &status);
+		_starpu_mpi_comm_cancel_recv();
 		envelope_request_submitted = 0;
 	}
 
@@ -1460,11 +1453,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
-	_starpu_mpi_sync_data_free(worldsize);
-	_starpu_mpi_early_data_free(worldsize);
+	_starpu_mpi_sync_data_free();
+	_starpu_mpi_early_data_free();
 	_starpu_mpi_early_request_free();
 	free(argc_argv);
-	free(envelope);
 
 	return NULL;
 }
@@ -1637,6 +1629,7 @@ int starpu_mpi_shutdown(void)
 	_starpu_mpi_comm_amounts_free();
 	_starpu_mpi_cache_free(world_size);
 	_starpu_mpi_tag_free();
+	_starpu_mpi_comm_free();
 
 	return 0;
 }
@@ -1644,21 +1637,21 @@ int starpu_mpi_shutdown(void)
 void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
 {
 	_starpu_mpi_data_release_tag(data_handle);
-	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
+	struct _starpu_mpi_node_tag *mpi_data = data_handle->mpi_data;
 	_starpu_mpi_cache_flush(mpi_data->comm, data_handle);
 	free(data_handle->mpi_data);
 }
 
 void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, int rank, MPI_Comm comm)
 {
-	struct _starpu_mpi_data *mpi_data;
+	struct _starpu_mpi_node_tag *mpi_data;
 	if (data_handle->mpi_data)
 	{
 		mpi_data = data_handle->mpi_data;
 	}
 	else
 	{
-		mpi_data = malloc(sizeof(struct _starpu_mpi_data));
+		mpi_data = calloc(1, sizeof(struct _starpu_mpi_node_tag));
 		data_handle->mpi_data = mpi_data;
 		_starpu_mpi_data_register_tag(data_handle, tag);
 		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
@@ -1666,12 +1659,13 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
 
 	if (tag != -1)
 	{
-		mpi_data->tag = tag;
+		mpi_data->data_tag = tag;
 	}
 	if (rank != -1)
 	{
 		mpi_data->rank = rank;
 		mpi_data->comm = comm;
+		_starpu_mpi_comm_register(comm);
 	}
 }
 
@@ -1688,13 +1682,13 @@ void starpu_mpi_data_set_tag(starpu_data_handle_t handle, int tag)
 int starpu_mpi_data_get_rank(starpu_data_handle_t data)
 {
 	STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
-	return ((struct _starpu_mpi_data *)(data->mpi_data))->rank;
+	return ((struct _starpu_mpi_node_tag *)(data->mpi_data))->rank;
 }
 
 int starpu_mpi_data_get_tag(starpu_data_handle_t data)
 {
 	STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
-	return ((struct _starpu_mpi_data *)(data->mpi_data))->tag;
+	return ((struct _starpu_mpi_node_tag *)(data->mpi_data))->data_tag;
 }
 
 int starpu_mpi_comm_size(MPI_Comm comm, int *size)

+ 168 - 0
mpi/src/starpu_mpi_comm.c

@@ -0,0 +1,168 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
+ * Copyright (C) 2011-2015  Université de Bordeaux
+ * Copyright (C) 2014 INRIA
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_private.h>
+#include <starpu_mpi_comm.h>
+#include <common/list.h>
+
+struct _starpu_mpi_comm	/* envelope-receive state kept for one registered communicator */
+{
+	MPI_Comm comm;	/* the registered communicator */
+	struct _starpu_mpi_envelope *envelope;	/* buffer the envelope receive is posted into */
+	MPI_Request request;	/* request handle of the envelope MPI_Irecv */
+	int posted;	/* 1 while an envelope receive is outstanding, 0 otherwise */
+};
+struct _starpu_mpi_comm_hashtable	/* uthash entry recording that a communicator is already registered */
+{
+	UT_hash_handle hh;
+	MPI_Comm comm;	/* hash key */
+};
+struct _starpu_mpi_comm_hashtable *_starpu_mpi_comms_cache;	/* head of the registered-communicator hash */
+struct _starpu_mpi_comm **_starpu_mpi_comms;	/* per-communicator states, in registration order */
+int _starpu_mpi_comm_nb;	/* number of used slots in _starpu_mpi_comms */
+int _starpu_mpi_comm_allocated;	/* capacity, in slots, of _starpu_mpi_comms */
+int _starpu_mpi_comm_tested;	/* index at which _starpu_mpi_comm_test_recv() starts scanning */
+
+/* Set up an empty communicator registry, then register the first communicator.
+ *
+ * comm: communicator on which envelopes must be listened for from the start.
+ */
+void _starpu_mpi_comm_init(MPI_Comm comm)
+{
+	/* Assign the capacity first so that the debug line reports the size actually allocated. */
+	_starpu_mpi_comm_allocated=10;
+	_STARPU_MPI_DEBUG(10, "allocating for %d communicators\n", _starpu_mpi_comm_allocated);
+	_starpu_mpi_comms = malloc(_starpu_mpi_comm_allocated * sizeof(struct _starpu_mpi_comm *));
+	STARPU_ASSERT(_starpu_mpi_comms);
+	_starpu_mpi_comm_nb=0;
+	_starpu_mpi_comm_tested=0;
+	_starpu_mpi_comms_cache = NULL;
+
+	_starpu_mpi_comm_register(comm);
+}
+
+/* Release every per-communicator state, the array holding them and the
+ * registration hash. The registry is left empty, so a second call is harmless.
+ */
+void _starpu_mpi_comm_free()
+{
+	int i;
+	for(i=0 ; i<_starpu_mpi_comm_nb ; i++)
+	{
+		struct _starpu_mpi_comm *_comm = _starpu_mpi_comms[i];
+		free(_comm->envelope);
+		free(_comm);
+	}
+	free(_starpu_mpi_comms);
+	/* Do not leave the globals describing memory that has just been freed. */
+	_starpu_mpi_comms = NULL;
+	_starpu_mpi_comm_nb = 0;
+	_starpu_mpi_comm_tested = 0;
+
+	struct _starpu_mpi_comm_hashtable *entry, *tmp;
+	HASH_ITER(hh, _starpu_mpi_comms_cache, entry, tmp)
+	{
+		HASH_DEL(_starpu_mpi_comms_cache, entry);
+		free(entry);
+	}
+}
+
+/* Add a communicator to the registry so that envelope receives get posted on it.
+ * Registering a communicator that is already known changes nothing.
+ *
+ * comm: communicator to register.
+ */
+void _starpu_mpi_comm_register(MPI_Comm comm)
+{
+	struct _starpu_mpi_comm_hashtable *found;
+
+	/* Use sizeof(MPI_Comm) as key length rather than the pointer size: MPI_Comm is an
+	 * opaque handle whose size depends on the MPI implementation. */
+	HASH_FIND(hh, _starpu_mpi_comms_cache, &comm, sizeof(MPI_Comm), found);
+	if (found)
+	{
+		/* NOTE(review): %p assumes MPI_Comm is a pointer type -- confirm for the MPI implementations targeted. */
+		_STARPU_MPI_DEBUG(10, "comm %p (%p) already registered\n", comm, MPI_COMM_WORLD);
+	}
+	else
+	{
+		if (_starpu_mpi_comm_nb == _starpu_mpi_comm_allocated)
+		{
+			int new_allocated = _starpu_mpi_comm_allocated * 2;
+			_STARPU_MPI_DEBUG(10, "reallocating for %d communicators\n", new_allocated);
+			/* Keep the current array reachable until the reallocation is known to have succeeded. */
+			struct _starpu_mpi_comm **grown = realloc(_starpu_mpi_comms, new_allocated * sizeof(struct _starpu_mpi_comm *));
+			STARPU_ASSERT(grown);
+			_starpu_mpi_comms = grown;
+			_starpu_mpi_comm_allocated = new_allocated;
+		}
+		_STARPU_MPI_DEBUG(10, "registering comm %p (%p) number %d\n", comm, MPI_COMM_WORLD, _starpu_mpi_comm_nb);
+		struct _starpu_mpi_comm *_comm = calloc(1, sizeof(struct _starpu_mpi_comm));
+		STARPU_ASSERT(_comm);
+		_comm->comm = comm;
+		_comm->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
+		STARPU_ASSERT(_comm->envelope);
+		_comm->posted = 0;
+		_starpu_mpi_comms[_starpu_mpi_comm_nb] = _comm;
+		_starpu_mpi_comm_nb++;
+		struct _starpu_mpi_comm_hashtable *entry = malloc(sizeof(*entry));
+		STARPU_ASSERT(entry);
+		entry->comm = comm;
+		/* Same key length as the lookup above. */
+		HASH_ADD(hh, _starpu_mpi_comms_cache, comm, sizeof(entry->comm), entry);
+	}
+}
+
+/* Post an envelope receive on every registered communicator that has none outstanding. */
+void _starpu_mpi_comm_post_recv()
+{
+	int idx;
+	for (idx = 0; idx < _starpu_mpi_comm_nb; idx++)
+	{
+		struct _starpu_mpi_comm *cur = _starpu_mpi_comms[idx];
+
+		if (cur->posted != 0)
+			continue; /* a receive is already outstanding on this communicator */
+
+		_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
+		_STARPU_MPI_COMM_FROM_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
+		MPI_Irecv(cur->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, cur->comm, &cur->request);
+		cur->posted = 1;
+	}
+}
+
+/* Look for a completed envelope receive among the registered communicators.
+ * The scan starts at index _starpu_mpi_comm_tested and wraps around once.
+ *
+ * status:   passed to MPI_Test for each outstanding receive that is tested.
+ * envelope: set to the envelope buffer of the communicator whose receive completed.
+ * comm:     set to the communicator whose receive completed.
+ *
+ * Returns 1 when a receive has completed (the outputs are set and the receive
+ * is marked as no longer posted), 0 when none has.
+ */
+int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, MPI_Comm *comm)
+{
+	int idx = _starpu_mpi_comm_tested;
+
+	do
+	{
+		struct _starpu_mpi_comm *cur = _starpu_mpi_comms[idx];
+
+		if (cur->posted)
+		{
+			int done;
+			int rc = MPI_Test(&cur->request, &done, status);
+			STARPU_ASSERT(rc == MPI_SUCCESS);
+			if (done)
+			{
+				cur->posted = 0;
+				/* move the starting point of the next scan one slot further, wrapping at the end */
+				_starpu_mpi_comm_tested = (_starpu_mpi_comm_tested + 1 == _starpu_mpi_comm_nb) ? 0 : _starpu_mpi_comm_tested + 1;
+				*envelope = cur->envelope;
+				*comm = cur->comm;
+				return 1;
+			}
+		}
+		idx = (idx + 1 == _starpu_mpi_comm_nb) ? 0 : idx + 1;
+	}
+	while (idx != _starpu_mpi_comm_tested);
+
+	/* every slot has been visited and no receive has completed */
+	return 0;
+}
+
+/* Cancel the outstanding envelope receive, if any, of every registered communicator. */
+void _starpu_mpi_comm_cancel_recv()
+{
+	int idx;
+	for (idx = 0; idx < _starpu_mpi_comm_nb; idx++)
+	{
+		struct _starpu_mpi_comm *cur = _starpu_mpi_comms[idx];
+		MPI_Status ignored;
+
+		if (cur->posted != 1)
+			continue;
+
+		/* the cancelled request still has to be completed, hence the wait */
+		MPI_Cancel(&cur->request);
+		MPI_Wait(&cur->request, &ignored);
+		cur->posted = 0;
+	}
+}

+ 39 - 0
mpi/src/starpu_mpi_comm.h

@@ -0,0 +1,39 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_COMM_H__
+#define __STARPU_MPI_COMM_H__
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <mpi.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Only used through a pointer below; forward-declared so that this header does
+ * not depend on starpu_mpi_private.h having been included first. */
+struct _starpu_mpi_envelope;
+
+/* Create the communicator registry and register comm in it. */
+void _starpu_mpi_comm_init(MPI_Comm comm);
+/* Release the registry and everything it owns. */
+void _starpu_mpi_comm_free(void);
+/* Add comm to the registry; a communicator already registered is left alone. */
+void _starpu_mpi_comm_register(MPI_Comm comm);
+/* Post an envelope receive on each registered communicator lacking one. */
+void _starpu_mpi_comm_post_recv(void);
+/* Return 1 and fill status, envelope and comm when an envelope receive has completed, 0 otherwise. */
+int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, MPI_Comm *comm);
+/* Cancel every outstanding envelope receive. */
+void _starpu_mpi_comm_cancel_recv(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // __STARPU_MPI_COMM_H__

+ 39 - 47
mpi/src/starpu_mpi_early_data.c

@@ -25,72 +25,61 @@ struct _starpu_mpi_early_data_handle_hashlist
 {
 	struct _starpu_mpi_early_data_handle_list *list;
 	UT_hash_handle hh;
-	int data_tag;
+	struct _starpu_mpi_node_tag node_tag;
 };
 
 /** stores data which have been received by MPI but have not been requested by the application */
-static struct _starpu_mpi_early_data_handle_hashlist **_starpu_mpi_early_data_handle_hashmap = NULL;
+static struct _starpu_mpi_early_data_handle_hashlist *_starpu_mpi_early_data_handle_hashmap = NULL;
 static int _starpu_mpi_early_data_handle_hashmap_count = 0;
 
-void _starpu_mpi_early_data_init(int world_size)
+void _starpu_mpi_early_data_init(void)
 {
-	int k;
-
-	_starpu_mpi_early_data_handle_hashmap = malloc(world_size * sizeof(struct _starpu_mpi_early_data_handle_hash_list *));
-	for(k=0 ; k<world_size ; k++) _starpu_mpi_early_data_handle_hashmap[k] = NULL;
+	_starpu_mpi_early_data_handle_hashmap = NULL;
+	_starpu_mpi_early_data_handle_hashmap_count = 0;
 }
 
-void _starpu_mpi_early_data_check_termination()
+void _starpu_mpi_early_data_check_termination(void)
 {
 	STARPU_ASSERT_MSG(_starpu_mpi_early_data_handle_hashmap_count == 0, "Number of unexpected received messages left is not zero, did you forget to post a receive corresponding to a send?");
 }
 
-void _starpu_mpi_early_data_free(int world_size)
+void _starpu_mpi_early_data_free(void)
 {
-	int n;
-	struct _starpu_mpi_early_data_handle_hashlist *hashlist;
-
-	for(n=0 ; n<world_size; n++)
+	struct _starpu_mpi_early_data_handle_hashlist *current, *tmp;
+	HASH_ITER(hh, _starpu_mpi_early_data_handle_hashmap, current, tmp)
 	{
-		for(hashlist=_starpu_mpi_early_data_handle_hashmap[n]; hashlist != NULL; hashlist=hashlist->hh.next)
-		{
-			_starpu_mpi_early_data_handle_list_delete(hashlist->list);
-		}
-		struct _starpu_mpi_early_data_handle_hashlist *current, *tmp;
-		HASH_ITER(hh, _starpu_mpi_early_data_handle_hashmap[n], current, tmp)
-		{
-			HASH_DEL(_starpu_mpi_early_data_handle_hashmap[n], current);
-			free(current);
-		}
+		_starpu_mpi_early_data_handle_list_delete(current->list);
+		HASH_DEL(_starpu_mpi_early_data_handle_hashmap, current);
+		free(current);
 	}
-	free(_starpu_mpi_early_data_handle_hashmap);
 }
 
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _starpu_mpi_envelope *envelope, int source)
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _starpu_mpi_envelope *envelope, int source, MPI_Comm comm)
 {
 	struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
 	STARPU_ASSERT(early_data_handle);
 	STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
 	STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
-	early_data_handle->data_tag = envelope->data_tag;
 	early_data_handle->env = envelope;
-	early_data_handle->source = source;
+	early_data_handle->node_tag.comm = comm;
+	early_data_handle->node_tag.rank = source;
+	early_data_handle->node_tag.data_tag = envelope->data_tag;
 	return early_data_handle;
 }
 
 #ifdef STARPU_VERBOSE
-static void _starpu_mpi_early_data_handle_display_hash(int source, int tag)
+static void _starpu_mpi_early_data_handle_display_hash(struct _starpu_mpi_node_tag *node_tag)
 {
 	struct _starpu_mpi_early_data_handle_hashlist *hashlist;
-	HASH_FIND_INT(_starpu_mpi_early_data_handle_hashmap[source], &tag, hashlist);
 
+	HASH_FIND(hh, _starpu_mpi_early_data_handle_hashmap, node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
 	if (hashlist == NULL)
 	{
-		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
+		_STARPU_MPI_DEBUG(60, "Hashlist for comm %p source %d and tag %d does not exist\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
 	}
 	else if (_starpu_mpi_early_data_handle_list_empty(hashlist->list))
 	{
-		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
+		_STARPU_MPI_DEBUG(60, "Hashlist for comm %p source %d and tag %d is empty\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
 	}
 	else
 	{
@@ -99,20 +88,20 @@ static void _starpu_mpi_early_data_handle_display_hash(int source, int tag)
 		     cur != _starpu_mpi_early_data_handle_list_end(hashlist->list);
 		     cur = _starpu_mpi_early_data_handle_list_next(cur))
 		{
-			_STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, cur);
+			_STARPU_MPI_DEBUG(60, "Element for comm %p source %d and tag %d: %p\n", node_tag->comm, node_tag->rank, node_tag->data_tag, cur);
 		}
 	}
 }
 #endif
 
 static
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_pop(int data_tag, int source, int delete)
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_pop(struct _starpu_mpi_node_tag *node_tag, int delete)
 {
 	struct _starpu_mpi_early_data_handle_hashlist *hashlist;
 	struct _starpu_mpi_early_data_handle *early_data_handle;
 
-	_STARPU_MPI_DEBUG(60, "Looking for early_data_handle with tag %d in the hashmap[%d]\n", data_tag, source);
-	HASH_FIND_INT(_starpu_mpi_early_data_handle_hashmap[source], &data_tag, hashlist);
+	_STARPU_MPI_DEBUG(60, "Looking for early_data_handle with comm %p source %d tag %d\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
+	HASH_FIND(hh, _starpu_mpi_early_data_handle_hashmap, node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
 	if (hashlist == NULL)
 	{
 		early_data_handle = NULL;
@@ -135,46 +124,49 @@ struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_pop(int data_tag, i
 			}
 		}
 	}
-	_STARPU_MPI_DEBUG(60, "Found early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, data_tag, source);
+	_STARPU_MPI_DEBUG(60, "Found early_data_handle %p with comm %p source %d tag %d\n", early_data_handle, node_tag->comm, node_tag->rank, node_tag->data_tag);
 	return early_data_handle;
 }
 
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(int data_tag, int source)
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(struct _starpu_mpi_node_tag *node_tag)
 {
-	return _starpu_mpi_early_data_pop(data_tag, source, 0);
+	return _starpu_mpi_early_data_pop(node_tag, 0);
 }
 
 void _starpu_mpi_early_data_add(struct _starpu_mpi_early_data_handle *early_data_handle)
 {
-	_STARPU_MPI_DEBUG(60, "Trying to add early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, early_data_handle->data_tag, early_data_handle->source);
+	_STARPU_MPI_DEBUG(60, "Trying to add early_data_handle %p with comm %p source %d tag %d\n", early_data_handle, early_data_handle->node_tag.comm,
+			  early_data_handle->node_tag.rank, early_data_handle->node_tag.data_tag);
 
 	struct _starpu_mpi_early_data_handle_hashlist *hashlist;
-	HASH_FIND_INT(_starpu_mpi_early_data_handle_hashmap[early_data_handle->source], &early_data_handle->data_tag, hashlist);
+	HASH_FIND(hh, _starpu_mpi_early_data_handle_hashmap, &early_data_handle->node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
 	if (hashlist == NULL)
 	{
 		hashlist = malloc(sizeof(struct _starpu_mpi_early_data_handle_hashlist));
 		hashlist->list = _starpu_mpi_early_data_handle_list_new();
-		hashlist->data_tag = early_data_handle->data_tag;
-		HASH_ADD_INT(_starpu_mpi_early_data_handle_hashmap[early_data_handle->source], data_tag, hashlist);
+		hashlist->node_tag = early_data_handle->node_tag;
+		HASH_ADD(hh, _starpu_mpi_early_data_handle_hashmap, node_tag, sizeof(hashlist->node_tag), hashlist);
 	}
 	_starpu_mpi_early_data_handle_list_push_back(hashlist->list, early_data_handle);
 	_starpu_mpi_early_data_handle_hashmap_count ++;
 #ifdef STARPU_VERBOSE
-	_starpu_mpi_early_data_handle_display_hash(early_data_handle->source, early_data_handle->data_tag);
+	_starpu_mpi_early_data_handle_display_hash(&hashlist->node_tag);
 #endif
 }
 
 void _starpu_mpi_early_data_delete(struct _starpu_mpi_early_data_handle *early_data_handle)
 {
-	_STARPU_MPI_DEBUG(60, "Trying to delete early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, early_data_handle->data_tag, early_data_handle->source);
-	struct _starpu_mpi_early_data_handle *found = _starpu_mpi_early_data_pop(early_data_handle->data_tag, early_data_handle->source, 1);
+	_STARPU_MPI_DEBUG(60, "Trying to delete early_data_handle %p with comm %p source %d tag %d\n", early_data_handle, early_data_handle->node_tag.comm,
+			  early_data_handle->node_tag.rank, early_data_handle->node_tag.data_tag);
+	struct _starpu_mpi_early_data_handle *found = _starpu_mpi_early_data_pop(&early_data_handle->node_tag, 1);
 
 	STARPU_ASSERT_MSG(found == early_data_handle,
-			  "[_starpu_mpi_early_data_delete][error] early_data_handle %p with tag %d is NOT in the hashmap[%d]\n", early_data_handle, early_data_handle->data_tag, early_data_handle->source);
+			  "[_starpu_mpi_early_data_delete][error] early_data_handle %p with comm %p source %d tag %d is NOT available\n",
+			  early_data_handle, early_data_handle->node_tag.comm, early_data_handle->node_tag.rank, early_data_handle->node_tag.data_tag);
 
 	_starpu_mpi_early_data_handle_hashmap_count --;
 #ifdef STARPU_VERBOSE
-	_starpu_mpi_early_data_handle_display_hash(early_data_handle->source, early_data_handle->data_tag);
+	_starpu_mpi_early_data_handle_display_hash(&early_data_handle->node_tag);
 #endif
 }
 

+ 7 - 7
mpi/src/starpu_mpi_early_data.h

@@ -23,6 +23,7 @@
 #include <mpi.h>
 #include <common/config.h>
 #include <common/list.h>
+#include <starpu_mpi_private.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -33,19 +34,18 @@ LIST_TYPE(_starpu_mpi_early_data_handle,
 	  struct _starpu_mpi_envelope *env;
 	  struct _starpu_mpi_req *req;
 	  void *buffer;
-	  int data_tag;
-	  int source;
 	  int req_ready;
+	  struct _starpu_mpi_node_tag node_tag;
 	  starpu_pthread_mutex_t req_mutex;
 	  starpu_pthread_cond_t req_cond;
 );
 
-void _starpu_mpi_early_data_init(int world_size);
-void _starpu_mpi_early_data_check_termination();
-void _starpu_mpi_early_data_free(int world_size);
+void _starpu_mpi_early_data_init(void);
+void _starpu_mpi_early_data_check_termination(void);
+void _starpu_mpi_early_data_free(void);
 
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _starpu_mpi_envelope *envelope, int source);
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(int data_tag, int source);
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _starpu_mpi_envelope *envelope, int source, MPI_Comm comm);
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(struct _starpu_mpi_node_tag *node_tag);
 void _starpu_mpi_early_data_add(struct _starpu_mpi_early_data_handle *early_data_handle);
 void _starpu_mpi_early_data_delete(struct _starpu_mpi_early_data_handle *early_data_handle);
 

+ 31 - 27
mpi/src/starpu_mpi_early_request.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010-2014  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -22,64 +22,68 @@
 #include <common/uthash.h>
 
 /** stores application requests for which data have not been received yet */
-static struct _starpu_mpi_req **_starpu_mpi_app_req_hashmap = NULL;
-static int _starpu_mpi_app_req_hashmap_count = 0;
+struct _starpu_mpi_req *_starpu_mpi_early_request_hash;
+int _starpu_mpi_early_request_hash_count;
 
-void _starpu_mpi_early_request_init(int world_size)
+void _starpu_mpi_early_request_init()
 {
-	int k;
-
-	_starpu_mpi_app_req_hashmap = malloc(world_size * sizeof(struct _starpu_mpi_req *));
-	for(k=0 ; k<world_size ; k++) _starpu_mpi_app_req_hashmap[k] = NULL;
+	_starpu_mpi_early_request_hash = NULL;
+	_starpu_mpi_early_request_hash_count = 0;
 }
 
 void _starpu_mpi_early_request_free()
 {
-	free(_starpu_mpi_app_req_hashmap);
+	free(_starpu_mpi_early_request_hash);
 }
 
 int _starpu_mpi_early_request_count()
 {
-	return _starpu_mpi_app_req_hashmap_count;
+	return _starpu_mpi_early_request_hash_count;
 }
 
 void _starpu_mpi_early_request_check_termination()
 {
-	STARPU_ASSERT_MSG(_starpu_mpi_early_request_count() == 0, "Number of receive requests left is not zero");
+	STARPU_ASSERT_MSG(_starpu_mpi_early_request_count() == 0, "Number of early requests left is not zero");
 }
 
-struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source)
+struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source, MPI_Comm comm)
 {
-	struct _starpu_mpi_req* req;
+	struct _starpu_mpi_node_tag node_tag;
+	struct _starpu_mpi_req *found;
+
+	memset(&node_tag, 0, sizeof(struct _starpu_mpi_node_tag));
+	node_tag.comm = comm;
+	node_tag.rank = source;
+	node_tag.data_tag = data_tag;
 
-	HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &data_tag, req);
+	HASH_FIND(hh, _starpu_mpi_early_request_hash, &node_tag, sizeof(struct _starpu_mpi_node_tag), found);
 
-	return req;
+	return found;
 }
 
 void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req)
 {
 	struct _starpu_mpi_req *test_req;
 
-	test_req = _starpu_mpi_early_request_find(req->data_tag, req->srcdst);
+	test_req = _starpu_mpi_early_request_find(req->node_tag.data_tag, req->node_tag.rank, req->node_tag.comm);
 
 	if (test_req == NULL)
 	{
-		HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], data_tag, req);
-		_starpu_mpi_app_req_hashmap_count ++;
-		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
+		HASH_ADD(hh, _starpu_mpi_early_request_hash, node_tag, sizeof(req->node_tag), req);
+		_starpu_mpi_early_request_hash_count ++;
+		_STARPU_MPI_DEBUG(3, "Adding request %p with comm %p source %d tag %d in the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
+		_STARPU_MPI_DEBUG(3, "[Error] request %p with comm %p source %d tag %d already in the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
 		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
 		if (seq_const &&  req->sequential_consistency)
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->data_tag, req->srcdst, test_req);
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with comm %p source %d tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag, test_req);
 		}
 		else
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->data_tag, req->srcdst, test_req);
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with comm %p source %d tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag, test_req);
 		}
 	}
 }
@@ -88,17 +92,17 @@ void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req)
 {
 	struct _starpu_mpi_req *test_req;
 
-	test_req = _starpu_mpi_early_request_find(req->data_tag, req->srcdst);
+	test_req = _starpu_mpi_early_request_find(req->node_tag.data_tag, req->node_tag.rank, req->node_tag.comm);
 
 	if (test_req != NULL)
 	{
-		HASH_DEL(_starpu_mpi_app_req_hashmap[req->srcdst], req);
-		_starpu_mpi_app_req_hashmap_count --;
-		_STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
+		HASH_DEL(_starpu_mpi_early_request_hash, req);
+		_starpu_mpi_early_request_hash_count --;
+		_STARPU_MPI_DEBUG(3, "Deleting application request %p with comm %p source %d tag %d from the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
+		_STARPU_MPI_DEBUG(3, "[Warning] request %p with comm %p source %d tag %d is NOT in the application request hashmap\n", req, req->node_tag.comm, req->node_tag.rank, req->node_tag.data_tag);
 	}
 }
 

+ 5 - 5
mpi/src/starpu_mpi_early_request.h

@@ -28,13 +28,13 @@
 extern "C" {
 #endif
 
-void _starpu_mpi_early_request_init(int world_size);
-void _starpu_mpi_early_request_free();
-int _starpu_mpi_early_request_count();
-void _starpu_mpi_early_request_check_termination();
+void _starpu_mpi_early_request_init(void);
+void _starpu_mpi_early_request_free(void);
+int _starpu_mpi_early_request_count(void);
+void _starpu_mpi_early_request_check_termination(void);
 
 void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req);
-struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source);
+struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source, MPI_Comm comm);
 void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req);
 
 #ifdef __cplusplus

+ 10 - 12
mpi/src/starpu_mpi_private.h

@@ -151,6 +151,13 @@ struct _starpu_mpi_envelope
 
 struct _starpu_mpi_req;
 
+struct _starpu_mpi_node_tag	/* (communicator, rank, tag) triple; used as a uthash key over sizeof(struct _starpu_mpi_node_tag) bytes */
+{
+	MPI_Comm comm;	/* communicator the rank belongs to */
+	int rank;	/* node rank: message source/destination, or the rank given at data registration */
+	int data_tag;	/* StarPU-MPI data tag */
+};
 LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
@@ -162,9 +169,7 @@ LIST_TYPE(_starpu_mpi_req,
 	int user_datatype;
 
 	/* who are we talking to ? */
-	int srcdst;
-	int data_tag;
-	MPI_Comm comm;
+	struct _starpu_mpi_node_tag node_tag;
 
 	void (*func)(struct _starpu_mpi_req *);
 
@@ -186,8 +191,6 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned completed;
 	unsigned posted;
 
-	UT_hash_handle hh;
-
 	/* In the case of a Wait/Test request, we are going to post a request
 	 * to test the completion of another request */
 	struct _starpu_mpi_req *other_request;
@@ -206,14 +209,9 @@ LIST_TYPE(_starpu_mpi_req,
 	struct _starpu_mpi_req *internal_req;
 
 	int sequential_consistency;
-);
 
-struct _starpu_mpi_data
-{
-	int tag;
-	int rank;
-	MPI_Comm comm;
-};
+     	UT_hash_handle hh;
+);
 
 #ifdef __cplusplus
 }

+ 68 - 86
mpi/src/starpu_mpi_sync_data.c

@@ -22,148 +22,130 @@
 
 struct _starpu_mpi_sync_data_handle_hashlist
 {
-	struct _starpu_mpi_sync_data_handle_list *list;
+	struct _starpu_mpi_req_list *list;	/* requests queued under node_tag (appended with push_back, taken with pop_front) */
 	UT_hash_handle hh;
-	int data_tag;
+	struct _starpu_mpi_node_tag node_tag;	/* hash key: the whole (comm, rank, data_tag) struct */
 };
 
 /** stores data which have been received by MPI but have not been requested by the application */
-static starpu_pthread_mutex_t *_starpu_mpi_sync_data_handle_mutex;
-static struct _starpu_mpi_sync_data_handle_hashlist **_starpu_mpi_sync_data_handle_hashmap = NULL;
+static starpu_pthread_mutex_t _starpu_mpi_sync_data_handle_mutex;	/* one lock for the whole table (was one per rank); held around hashmap and counter updates */
+static struct _starpu_mpi_sync_data_handle_hashlist *_starpu_mpi_sync_data_handle_hashmap = NULL;	/* uthash head of the single table keyed by struct _starpu_mpi_node_tag (was an array indexed by rank) */
 static int _starpu_mpi_sync_data_handle_hashmap_count = 0;
 
+/* Bring the sync-data table to its initial state: fresh mutex, zero counter, empty hashmap. */
+void _starpu_mpi_sync_data_init(void)
+{
+	STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_sync_data_handle_mutex, NULL);
+	_starpu_mpi_sync_data_handle_hashmap_count = 0;
+	_starpu_mpi_sync_data_handle_hashmap = NULL;
+}
+
+/*
+ * Tear down the sync-data table: for every entry, delete its request list,
+ * unlink the entry from the hashmap and free it; then destroy the mutex.
+ */
+void _starpu_mpi_sync_data_free(void)
+{
+	struct _starpu_mpi_sync_data_handle_hashlist *current, *tmp;
+	HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap, current, tmp)
+	{
+		_starpu_mpi_req_list_delete(current->list);
+		HASH_DEL(_starpu_mpi_sync_data_handle_hashmap, current);
+		free(current);
+	}
+	/* No free() of the head itself: it is not a separately allocated array
+	 * any more, every entry was released above, and uthash leaves the head
+	 * NULL once the last entry has been deleted. */
+	STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_sync_data_handle_mutex);
+}
+
 #ifdef STARPU_VERBOSE
 static
-void _starpu_mpi_sync_data_handle_display_hash(int source, int tag)
+void _starpu_mpi_sync_data_handle_display_hash(struct _starpu_mpi_node_tag *node_tag)	/* verbose-build helper: log (debug level 60) what the hashmap holds under node_tag */
 {
 	struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
-	HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[source], &tag, hashlist);
+	HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);	/* NOTE(review): _starpu_mpi_sync_data_add() calls this after unlocking the mutex, so this lookup runs unlocked - confirm that is acceptable */
 
 	if (hashlist == NULL)
 	{
-		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
+		_STARPU_MPI_DEBUG(60, "Hashlist for comm %p source %d and tag %d does not exist\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
 	}
-	else if (_starpu_mpi_sync_data_handle_list_empty(hashlist->list))
+	else if (_starpu_mpi_req_list_empty(hashlist->list))
 	{
-		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
+		_STARPU_MPI_DEBUG(60, "Hashlist for comm %p source %d and tag %d is empty\n", node_tag->comm, node_tag->rank, node_tag->data_tag);
 	}
 	else
 	{
-		struct _starpu_mpi_sync_data_handle *cur;
-		for (cur = _starpu_mpi_sync_data_handle_list_begin(hashlist->list) ;
-		     cur != _starpu_mpi_sync_data_handle_list_end(hashlist->list);
-		     cur = _starpu_mpi_sync_data_handle_list_next(cur))
+		struct _starpu_mpi_req *cur;	/* entries are now the requests themselves, no wrapper struct */
+		for (cur = _starpu_mpi_req_list_begin(hashlist->list) ;
+		     cur != _starpu_mpi_req_list_end(hashlist->list);
+		     cur = _starpu_mpi_req_list_next(cur))
 		{
-			_STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, cur);
+			_STARPU_MPI_DEBUG(60, "Element for comm %p source %d and tag %d: %p\n", node_tag->comm, node_tag->rank, node_tag->data_tag, cur);
 		}
 	}
 }
 #endif
 
-void _starpu_mpi_sync_data_init(int world_size)
-{
-	int k;
-
-	_starpu_mpi_sync_data_handle_hashmap = malloc(world_size * sizeof(struct _starpu_mpi_sync_data_handle_hash_list *));
-	_starpu_mpi_sync_data_handle_mutex = malloc(world_size * sizeof(starpu_pthread_mutex_t));
-	for(k=0 ; k<world_size ; k++)
-	{
-		_starpu_mpi_sync_data_handle_hashmap[k] = NULL;
-		STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_sync_data_handle_mutex[k], NULL);
-	}
-}
-
-void _starpu_mpi_sync_data_check_termination()
+void _starpu_mpi_sync_data_check_termination(void)	/* asserts that the stored-request counter is back to zero */
 {
 	STARPU_ASSERT_MSG(_starpu_mpi_sync_data_handle_hashmap_count == 0, "Number of sync received messages left is not zero, did you forget to post a receive corresponding to a send?");
 }
 
-void _starpu_mpi_sync_data_free(int world_size)
-{
-	int n;
-	struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
-
-	for(n=0 ; n<world_size; n++)
-	{
-		for(hashlist=_starpu_mpi_sync_data_handle_hashmap[n]; hashlist != NULL; hashlist=hashlist->hh.next)
-		{
-			_starpu_mpi_sync_data_handle_list_delete(hashlist->list);
-		}
-		struct _starpu_mpi_sync_data_handle_hashlist *current, *tmp;
-		HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap[n], current, tmp)
-		{
-			HASH_DEL(_starpu_mpi_sync_data_handle_hashmap[n], current);
-			free(current);
-		}
-		STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_sync_data_handle_mutex[n]);
-	}
-	free(_starpu_mpi_sync_data_handle_hashmap);
-	free(_starpu_mpi_sync_data_handle_mutex);
-}
-
-int _starpu_mpi_sync_data_count()
+int _starpu_mpi_sync_data_count(void)	/* returns the number of requests currently stored (incremented by add, decremented by a successful find) */
 {
 	return _starpu_mpi_sync_data_handle_hashmap_count;
 }
 
-struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_create(struct _starpu_mpi_req *req)
+struct _starpu_mpi_req *_starpu_mpi_sync_data_find(int data_tag, int source, MPI_Comm comm)	/* removes and returns the first request queued under (comm, source, data_tag); NULL when none is stored */
 {
-	struct _starpu_mpi_sync_data_handle* sync_data_handle = calloc(1, sizeof(struct _starpu_mpi_sync_data_handle));
-	STARPU_ASSERT(sync_data_handle);
-	sync_data_handle->data_tag = req->data_tag;
-	sync_data_handle->source = req->srcdst;
-	sync_data_handle->req = req;
-	return sync_data_handle;
-}
+	struct _starpu_mpi_req *req;
+	struct _starpu_mpi_node_tag node_tag;
+	struct _starpu_mpi_sync_data_handle_hashlist *found;
 
-struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_find(int data_tag, int source)
-{
-	struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
-	struct _starpu_mpi_sync_data_handle *sync_data_handle;
+	memset(&node_tag, 0, sizeof(struct _starpu_mpi_node_tag));	/* zero the key first: it is looked up as sizeof(struct) raw bytes, any padding included */
+	node_tag.comm = comm;
+	node_tag.rank = source;
+	node_tag.data_tag = data_tag;
 
-	_STARPU_MPI_DEBUG(60, "Looking for sync_data_handle with tag %d in the hashmap[%d]\n", data_tag, source);
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
-	HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[source], &data_tag, hashlist);
-	if (hashlist == NULL)
+	_STARPU_MPI_DEBUG(60, "Looking for sync_data_handle with comm %p source %d tag %d in the hashmap\n", comm, source, data_tag);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
+	HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &node_tag, sizeof(struct _starpu_mpi_node_tag), found);
+	if (found == NULL)
 	{
-		sync_data_handle = NULL;
+		req = NULL;	/* no list was ever created for this key */
 	}
 	else
 	{
-		if (_starpu_mpi_sync_data_handle_list_empty(hashlist->list))
+		if (_starpu_mpi_req_list_empty(found->list))
 		{
-			sync_data_handle = NULL;
+			req = NULL;	/* the list exists but holds nothing right now */
 		}
 		else
 		{
-			sync_data_handle = _starpu_mpi_sync_data_handle_list_pop_front(hashlist->list);
+			req = _starpu_mpi_req_list_pop_front(found->list);	/* front of the list: _starpu_mpi_sync_data_add() appends with push_back */
 			_starpu_mpi_sync_data_handle_hashmap_count --;
 		}
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
-	_STARPU_MPI_DEBUG(60, "Found sync_data_handle %p with tag %d in the hashmap[%d]\n", sync_data_handle, data_tag, source);
-	return sync_data_handle;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
+	_STARPU_MPI_DEBUG(60, "Found sync_data_handle %p with comm %p source %d tag %d in the hashmap\n", req, comm, source, data_tag);
+	return req;
 }
 
-void _starpu_mpi_sync_data_add(struct _starpu_mpi_sync_data_handle *sync_data_handle)
+/*
+ * Queue sync_req at the back of the list stored under its node_tag
+ * (comm, rank, data_tag), creating the hashmap entry on first use, and
+ * bump the stored-request counter. The table is updated under the mutex.
+ */
+void _starpu_mpi_sync_data_add(struct _starpu_mpi_req *sync_req)
 {
-	_STARPU_MPI_DEBUG(2000, "Adding sync_data_handle %p with tag %d in the hashmap[%d]\n", sync_data_handle, sync_data_handle->data_tag, sync_data_handle->source);
-
 	struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
-	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex[sync_data_handle->source]);
-	HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[sync_data_handle->source], &sync_data_handle->data_tag, hashlist);
+
+	_STARPU_MPI_DEBUG(2000, "Adding sync_req %p with comm %p source %d tag %d in the hashmap\n", sync_req, sync_req->node_tag.comm, sync_req->node_tag.rank, sync_req->node_tag.data_tag);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex);
+	HASH_FIND(hh, _starpu_mpi_sync_data_handle_hashmap, &sync_req->node_tag, sizeof(struct _starpu_mpi_node_tag), hashlist);
 	if (hashlist == NULL)
 	{
 		hashlist = malloc(sizeof(struct _starpu_mpi_sync_data_handle_hashlist));
-		hashlist->list = _starpu_mpi_sync_data_handle_list_new();
-		hashlist->data_tag = sync_data_handle->data_tag;
-		HASH_ADD_INT(_starpu_mpi_sync_data_handle_hashmap[sync_data_handle->source], data_tag, hashlist);
+		/* check the allocation before dereferencing it below */
+		STARPU_ASSERT(hashlist);
+		hashlist->list = _starpu_mpi_req_list_new();
+		hashlist->node_tag = sync_req->node_tag;
+		HASH_ADD(hh, _starpu_mpi_sync_data_handle_hashmap, node_tag, sizeof(hashlist->node_tag), hashlist);
 	}
-	_starpu_mpi_sync_data_handle_list_push_back(hashlist->list, sync_data_handle);
+	_starpu_mpi_req_list_push_back(hashlist->list, sync_req);
 	_starpu_mpi_sync_data_handle_hashmap_count ++;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex[sync_data_handle->source]);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex);
 #ifdef STARPU_VERBOSE
-	_starpu_mpi_sync_data_handle_display_hash(sync_data_handle->source, sync_data_handle->data_tag);
+	_starpu_mpi_sync_data_handle_display_hash(&sync_req->node_tag);
 #endif
 }
 

+ 5 - 12
mpi/src/starpu_mpi_sync_data.h

@@ -27,19 +27,12 @@
 extern "C" {
 #endif
 
-LIST_TYPE(_starpu_mpi_sync_data_handle,
-	  struct _starpu_mpi_req *req;
-	  int data_tag;
-	  int source;
-);
+void _starpu_mpi_sync_data_init(void);
+void _starpu_mpi_sync_data_check_termination(void);
+void _starpu_mpi_sync_data_free(void);
 
-void _starpu_mpi_sync_data_init(int world_size);
-void _starpu_mpi_sync_data_check_termination();
-void _starpu_mpi_sync_data_free(int world_size);
-
-struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_create(struct _starpu_mpi_req *req);
-struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_find(int data_tag, int source);
-void _starpu_mpi_sync_data_add(struct _starpu_mpi_sync_data_handle *sync_data_handle);
+struct _starpu_mpi_req *_starpu_mpi_sync_data_find(int data_tag, int source, MPI_Comm comm);
+void _starpu_mpi_sync_data_add(struct _starpu_mpi_req *req);
 int _starpu_mpi_sync_data_count();
 
 #ifdef __cplusplus

+ 1 - 1
mpi/src/starpu_mpi_tag.c

@@ -102,7 +102,7 @@ int _starpu_mpi_data_release_tag(starpu_data_handle_t handle)
 	if (tag != -1)
 	{
 		_starpu_spin_lock(&registered_tag_handles_lock);
-		HASH_FIND_INT(registered_tag_handles, &(((struct _starpu_mpi_data *)(handle->mpi_data))->tag), tag_entry);
+		HASH_FIND_INT(registered_tag_handles, &(((struct _starpu_mpi_node_tag *)(handle->mpi_data))->data_tag), tag_entry);
 		STARPU_ASSERT_MSG((tag_entry != NULL),"Data handle %p with tag %d isn't in the hashmap !",handle,tag);
 
 		HASH_DEL(registered_tag_handles, tag_entry);