浏览代码

mpi/src: various fixes

  - we need to store the internal request which is used when data
    arrive before a starpu_mpi_recv is posted. The internal request
    holds the MPI request which needs to be waited on when calling
    starpu_mpi_wait

  - when posting such a request, we need to wait until it is pushed in
    the new_requests list to make sure the data are read directly
    after the envelope. Otherwise the starpu mpi engine will read the
    incoming data as an envelope.

  - when sending data, we need to store the envelope, and free it when
    handling the request termination

  - release the global lock when calling
    starpu_data_get_data_handle_from_tag to avoid deadlock
Nathalie Furmento 12 年之前
父节点
当前提交
06d9ce0bca
共有 2 个文件被更改,包括 187 次插入103 次删除
  1. 168 102
      mpi/src/starpu_mpi.c
  2. 19 1
      mpi/src/starpu_mpi_private.h

+ 168 - 102
mpi/src/starpu_mpi.c

@@ -57,18 +57,13 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 
-struct _starpu_mpi_envelope
-{
-	ssize_t psize;
-	int mpi_tag;
-};
-
 struct _starpu_mpi_copy_handle
 {
 	starpu_data_handle_t handle;
 	struct _starpu_mpi_envelope *env;
 	int mpi_tag;
 	UT_hash_handle hh;
+	struct _starpu_mpi_req *req;
 };
 
  /********************************************************/
@@ -176,136 +171,170 @@ static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
 	}
 }
 
-/********************************************************/
-/*                                                      */
-/*  Send/Receive functionalities                        */
-/*                                                      */
-/********************************************************/
-
-static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
-							      int srcdst, int mpi_tag, MPI_Comm comm,
-							      unsigned detached, void (*callback)(void *), void *arg,
-							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
-							      enum starpu_data_access_mode mode)
+static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 {
+	/* Initialize the request structure */
+	req->data_handle = NULL;
 
-	_STARPU_MPI_LOG_IN();
-	struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT_MSG(req, "Invalid request");
+	req->datatype = NULL;
+	req->ptr = NULL;
+	req->count = -1;
+	req->user_datatype = -1;
 
-	_STARPU_MPI_INC_POSTED_REQUESTS(1);
+	req->srcdst = -1;
+	req->mpi_tag = -1;
+	req->comm = 0;
 
-	/* Initialize the request structure */
-	req->submitted = 0;
-	req->completed = 0;
+	req->func = NULL;
+
+	req->status = NULL;
+	req->request = NULL;
+	req->flag = NULL;
+
+	req->ret = -1;
 	STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
 	STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&req->posted_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&req->posted_cond, NULL);
 
-	req->request_type = request_type;
-	req->user_datatype = -1;
-	req->datatype = 0;
-	req->count = -1;
-	req->data_handle = data_handle;
-	req->srcdst = srcdst;
-	req->mpi_tag = mpi_tag;
-	req->comm = comm;
+	req->request_type = UNKNOWN_REQ;
 
-	req->detached = detached;
-	req->callback = callback;
-	req->callback_arg = arg;
+	req->submitted = 0;
+	req->completed = 0;
+	req->posted = 0;
 
-	req->func = func;
+	req->other_request = NULL;
 
-	/* Asynchronously request StarPU to fetch the data in main memory: when
-	 * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
-	 * the request is actually submitted */
-	starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
+	req->detached = -1;
+	req->callback = NULL;
+	req->callback_arg = NULL;
 
-	_STARPU_MPI_LOG_OUT();
-	return req;
-}
+	req->size_req = NULL;
+	req->internal_req = NULL;
+	req->is_internal_req = 0;
+	req->envelope = NULL;
+ }
 
-/********************************************************/
-/*                                                      */
-/*  Send functionalities                                */
-/*                                                      */
-/********************************************************/
+ /********************************************************/
+ /*                                                      */
+ /*  Send/Receive functionalities                        */
+ /*                                                      */
+ /********************************************************/
 
