Browse Source

mpi/src: implement properly synchronous send

Nathalie Furmento 10 years ago
parent
commit
bf0576198d

+ 4 - 2
mpi/src/Makefile.am

@@ -41,7 +41,8 @@ noinst_HEADERS =					\
 	starpu_mpi_select_node.h			\
 	starpu_mpi_cache_stats.h			\
 	starpu_mpi_early_data.h				\
-	starpu_mpi_early_request.h
+	starpu_mpi_early_request.h			\
+	starpu_mpi_sync_data.h
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi.c					\
@@ -55,7 +56,8 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi_select_node.c			\
 	starpu_mpi_cache_stats.c			\
 	starpu_mpi_early_data.c				\
-	starpu_mpi_early_request.c
+	starpu_mpi_early_request.c			\
+	starpu_mpi_sync_data.c
 
 showcheck:
 	-cat /dev/null

+ 243 - 127
mpi/src/starpu_mpi.c

@@ -22,6 +22,7 @@
 #include <starpu_profiling.h>
 #include <starpu_mpi_stats.h>
 #include <starpu_mpi_cache.h>
+#include <starpu_mpi_sync_data.h>
 #include <starpu_mpi_early_data.h>
 #include <starpu_mpi_early_request.h>
 #include <starpu_mpi_select_node.h>
@@ -33,6 +34,7 @@
 
 static void _starpu_mpi_add_sync_point_in_fxt(void);
 static void _starpu_mpi_submit_ready_request(void *arg);
+static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
 #ifdef STARPU_VERBOSE
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
@@ -148,7 +150,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
-	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->data_tag, _starpu_mpi_request_type(req->request_type));
+	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->srcdst, req->data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
@@ -190,7 +192,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 		}
 		else
 		{
-			/* test whether the receive request has already been submitted internally by StarPU-MPI*/
+			/* test whether some data with the given tag and source have already been received by StarPU-MPI*/
 			struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->data_tag, req->srcdst);
 
 			/* Case: a receive request for a data with the given tag and source has already been
@@ -223,8 +225,31 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			/* Case: no matching data has been received. Store the receive request as an early_request. */
 			else
 			{
-				_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->data_tag);
-				_starpu_mpi_early_request_add(req);
+				struct _starpu_mpi_sync_data_handle *sync_data_handle = _starpu_mpi_sync_data_find(req->data_tag, req->srcdst);
+				_STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %d and src %d = %p\n", req->data_tag, req->srcdst, sync_data_handle);
+				if (sync_data_handle)
+				{
+					req->sync = 1;
+					_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+					if (req->user_datatype == 0)
+					{
+						req->count = 1;
+						req->ptr = starpu_data_get_local_ptr(req->data_handle);
+					}
+					else
+					{
+						req->count = sync_data_handle->req->count;
+						STARPU_ASSERT(req->count);
+						req->ptr = malloc(req->count);
+						STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
+					}
+					_starpu_mpi_req_list_push_front(ready_requests, req);
+				}
+				else
+				{
+					_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->data_tag);
+					_starpu_mpi_early_request_add(req);
+				}
 			}
 		}
 	}
@@ -286,25 +311,27 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
  /*                                                      */
  /********************************************************/
 
- static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
- {
-	 _STARPU_MPI_LOG_IN();
+static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
+{
+	_STARPU_MPI_LOG_IN();
 
-	 _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->sync);
 
-	 _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
+	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
-	 _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->data_tag, 0);
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->data_tag, 0);
 
-	 if (req->sync == 0)
-	 {
-		 req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
-		 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
+	if (req->sync == 0)
+	{
+		_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA);
+		 req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
+		 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_code(req->ret));
 	 }
 	 else
 	 {
-		 req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
-		 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %d", req->ret);
+		_STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA);
+		 req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
+		 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_code(req->ret));
 	 }
 
 	 _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->data_tag, 0);
@@ -318,14 +345,16 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	 _starpu_mpi_handle_detached_request(req);
 
 	 _STARPU_MPI_LOG_OUT();
- }
+}
 
- static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
- {
+static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
+{
 	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
 
 	req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
+	req->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
 	req->envelope->data_tag = req->data_tag;
+	req->envelope->sync = req->sync;
 
 	if (req->user_datatype == 0)
 	{
@@ -335,8 +364,9 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 
 		MPI_Type_size(req->datatype, &size);
 		req->envelope->size = (starpu_ssize_t)req->count * size;
-		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
-		MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
+		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst);
+		_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE);
+		MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
 	}
 	else
 	{
@@ -348,10 +378,11 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 		if (req->envelope->size != -1)
  		{
  			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->srcdst);
 			req->count = req->envelope->size;
-			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
-			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
+			_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE);
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
+			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_code(ret));
  		}
 
  		// Pack the data
@@ -359,9 +390,10 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 		if (req->envelope->size == -1)
  		{
  			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
-			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
-			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->srcdst);
+			_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE);
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
+			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_code(ret));
  		}
  		else
  		{
@@ -370,7 +402,18 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
  		}
 		// We can send the data now
 	}
-	_starpu_mpi_isend_data_func(req);
+
+	if (req->sync)
+	{
+		// If the data is to be sent in synchronous mode, we need to wait for the receiver ready message
+		struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_create(req);
+		_starpu_mpi_sync_data_add(_sync_data);
+	}
+	else
+	{
+		// Otherwise we can send the data
+		_starpu_mpi_isend_data_func(req);
+	}
 }
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
@@ -461,8 +504,28 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->srcdst, req->data_tag);
 