-static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
-{
-	_STARPU_MPI_LOG_IN();
+ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
+							       int srcdst, int mpi_tag, MPI_Comm comm,
+							       unsigned detached, void (*callback)(void *), void *arg,
+							       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
+							       enum starpu_data_access_mode mode)
+ {
+
+	 _STARPU_MPI_LOG_IN();
+	 struct _starpu_mpi_req *req = malloc(sizeof(struct _starpu_mpi_req));
+	 STARPU_ASSERT_MSG(req, "Invalid request");
+
+	 _STARPU_MPI_INC_POSTED_REQUESTS(1);
+
+	 /* Initialize the request structure */
+	 _starpu_mpi_request_init(req);
+	 req->request_type = request_type;
+	 req->data_handle = data_handle;
+	 req->srcdst = srcdst;
+	 req->mpi_tag = mpi_tag;
+	 req->comm = comm;
+	 req->detached = detached;
+	 req->callback = callback;
+	 req->callback_arg = arg;
+	 req->func = func;
+
+	 /* Asynchronously request StarPU to fetch the data in main memory: when
+	  * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
+	  * the request is actually submitted */
+	 starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
+
+	 _STARPU_MPI_LOG_OUT();
+	 return req;
+ }
 
-	STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
+ /********************************************************/
+ /*                                                      */
+ /*  Send functionalities                                */
+ /*                                                      */
+ /********************************************************/
 
-	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
+ {
+	 _STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
+	 STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
 
-	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
+	 _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
-	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
-	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
+	 _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
-	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
+	 TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
-	/* somebody is perhaps waiting for the MPI request to be posted */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	req->submitted = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+	 req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
+	 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
 
-	_starpu_mpi_handle_detached_request(req);
+	 TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
 
-	_STARPU_MPI_LOG_OUT();
-}
+	 /* somebody is perhaps waiting for the MPI request to be posted */
+	 STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	 req->submitted = 1;
+	 STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
+	 STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
 
-static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
-{
-	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+	 _starpu_mpi_handle_detached_request(req);
+
+	 _STARPU_MPI_LOG_OUT();
+ }
 
-	struct _starpu_mpi_envelope* env = calloc(1,sizeof(struct _starpu_mpi_envelope));
+ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
+ {
+	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
 
-	env->mpi_tag = req->mpi_tag;
+	req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
+	req->envelope->mpi_tag = req->mpi_tag;
 
 	if (req->user_datatype == 0)
 	{
 		req->count = 1;
 		req->ptr = starpu_data_get_local_ptr(req->data_handle);
 
-		env->psize = (ssize_t)req->count;
+		req->envelope->psize = (ssize_t)req->count;
 
 		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
-		MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
+		MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 	}
 	else
 	{
 		int ret;
 
  		// Do not pack the data, just try to find out the size
-		starpu_data_pack(req->data_handle, NULL, &(env->psize));
+		starpu_data_pack(req->data_handle, NULL, &(req->envelope->psize));
 
-		if (env->psize != -1)
+		if (req->envelope->psize != -1)
  		{
  			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
-			req->count = env->psize;
-			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
+			req->count = req->envelope->psize;
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  		}
 
  		// Pack the data
  		starpu_data_pack(req->data_handle, &req->ptr, &req->count);
-		if (env->psize == -1)
+		if (req->envelope->psize == -1)
  		{
  			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
-			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
+			ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
  		}
  		else
  		{
  			// We check the size returned with the 2 calls to pack is the same
-			STARPU_ASSERT_MSG(req->count == env->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, env->psize);
+			STARPU_ASSERT_MSG(req->count == req->envelope->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->psize);
  		}
 		// We can send the data now
 	}
@@ -481,8 +510,11 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 	_STARPU_MPI_LOG_IN();
 	int ret;
-	struct _starpu_mpi_req *waiting_req = calloc(1, sizeof(struct _starpu_mpi_req));
+
+	struct _starpu_mpi_req *waiting_req = malloc(sizeof(struct _starpu_mpi_req));
+	_starpu_mpi_request_init(waiting_req);
 	STARPU_ASSERT_MSG(waiting_req, "Allocation failed");
+
 	struct _starpu_mpi_req *req = *public_req;
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(1);
@@ -573,9 +605,9 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 	if (submitted)
 	{
-		struct _starpu_mpi_req *testing_req = calloc(1, sizeof(struct _starpu_mpi_req));
+		struct _starpu_mpi_req *testing_req = malloc(sizeof(struct _starpu_mpi_req));
 		STARPU_ASSERT_MSG(testing_req, "allocation failed");
-		//		memset(testing_req, 0, sizeof(struct _starpu_mpi_req));
+		_starpu_mpi_request_init(testing_req);
 
 		/* Initialize the request structure */
 		STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
@@ -639,8 +671,9 @@ int starpu_mpi_barrier(MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	int ret;
-	struct _starpu_mpi_req *barrier_req = calloc(1, sizeof(struct _starpu_mpi_req));
+	struct _starpu_mpi_req *barrier_req = malloc(sizeof(struct _starpu_mpi_req));
 	STARPU_ASSERT_MSG(barrier_req, "allocation failed");
+	_starpu_mpi_request_init(barrier_req);
 
 	/* First wait for *both* all tasks and MPI requests to finish, in case
 	 * some tasks generate MPI requests, MPI requests generate tasks, etc.
@@ -705,6 +738,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 		case WAIT_REQ: return "WAIT_REQ";
 		case TEST_REQ: return "TEST_REQ";
 		case BARRIER_REQ: return "BARRIER_REQ";
+		case UNKNOWN_REQ: return "UNSET_REQ";
 		default: return "unknown request type";
 		}
 }
@@ -749,12 +783,25 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 			}
 			else
 			{
+				_STARPU_MPI_DEBUG(3, "NOT deleting chandle %p from hashmap (tag %d %d)\n", chandle, req->mpi_tag, starpu_data_get_tag(req->data_handle));
 				_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
 			}
 		}
 		starpu_data_release(req->data_handle);
 	}
 
+	if (req->envelope)
+	{
+		free(req->envelope);
+		req->envelope = NULL;
+	}
+
+	if (req->internal_req)
+	{
+		free(req->internal_req);
+		req->internal_req = NULL;
+	}
+
 	/* Execute the specified callback, if any */
 	if (req->callback)
 		req->callback(req->callback_arg);
@@ -779,6 +826,9 @@ static void _starpu_mpi_copy_cb(void* arg)
 {
 	struct _starpu_mpi_copy_cb_args *args = arg;
 
+	args->req->request = args->req->internal_req->request;
+	args->req->submitted = 1;
+
 	struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
 	void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
 	void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
@@ -801,7 +851,8 @@ static void _starpu_mpi_copy_cb(void* arg)
 	starpu_data_unregister_submit(args->copy_handle);
 
 	_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
-	_starpu_mpi_handle_request_termination(args->req);
+	if (args->req->detached)
+		_starpu_mpi_handle_request_termination(args->req);
 
 	free(args);
 }
@@ -813,6 +864,8 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
+	_STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p tag %d and type %s\n", req, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
+
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
 	if (req->request_type == RECV_REQ)
@@ -828,6 +881,8 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 		{
 			_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
 
+			req->internal_req = chandle->req;
+
 			struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
 			cb_args->data_handle = req->data_handle;
 			cb_args->copy_handle = chandle->handle;
@@ -859,9 +914,16 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 					STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
 				}
 
+				_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 				_starpu_mpi_req_list_push_front(new_requests, req);
 
-				_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+				/* somebody is perhaps waiting for the request to be pushed in the new_requests list */
+				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
+				req->posted = 1;
+				STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 			}
 			/* Case : a classic receive request with no send received earlier than expected.
 			 * We just add the pending receive request to the requests' hashmap. */
@@ -955,7 +1017,8 @@ static void _starpu_mpi_test_detached_requests(void)
 		if (flag)
 		{
 			_starpu_mpi_req_list_erase(detached_requests, req);
-			free(req);
+			if (!req->is_internal_req)
+				free(req);
 		}
 
 	}
@@ -1065,13 +1128,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
  	struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
 
- 	MPI_Request header_req;
  	int header_req_submitted = 0;
 
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
 	{
 		/* shall we block ? */
-		_STARPU_MPI_DEBUG(3, "HASH_COUNT(_starpu_mpi_req_hashmap) = %d\n",HASH_COUNT(_starpu_mpi_req_hashmap));
 		unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_req_hashmap) == 0);
 
 #ifndef STARPU_MPI_ACTIVITY