-	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
-	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
+	if (req->sync)
+	{
+		struct _starpu_mpi_envelope *_envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
+		_envelope->mode = _STARPU_MPI_ENVELOPE_SYNC_READY;
+		_envelope->data_tag = req->data_tag;
+		_STARPU_MPI_DEBUG(20, "Telling node %d it can send the data and waiting for the data back ...\n", req->srcdst);
+		_STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE);
+		req->ret = MPI_Send(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm);
+		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Send returning %s", _starpu_mpi_get_mpi_code(req->ret));
+	}
+
+	if (req->sync)
+	{
+		_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA);
+		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
+	}
+	else
+	{
+		_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA);
+		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
+	}
+	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_code(req->ret));
 
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->srcdst, req->data_tag);
 
@@ -1086,6 +1149,61 @@ static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
 	}
 }
 
+/* Handle an incoming data envelope for which no matching receive request
+ * was found.
+ *
+ * An early_data handle is created for the envelope, a StarPU data handle is
+ * registered to hold the payload (a copy of the layout of the handle already
+ * associated with the tag if there is one, otherwise a raw buffer of
+ * envelope->size bytes), the early_data handle is passed to
+ * _starpu_mpi_early_data_add(), and a receive is submitted through
+ * _starpu_mpi_irecv_common(). The function then waits until that request is
+ * flagged as posted before marking the early_data handle as ready.
+ *
+ * envelope: the envelope just received; data_tag, sync and size are read.
+ * status:   MPI status of the envelope reception; only MPI_SOURCE is used.
+ *
+ * Locking: `mutex' is released and re-acquired several times below, and is
+ * locked again right before returning.
+ * NOTE(review): this presumes the caller holds `mutex' on entry -- confirm
+ * against the call site.
+ */
+static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status)
+{
+	_STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
+	_STARPU_MPI_DEBUG(20, "Request sync %d\n", envelope->sync);
+
+	struct _starpu_mpi_early_data_handle* early_data_handle = _starpu_mpi_early_data_create(envelope, status.MPI_SOURCE);
+
+	/* The tag lookup is performed with `mutex' released */
+	starpu_data_handle_t data_handle = NULL;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	data_handle = _starpu_data_get_data_handle_from_tag(envelope->data_tag);
+	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+	if (data_handle)
+	{
+		early_data_handle->buffer = NULL;
+		starpu_data_register_same(&early_data_handle->handle, data_handle);
+	}
+	else
+	{
+		/* The application has not yet registered a data with this tag:
+		 * we are going to receive the data as raw memory, and give it
+		 * to the application when it posts a receive for this tag
+		 */
+		_STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
+		early_data_handle->buffer = malloc(early_data_handle->env->size);
+		/* malloc() is allowed to return NULL for a zero-sized request,
+		 * so only a NULL result for a non-empty message is an error */
+		STARPU_ASSERT_MSG(early_data_handle->buffer || early_data_handle->env->size == 0, "cannot allocate message of size %ld\n", (long)early_data_handle->env->size);
+		starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size);
+	}
+	_starpu_mpi_early_data_add(early_data_handle);
+
+	_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->data_tag, status.MPI_SOURCE);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
+							  early_data_handle->data_tag, MPI_COMM_WORLD, 1, 0,
+							  NULL, NULL, 1, 1, envelope->size);
+	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+	// We wait until the request is pushed in the
+	// ready_request list, that ensures that the next loop
+	// will call _starpu_mpi_handle_ready_request
+	// on the request and post the corresponding mpi_irecv,
+	// otherwise, it may lead to reading data as an envelope
+	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
+	while (!(early_data_handle->req->posted))
+		STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
+	STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
+
+	/* Wake up whoever waits on req_cond for this early data's request */
+	STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
+	early_data_handle->req_ready = 1;
+	STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+}
+
 static void *_starpu_mpi_progress_thread_func(void *arg)
 {
 	struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
@@ -1133,6 +1251,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	_starpu_mpi_early_request_init(worldsize);
 	_starpu_mpi_early_data_init(worldsize);
+	_starpu_mpi_sync_data_init(worldsize);
 
 	/* notify the main thread that the progression thread is ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
@@ -1144,12 +1263,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
  	struct _starpu_mpi_envelope *envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
 
+	MPI_Request envelope_request;
  	int envelope_request_submitted = 0;
 
-	while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
+	while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
 	{
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0;
+		unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0;
 
 #ifndef STARPU_MPI_ACTIVITY
 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
@@ -1189,11 +1309,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		/* If there is no currently submitted envelope_request submitted to
                  * catch envelopes from senders, and there is some pending
                  * receive requests on our side, we resubmit a header request. */
-		MPI_Request envelope_request;
-		if ((_starpu_mpi_early_request_count() > 0) && (envelope_request_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
+		if (((_starpu_mpi_early_request_count() > 0) || (_starpu_mpi_sync_data_count() > 0)) && (envelope_request_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
 		{
 			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
-			MPI_Irecv(envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &envelope_request);
+			_STARPU_MPI_COMM_FROM_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE);
+			MPI_Irecv(envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, MPI_COMM_WORLD, &envelope_request);
 			envelope_request_submitted = 1;
 		}
 
@@ -1206,7 +1326,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		{
 			int flag,res;
 			MPI_Status status;
-			_STARPU_MPI_DEBUG(4, "Test of envelope_request\n");
+			//_STARPU_MPI_DEBUG(4, "Test of envelope_request\n");
 
 			/* test whether an envelope has arrived. */
 			res = MPI_Test(&envelope_request, &flag, &status);
@@ -1214,124 +1334,119 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (flag)
 			{
-				_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, status.MPI_SOURCE, envelope->size);
-
-				struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->data_tag, status.MPI_SOURCE);
-
-				/* Case: a data will arrive before a matching receive is
-				 * posted by the application. Create a temporary handle to
-				 * store the incoming data, submit a starpu_mpi_irecv_detached
-				 * on this handle, and store it as an early_data
-				 */
-				if (early_request == NULL)
+				_STARPU_MPI_DEBUG(4, "Envelope received with mode %d\n", envelope->mode);
+				if (envelope->mode == _STARPU_MPI_ENVELOPE_SYNC_READY)
 				{
-
-					_STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
-
-					starpu_data_handle_t data_handle = NULL;
-
-					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-					data_handle = _starpu_data_get_data_handle_from_tag(envelope->data_tag);
-					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-
-					struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
-					STARPU_ASSERT(early_data_handle);
-					STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
-					STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
-					early_data_handle->data_tag = envelope->data_tag;
-					early_data_handle->env = envelope;
-					early_data_handle->source = status.MPI_SOURCE;
-
-					if (data_handle)
-					{
-						early_data_handle->buffer = NULL;
-						starpu_data_register_same(&early_data_handle->handle, data_handle);
-						_starpu_mpi_early_data_add(early_data_handle);
-					}
-					else
-					{
-						/* The application has not registered yet a data with the tag,
-						 * we are going to receive the data as a raw memory, and give it
-						 * to the application when it post a receive for this tag
-						 */
-						_STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
-						early_data_handle->buffer = malloc(early_data_handle->env->size);
-						starpu_vector_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size, 1);
-						_starpu_mpi_early_data_add(early_data_handle);
-					}
-
-					_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->data_tag, status.MPI_SOURCE);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-					early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
-											  early_data_handle->data_tag, MPI_COMM_WORLD, 1, 0,
-											  NULL, NULL, 1, 1, envelope->size);
-					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-
-					// We wait until the request is pushed in the
-					// ready_request list, that ensures that the next loop
-					// will call _starpu_mpi_handle_ready_request
-					// on the request and post the corresponding mpi_irecv,
-					// otherwise, it may lead to read data as envelop
+					struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_find(envelope->data_tag, status.MPI_SOURCE);
+					_STARPU_MPI_DEBUG(2000, "Sending data with tag %d to node %d\n", _sync_data->req->data_tag, status.MPI_SOURCE);
+					STARPU_ASSERT_MSG(envelope->data_tag == _sync_data->req->data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_data->req->data_tag);
 					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-					STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
-					while (!(early_data_handle->req->posted))
-					     STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
-					STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
-
-					STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
-					early_data_handle->req_ready = 1;
-					STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
+					_starpu_mpi_isend_data_func(_sync_data->req);
 					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 				}
-				/* Case: a matching application request has been found for
-				 * the incoming data, we handle the correct allocation
-				 * of the pointer associated to the data handle, then
-				 * submit the corresponding receive with
-				 * _starpu_mpi_handle_ready_request. */
 				else
 				{
-					_STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", envelope->data_tag);
+					_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, status.MPI_SOURCE, envelope->size);
 
-					_starpu_mpi_early_request_delete(early_request);
+					struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->data_tag, status.MPI_SOURCE);
 
-					_starpu_mpi_handle_allocate_datatype(early_request->data_handle, &early_request->datatype, &early_request->user_datatype);
-					if (early_request->user_datatype == 0)
+					/* Case: a data will arrive before a matching receive is
+					 * posted by the application. Create a temporary handle to
+					 * store the incoming data, submit a starpu_mpi_irecv_detached
+					 * on this handle, and store it as an early_data
+					 */
+					if (early_request == NULL)
 					{
-						early_request->count = 1;
-						early_request->ptr = starpu_data_get_local_ptr(early_request->data_handle);
+						if (envelope->sync)
+						{
+							_STARPU_MPI_DEBUG(2000, "-------------------------> adding request for tag %d\n", envelope->data_tag);
+							struct _starpu_mpi_req *new_req;
+#ifdef STARPU_DEVEL
+#warning creating a request is not really useful.
+#endif
+							/* Initialize the request structure */
+							_starpu_mpi_request_init(&new_req);
+							new_req->request_type = RECV_REQ;
+							new_req->data_handle = NULL;
+							new_req->srcdst = status.MPI_SOURCE;
+							new_req->data_tag = envelope->data_tag;
+							new_req->comm = MPI_COMM_WORLD;
+							new_req->detached = 1;
+							new_req->sync = 1;
+							new_req->callback = NULL;
+							new_req->callback_arg = NULL;
+							new_req->func = _starpu_mpi_irecv_data_func;
+							new_req->sequential_consistency = 1;
+							new_req->is_internal_req = 0; // ????
+							new_req->count = envelope->size;
+							struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_create(new_req);
+							_starpu_mpi_sync_data_add(_sync_data);
+						}
+						else
+						{
+							_starpu_mpi_receive_early_data(envelope, status);
+						}
 					}