@@ -1109,11 +1170,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 		/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
 		 * requests in our side, we resubmit a header request. */
-		if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0) && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
+		MPI_Request header_req;
+		if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
 		{
+			_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
 			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
-
-			_STARPU_MPI_DEBUG(3, "Submit of header_req OK!\n");
 			header_req_submitted = 1;
 		}
 
@@ -1126,7 +1187,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		{
 			int flag,res;
 			MPI_Status status;
-			_STARPU_MPI_DEBUG(3, "Test of header_req\n");
+			_STARPU_MPI_DEBUG(4, "Test of header_req\n");
 
 			/* test whether an envelope has arrived. */
 			res = MPI_Test(&header_req, &flag, &status);
@@ -1134,9 +1195,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (flag)
 			{
-				_STARPU_MPI_DEBUG(3, "header_req received !\n");
-
-				_STARPU_MPI_DEBUG(3, "Searching for request with tag %d, size %ld ..\n",recv_env->mpi_tag, recv_env->psize);
+				_STARPU_MPI_DEBUG(3, "Searching for request with tag %d (size %ld)\n", recv_env->mpi_tag, recv_env->psize);
 
 				struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
 
@@ -1151,7 +1210,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 					while(!(data_handle))
 					{
+						STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 						data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
+						STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 					}
 					STARPU_ASSERT(data_handle);
 
@@ -1164,9 +1225,14 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					add_chandle(chandle);
 
 					_STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
-					_starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL);
-
-					_STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
+					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL);
+					chandle->req->is_internal_req = 1;
+					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req->posted_mutex));
+					while (!(chandle->req->posted))
+					     STARPU_PTHREAD_COND_WAIT(&(chandle->req->posted_cond), &(chandle->req->posted_mutex));
+					STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req->posted_mutex));
+					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 				}
 				/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
 				 * the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
@@ -1203,7 +1269,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			}
 			else
 			{
-				_STARPU_MPI_DEBUG(3, "Nothing received, continue ..\n");
+				_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
 			}
 		}
 	}

+ 19 - 1
mpi/src/starpu_mpi_private.h

@@ -78,9 +78,18 @@ enum _starpu_mpi_request_type
 	WAIT_REQ=2,
 	TEST_REQ=3,
 	BARRIER_REQ=4,
-	PROBE_REQ=5
+	PROBE_REQ=5,
+	UNKNOWN_REQ=6,
 };
 
+struct _starpu_mpi_envelope
+{
+	ssize_t psize;
+	int mpi_tag;
+};
+
+struct _starpu_mpi_req;
+
 LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
@@ -106,10 +115,14 @@ LIST_TYPE(_starpu_mpi_req,
 	starpu_pthread_mutex_t req_mutex;
 	starpu_pthread_cond_t req_cond;
 
+	starpu_pthread_mutex_t posted_mutex;
+	starpu_pthread_cond_t posted_cond;
+
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 
 	unsigned submitted;
 	unsigned completed;
+	unsigned posted;
 
 	UT_hash_handle hh;
 
@@ -124,6 +137,11 @@ LIST_TYPE(_starpu_mpi_req,
 
         /* in the case of user-defined datatypes, we need to send the size of the data */
 	MPI_Request size_req;
+
+        struct _starpu_mpi_envelope* envelope;
+
+	int is_internal_req;
+	struct _starpu_mpi_req *internal_req;
 );
 
 #ifdef __cplusplus