+					/* Case: a matching application request has been found for
+					 * the incoming data, we handle the correct allocation
+					 * of the pointer associated to the data handle, then
+					 * submit the corresponding receive with
+					 * _starpu_mpi_handle_ready_request. */
 					else
 					{
-						early_request->count = envelope->size;
-						early_request->ptr = malloc(early_request->count);
-
-						STARPU_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
+						_STARPU_MPI_DEBUG(2000, "A matching application request has been found for the incoming data with tag %d\n", envelope->data_tag);
+						_STARPU_MPI_DEBUG(2000, "Request sync %d\n", envelope->sync);
+
+						_starpu_mpi_early_request_delete(early_request);
+
+						early_request->sync = envelope->sync;
+						_starpu_mpi_handle_allocate_datatype(early_request->data_handle, &early_request->datatype, &early_request->user_datatype);
+						if (early_request->user_datatype == 0)
+						{
+							early_request->count = 1;
+							early_request->ptr = starpu_data_get_local_ptr(early_request->data_handle);
+						}
+						else
+						{
+							early_request->count = envelope->size;
+							early_request->ptr = malloc(early_request->count);
+
+							STARPU_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
+						}
+
+						_STARPU_MPI_DEBUG(3, "Handling new request... \n");
+						/* handling a request is likely to block for a while
+						 * (on a sync_data_with_mem call), we want to let the
+						 * application submit requests in the meantime, so we
+						 * release the lock. */
+						STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+						_starpu_mpi_handle_ready_request(early_request);
+						STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 					}
-
-					_STARPU_MPI_DEBUG(3, "Handling new request... \n");
-					/* handling a request is likely to block for a while
-					 * (on a sync_data_with_mem call), we want to let the
-					 * application submit requests in the meantime, so we
-					 * release the lock. */
-					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-					_starpu_mpi_handle_ready_request(early_request);
-					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 				}
 				envelope_request_submitted = 0;
 			}
 			else
 			{
-				_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
+				//_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
 			}
 		}
 	}
 
+	if (envelope_request_submitted)
+	{
+		MPI_Status status;
+		MPI_Cancel(&envelope_request);
+		MPI_Wait(&envelope_request, &status);
+		envelope_request_submitted = 0;
+	}
+
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
 	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
 	_starpu_mpi_early_request_check_termination();
 	_starpu_mpi_early_data_check_termination();
+	_starpu_mpi_sync_data_check_termination();
 
 	if (argc_argv->initialize_mpi)
 	{
@@ -1341,6 +1456,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
+	_starpu_mpi_sync_data_free(worldsize);
 	_starpu_mpi_early_data_free(worldsize);
 	_starpu_mpi_early_request_free();
 	free(argc_argv);

+ 13 - 1
mpi/src/starpu_mpi_early_data.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010-2014  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -66,6 +66,18 @@ void _starpu_mpi_early_data_free(int world_size)
 	free(_starpu_mpi_early_data_handle_hashmap);
 }
 
+/* Allocate and initialise an early_data handle for `envelope', received
+ * from `source'.
+ *
+ * The handle records the envelope's data_tag and keeps a pointer to
+ * `envelope' itself (the envelope is not copied). Its req_mutex and req_cond
+ * are initialised; every other field is left zeroed by calloc().
+ * A failed allocation is caught by STARPU_ASSERT.
+ */
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _starpu_mpi_envelope *envelope, int source)
+{
+	struct _starpu_mpi_early_data_handle *handle;
+
+	handle = calloc(1, sizeof(*handle));
+	STARPU_ASSERT(handle);
+
+	/* synchronisation objects attached to the handle */
+	STARPU_PTHREAD_MUTEX_INIT(&handle->req_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&handle->req_cond, NULL);
+
+	/* identification of the incoming message */
+	handle->source = source;
+	handle->env = envelope;
+	handle->data_tag = envelope->data_tag;
+
+	return handle;
+}
+
 #ifdef STARPU_VERBOSE
 static void _starpu_mpi_early_data_handle_display_hash(int source, int tag)
 {

+ 2 - 1
mpi/src/starpu_mpi_early_data.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2009, 2010-2014  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -44,6 +44,7 @@ void _starpu_mpi_early_data_init(int world_size);
 void _starpu_mpi_early_data_check_termination();
 void _starpu_mpi_early_data_free(int world_size);
 
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_create(struct _starpu_mpi_envelope *envelope, int source);
 struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(int data_tag, int source);
 void _starpu_mpi_early_data_add(struct _starpu_mpi_early_data_handle *early_data_handle);
 void _starpu_mpi_early_data_delete(struct _starpu_mpi_early_data_handle *early_data_handle);

+ 64 - 1
mpi/src/starpu_mpi_private.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010, 2012, 2014  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2011, 2012, 2013, 2015  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -41,3 +41,66 @@ void starpu_mpi_set_communication_tag(int tag)
 {
 	_starpu_mpi_tag = tag;
 }
+
+char *_starpu_mpi_get_mpi_code(int code)
+{
+	switch (code)
+	{
+	case MPI_SUCCESS: return "MPI_SUCCESS";
+	case MPI_ERR_BUFFER: return "MPI_ERR_BUFFER";;
+	case MPI_ERR_COUNT: return "MPI_ERR_COUNT";;
+	case MPI_ERR_TYPE: return "MPI_ERR_TYPE";;
+	case MPI_ERR_TAG: return "MPI_ERR_TAG";;
+	case MPI_ERR_COMM: return "MPI_ERR_COMM";;
+	case MPI_ERR_RANK: return "MPI_ERR_RANK";;
+	case MPI_ERR_REQUEST: return "MPI_ERR_REQUEST";;
+	case MPI_ERR_ROOT: return "MPI_ERR_ROOT";;
+	case MPI_ERR_GROUP: return "MPI_ERR_GROUP";;
+	case MPI_ERR_OP: return "MPI_ERR_OP";;
+	case MPI_ERR_TOPOLOGY: return "MPI_ERR_TOPOLOGY";;
+	case MPI_ERR_DIMS: return "MPI_ERR_DIMS";;
+	case MPI_ERR_ARG: return "MPI_ERR_ARG";;
+	case MPI_ERR_UNKNOWN: return "MPI_ERR_UNKNOWN";;
+	case MPI_ERR_TRUNCATE: return "MPI_ERR_TRUNCATE";;
+	case MPI_ERR_OTHER: return "MPI_ERR_OTHER";;
+	case MPI_ERR_INTERN: return "MPI_ERR_INTERN";;
+	case MPI_ERR_IN_STATUS: return "MPI_ERR_IN_STATUS";;
+	case MPI_ERR_PENDING: return "MPI_ERR_PENDING";;
+	case MPI_ERR_ACCESS: return "MPI_ERR_ACCESS";;
+	case MPI_ERR_AMODE: return "MPI_ERR_AMODE";;
+	case MPI_ERR_ASSERT: return "MPI_ERR_ASSERT";;
+	case MPI_ERR_BAD_FILE: return "MPI_ERR_BAD_FILE";;
+	case MPI_ERR_BASE: return "MPI_ERR_BASE";;
+	case MPI_ERR_CONVERSION: return "MPI_ERR_CONVERSION";;
+	case MPI_ERR_DISP: return "MPI_ERR_DISP";;
+	case MPI_ERR_DUP_DATAREP: return "MPI_ERR_DUP_DATAREP";;
+	case MPI_ERR_FILE_EXISTS: return "MPI_ERR_FILE_EXISTS";;
+	case MPI_ERR_FILE_IN_USE: return "MPI_ERR_FILE_IN_USE";;
+	case MPI_ERR_FILE: return "MPI_ERR_FILE";;
+	case MPI_ERR_INFO_KEY: return "MPI_ERR_INFO_KEY";;
+	case MPI_ERR_INFO_NOKEY: return "MPI_ERR_INFO_NOKEY";;
+	case MPI_ERR_INFO_VALUE: return "MPI_ERR_INFO_VALUE";;
+	case MPI_ERR_INFO: return "MPI_ERR_INFO";;
+	case MPI_ERR_IO: return "MPI_ERR_IO";;
+	case MPI_ERR_KEYVAL: return "MPI_ERR_KEYVAL";;
+	case MPI_ERR_LOCKTYPE: return "MPI_ERR_LOCKTYPE";;
+	case MPI_ERR_NAME: return "MPI_ERR_NAME";;
+	case MPI_ERR_NO_MEM: return "MPI_ERR_NO_MEM";;
+	case MPI_ERR_NOT_SAME: return "MPI_ERR_NOT_SAME";;
+	case MPI_ERR_NO_SPACE: return "MPI_ERR_NO_SPACE";;
+	case MPI_ERR_NO_SUCH_FILE: return "MPI_ERR_NO_SUCH_FILE";;
+	case MPI_ERR_PORT: return "MPI_ERR_PORT";;
+	case MPI_ERR_QUOTA: return "MPI_ERR_QUOTA";;
+	case MPI_ERR_READ_ONLY: return "MPI_ERR_READ_ONLY";;
+	case MPI_ERR_RMA_CONFLICT: return "MPI_ERR_RMA_CONFLICT";;
+	case MPI_ERR_RMA_SYNC: return "MPI_ERR_RMA_SYNC";;
+	case MPI_ERR_SERVICE: return "MPI_ERR_SERVICE";;
+	case MPI_ERR_SIZE: return "MPI_ERR_SIZE";;
+	case MPI_ERR_SPAWN: return "MPI_ERR_SPAWN";;
+	case MPI_ERR_UNSUPPORTED_DATAREP: return "MPI_ERR_UNSUPPORTED_DATAREP";;
+	case MPI_ERR_UNSUPPORTED_OPERATION: return "MPI_ERR_UNSUPPORTED_OPERATION";;
+	case MPI_ERR_WIN: return "MPI_ERR_WIN";;
+	case MPI_ERR_LASTCODE: return "MPI_ERR_LASTCODE";;
+	default: return "UNKNOWN_MPI_CODE";
+	}
+}

+ 56 - 1
mpi/src/starpu_mpi_private.h

@@ -30,6 +30,7 @@ extern "C" {
 #endif
 
 extern int _starpu_debug_rank;
+char *_starpu_mpi_get_mpi_code(int code);
 
 #ifdef STARPU_VERBOSE
 extern int _starpu_debug_level_min;
@@ -38,7 +39,50 @@ void _starpu_mpi_set_debug_level_min(int level);
 void _starpu_mpi_set_debug_level_max(int level);
 #endif
 
+/* STARPU_MPI_ASSERT_MSG(x, msg, ...): when `x' is false, prints `msg'
+ * (printf-style, formatted with the optional extra arguments) on stderr,
+ * prefixed with the MPI rank and the name of the current function, then stops
+ * the program: through a store to NULL when compiled by nvcc on Windows,
+ * through assert(x) otherwise. Expands to an empty statement when
+ * STARPU_NO_ASSERT is defined.
+ * The rank is only queried with starpu_mpi_comm_rank() while
+ * _starpu_debug_rank is still -1. */
+#ifdef STARPU_NO_ASSERT
+#  define STARPU_MPI_ASSERT_MSG(x, msg, ...)	do { } while(0)
+#else
+#  if defined(__CUDACC__) && defined(STARPU_HAVE_WINDOWS)
+/* _starpu_debug_rank is declared `extern' earlier in this header; it must not
+ * be defined here, or each file including the header gets its own definition. */
+#    define STARPU_MPI_ASSERT_MSG(x, msg, ...)									\
+	do													\
+	{ 													\
+		if (STARPU_UNLIKELY(!(x))) 									\
+		{												\
+			if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
+			fprintf(stderr, "\n[%d][starpu_mpi][%s][assert failure] " msg "\n\n", _starpu_debug_rank, __starpu_func__, ## __VA_ARGS__); *(int*)NULL = 0; \
+		} \
+	} while(0)
+#  else
+#    define STARPU_MPI_ASSERT_MSG(x, msg, ...)	\
+	do \
+	{ \
+		if (STARPU_UNLIKELY(!(x))) \
+		{ \
+			if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
+			fprintf(stderr, "\n[%d][starpu_mpi][%s][assert failure] " msg "\n\n", _starpu_debug_rank, __starpu_func__, ## __VA_ARGS__); \
+		} \
+		assert(x); \
+	} while(0)
+
+#  endif
+#endif
+
 #ifdef STARPU_VERBOSE
+/* Communication tracing. When the STARPU_MPI_COMM environment variable is set,
+ * prints one line on stderr with the MPI rank, the direction marker `way', the
+ * peer `node', the `tag', the message size in bytes (count multiplied by the
+ * size MPI_Type_size() reports for `datatype') and the calling function/line.
+ * The macro does not end with a semicolon: the caller supplies it, exactly as
+ * with the empty variants used when STARPU_VERBOSE is not defined. */
+#  define _STARPU_MPI_COMM_DEBUG(count, datatype, node, tag, way)	\
+	do \
+	{ \
+		if (getenv("STARPU_MPI_COMM"))	\
+		{ \
+			int __size; \
+			if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
+			MPI_Type_size(datatype, &__size); \
+			/* cast so that the argument really is the `long' that %12ld expects */ \
+			fprintf(stderr, "[%d][starpu_mpi] %s %d:%d %12ld     [%s:%d]\n", _starpu_debug_rank, way, node, tag, (long)(count)*__size, __starpu_func__ , __LINE__); \
+			fflush(stderr); \
+		} \
+	} while(0)
+/* Same trace, with the arrow showing an outgoing ("-->") or incoming ("<--") message. */
+#  define _STARPU_MPI_COMM_TO_DEBUG(count, datatype, dest, tag) 	_STARPU_MPI_COMM_DEBUG(count, datatype, dest, tag, "-->")
+#  define _STARPU_MPI_COMM_FROM_DEBUG(count, datatype, source, tag) 	_STARPU_MPI_COMM_DEBUG(count, datatype, source, tag, "<--")
 #  define _STARPU_MPI_DEBUG(level, fmt, ...) \
 	do \
 	{								\
@@ -50,7 +94,10 @@ void _starpu_mpi_set_debug_level_max(int level);
 		}			\
 	} while(0);
 #else
-#  define _STARPU_MPI_DEBUG(level, fmt, ...)
+/* STARPU_VERBOSE is not defined: every debug macro expands to an empty
+ * statement which still has to be followed by a semicolon at the call site. */
+#  define _STARPU_MPI_COMM_DEBUG(count, datatype, node, tag, way)	do { } while(0)
+#  define _STARPU_MPI_COMM_TO_DEBUG(count, datatype, dest, tag)		do { } while(0)
+#  define _STARPU_MPI_COMM_FROM_DEBUG(count, datatype, source, tag)	do { } while(0)
+#  define _STARPU_MPI_DEBUG(level, fmt, ...)		do { } while(0)
 #endif
 
 #define _STARPU_MPI_DISP(fmt, ...) do { if (!getenv("STARPU_SILENT")) { \
@@ -76,6 +123,9 @@ void _starpu_mpi_set_debug_level_max(int level);
 #endif
 
 extern int _starpu_mpi_tag;
+/* MPI tags derived from the base tag _starpu_mpi_tag (base, base+1, base+2).
+ * The expansions are parenthesised so that each macro stays a single operand
+ * whatever expression it is used in. */
+#define _STARPU_MPI_TAG_ENVELOPE  (_starpu_mpi_tag)
+#define _STARPU_MPI_TAG_DATA      (_starpu_mpi_tag+1)
+#define _STARPU_MPI_TAG_SYNC_DATA (_starpu_mpi_tag+2)
 
 enum _starpu_mpi_request_type
 {
@@ -88,10 +138,15 @@ enum _starpu_mpi_request_type
 	UNKNOWN_REQ=6,
 };
 
+/* NOTE(review): presumably the possible values of the `mode' field of
+ * struct _starpu_mpi_envelope (plain data announcement vs. "ready"
+ * notification of a synchronous send) -- confirm against starpu_mpi.c. */
+#define _STARPU_MPI_ENVELOPE_DATA       0
+#define _STARPU_MPI_ENVELOPE_SYNC_READY 1
+
+/* NOTE(review): the meaning of the fields below is inferred from their names
+ * only -- confirm against the code which fills and reads envelopes. */
 struct _starpu_mpi_envelope
 {
+	int mode;	/* presumably one of the _STARPU_MPI_ENVELOPE_* values */
 	starpu_ssize_t size;
 	int data_tag;
+	unsigned sync;	/* presumably non-zero for a synchronous send -- TODO confirm */
 };
 
 struct _starpu_mpi_req;

+ 169 - 0
mpi/src/starpu_mpi_sync_data.c

@@ -0,0 +1,169 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdlib.h>
+#include <starpu_mpi.h>
+#include <starpu_mpi_sync_data.h>
+#include <starpu_mpi_private.h>
+#include <common/uthash.h>
+
+/* One entry of a per-source hash table: the handles queued for one data tag. */
+struct _starpu_mpi_sync_data_handle_hashlist
+{
+	struct _starpu_mpi_sync_data_handle_list *list;	/* handles for this tag: pushed at the back, popped from the front */
+	UT_hash_handle hh;	/* uthash bookkeeping */
+	int data_tag;	/* hash key */
+};
+
+/** Storage of the sync data handles, one slot per source rank: slot [source]
+ * of the hashmap array is a uthash table keyed by data tag, and slot [source]
+ * of the mutex array is the lock taken around accesses to that table. Both
+ * arrays are allocated by _starpu_mpi_sync_data_init(). The counter is the
+ * total number of handles currently queued, all sources together.
+ * NOTE(review): the counter is updated while holding only the per-source
+ * mutex, so updates made for two different sources are not serialised against
+ * each other -- confirm that all callers run in the same thread. */
+static starpu_pthread_mutex_t *_starpu_mpi_sync_data_handle_mutex;
+static struct _starpu_mpi_sync_data_handle_hashlist **_starpu_mpi_sync_data_handle_hashmap = NULL;
+static int _starpu_mpi_sync_data_handle_hashmap_count = 0;
+
+#ifdef STARPU_VERBOSE
+/* Debug helper (verbose builds only): reports, at debug level 60, whether the
+ * table of `source' has an entry for `tag' and, if so, every handle queued in
+ * it. Does not take the per-source mutex itself. */
+static
+void _starpu_mpi_sync_data_handle_display_hash(int source, int tag)
+{
+	struct _starpu_mpi_sync_data_handle_hashlist *bucket;
+	struct _starpu_mpi_sync_data_handle *elt;
+
+	HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[source], &tag, bucket);
+	if (bucket == NULL)
+	{
+		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d does not exist\n", source, tag);
+		return;
+	}
+	if (_starpu_mpi_sync_data_handle_list_empty(bucket->list))
+	{
+		_STARPU_MPI_DEBUG(60, "Hashlist for source %d and tag %d is empty\n", source, tag);
+		return;
+	}
+
+	elt = _starpu_mpi_sync_data_handle_list_begin(bucket->list);
+	while (elt != _starpu_mpi_sync_data_handle_list_end(bucket->list))
+	{
+		_STARPU_MPI_DEBUG(60, "Element for source %d and tag %d: %p\n", source, tag, elt);
+		elt = _starpu_mpi_sync_data_handle_list_next(elt);
+	}
+}
+#endif
+
+/* Allocates the per-source bookkeeping: one (initially empty) hash table and
+ * one mutex for each of the `world_size' ranks. The memory is released by
+ * _starpu_mpi_sync_data_free(). */
+void _starpu_mpi_sync_data_init(int world_size)
+{
+	int k;
+
+	/* sizeof on the pointed-to object keeps the element size tied to the
+	 * actual type of each array */
+	_starpu_mpi_sync_data_handle_hashmap = malloc(world_size * sizeof(*_starpu_mpi_sync_data_handle_hashmap));
+	STARPU_ASSERT(_starpu_mpi_sync_data_handle_hashmap);
+	_starpu_mpi_sync_data_handle_mutex = malloc(world_size * sizeof(*_starpu_mpi_sync_data_handle_mutex));
+	STARPU_ASSERT(_starpu_mpi_sync_data_handle_mutex);
+	for(k=0 ; k<world_size ; k++)
+	{
+		_starpu_mpi_sync_data_handle_hashmap[k] = NULL;
+		STARPU_PTHREAD_MUTEX_INIT(&_starpu_mpi_sync_data_handle_mutex[k], NULL);
+	}
+}
+
+/* Asserts that no sync data handle is left in the hash tables. */
+void _starpu_mpi_sync_data_check_termination()
+{
+	STARPU_ASSERT_MSG(_starpu_mpi_sync_data_count() == 0, "Number of sync received messages left is not zero, did you forget to post a receive corresponding to a send?");
+}
+
+/* Releases what _starpu_mpi_sync_data_init() set up for `world_size' ranks:
+ * for every rank, deletes the list of each hash entry, removes and frees the
+ * entry, destroys the rank's mutex; finally frees both arrays. */
+void _starpu_mpi_sync_data_free(int world_size)
+{
+	int rank;
+
+	for (rank = 0; rank < world_size; rank++)
+	{
+		struct _starpu_mpi_sync_data_handle_hashlist *entry, *next;
+
+		/* HASH_ITER keeps the successor in `next', so `entry' can be
+		 * removed and freed while iterating */
+		HASH_ITER(hh, _starpu_mpi_sync_data_handle_hashmap[rank], entry, next)
+		{
+			_starpu_mpi_sync_data_handle_list_delete(entry->list);
+			HASH_DEL(_starpu_mpi_sync_data_handle_hashmap[rank], entry);
+			free(entry);
+		}
+		STARPU_PTHREAD_MUTEX_DESTROY(&_starpu_mpi_sync_data_handle_mutex[rank]);
+	}
+
+	free(_starpu_mpi_sync_data_handle_hashmap);
+	free(_starpu_mpi_sync_data_handle_mutex);
+}
+
+/* Returns the number of sync data handles currently queued, all sources
+ * together. The counter is read without taking any mutex. */
+int _starpu_mpi_sync_data_count()
+{
+	return _starpu_mpi_sync_data_handle_hashmap_count;
+}
+
+/* Allocates a zero-initialised sync data handle describing `req': the handle
+ * points to the request and copies its data_tag, and its srcdst as `source'.
+ * The handle is not inserted in any table; see _starpu_mpi_sync_data_add(). */
+struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_create(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_sync_data_handle *handle;
+
+	handle = calloc(1, sizeof(*handle));
+	STARPU_ASSERT(handle);
+
+	handle->req = req;
+	handle->source = req->srcdst;
+	handle->data_tag = req->data_tag;
+
+	return handle;
+}
+
+/* Looks up, under the mutex of `source', the oldest handle queued for
+ * `data_tag' in the table of `source'. When there is one it is REMOVED from
+ * its list (and the global counter decremented) before being returned;
+ * otherwise NULL is returned. */
+struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_find(int data_tag, int source)
+{
+	struct _starpu_mpi_sync_data_handle_hashlist *bucket;
+	struct _starpu_mpi_sync_data_handle *found = NULL;
+
+	_STARPU_MPI_DEBUG(60, "Looking for sync_data_handle with tag %d in the hashmap[%d]\n", data_tag, source);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
+	HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[source], &data_tag, bucket);
+	if (bucket != NULL && !_starpu_mpi_sync_data_handle_list_empty(bucket->list))
+	{
+		found = _starpu_mpi_sync_data_handle_list_pop_front(bucket->list);
+		_starpu_mpi_sync_data_handle_hashmap_count --;
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
+
+	_STARPU_MPI_DEBUG(60, "Found sync_data_handle %p with tag %d in the hashmap[%d]\n", found, data_tag, source);
+	return found;
+}
+
+/* Queues `sync_data_handle' at the back of the list associated with its
+ * data_tag in the table of its source, creating the hash entry on first use,
+ * and increments the global counter. Everything, including the verbose dump,
+ * happens under the mutex of that source: as soon as the mutex is released
+ * the handle may be popped by _starpu_mpi_sync_data_find(), so it is not
+ * accessed afterwards. */
+void _starpu_mpi_sync_data_add(struct _starpu_mpi_sync_data_handle *sync_data_handle)
+{
+	/* read once, while the handle is still private to the caller */
+	int source = sync_data_handle->source;
+	int data_tag = sync_data_handle->data_tag;
+	struct _starpu_mpi_sync_data_handle_hashlist *hashlist;
+
+	_STARPU_MPI_DEBUG(2000, "Adding sync_data_handle %p with tag %d in the hashmap[%d]\n", sync_data_handle, data_tag, source);
+
+	STARPU_PTHREAD_MUTEX_LOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
+	HASH_FIND_INT(_starpu_mpi_sync_data_handle_hashmap[source], &data_tag, hashlist);
+	if (hashlist == NULL)
+	{
+		hashlist = malloc(sizeof(*hashlist));
+		STARPU_ASSERT(hashlist);
+		hashlist->list = _starpu_mpi_sync_data_handle_list_new();
+		hashlist->data_tag = data_tag;
+		HASH_ADD_INT(_starpu_mpi_sync_data_handle_hashmap[source], data_tag, hashlist);
+	}
+	_starpu_mpi_sync_data_handle_list_push_back(hashlist->list, sync_data_handle);
+	_starpu_mpi_sync_data_handle_hashmap_count ++;
+#ifdef STARPU_VERBOSE
+	/* the dump walks the list, hence it must run before the unlock */
+	_starpu_mpi_sync_data_handle_display_hash(source, data_tag);
+#endif
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_starpu_mpi_sync_data_handle_mutex[source]);
+}
+

+ 49 - 0
mpi/src/starpu_mpi_sync_data.h

@@ -0,0 +1,49 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_SYNC_DATA_H__
+#define __STARPU_MPI_SYNC_DATA_H__
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <mpi.h>
+#include <common/config.h>
+#include <common/list.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* List element type declared through LIST_TYPE (common/list.h): one sync data
+ * handle.
+ *   req      - request the handle was created for
+ *   data_tag - tag of that request; key in the per-source hash table
+ *   source   - srcdst of that request; selects the per-source table and mutex
+ */
+LIST_TYPE(_starpu_mpi_sync_data_handle,
+	  struct _starpu_mpi_req *req;
+	  int data_tag;
+	  int source;
+);
+
+/* Allocates one hash table and one mutex per rank (world_size of them). */
+void _starpu_mpi_sync_data_init(int world_size);
+/* Asserts that no handle is left queued. */
+void _starpu_mpi_sync_data_check_termination();
+/* Frees the hash entries, destroys the mutexes and frees both arrays. */
+void _starpu_mpi_sync_data_free(int world_size);
+
+/* Allocates a handle for req (tag and source are taken from the request); does not queue it. */
+struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_create(struct _starpu_mpi_req *req);
+/* Removes and returns the oldest handle queued for (data_tag, source), or NULL when there is none. */
+struct _starpu_mpi_sync_data_handle *_starpu_mpi_sync_data_find(int data_tag, int source);
+/* Queues the handle under its own source and data_tag. */
+void _starpu_mpi_sync_data_add(struct _starpu_mpi_sync_data_handle *sync_data_handle);
+/* Number of handles currently queued, all sources together. */
+int _starpu_mpi_sync_data_count();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __STARPU_MPI_SYNC_DATA_H__ */