소스 검색

merge branche mpi

Nathalie Furmento 12 년 전
부모
커밋
0159c3ad5f

+ 8 - 0
ChangeLog

@@ -18,6 +18,14 @@ StarPU 1.2.0 (svn revision xxxx)
 ==============================================
 
 New features:
+  * MPI:
+        - New internal communication system : a unique tag called
+	  is now used for all communications, and a system
+	  of hashmaps on each node which stores pending receives has been
+	  implemented. Every message is now coupled with an envelope, sent
+	  before the corresponding data, which allows the receiver to
+	  allocate data correctly, and to submit the matching receive of
+	  the envelope.
 
 StarPU 1.1.0 (svn revision xxxx)
 ==============================================

+ 23 - 0
doc/chapters/api.texi

@@ -602,6 +602,15 @@ codelet, and reduction between per-worker buffers will be done with the
 @var{redux_cl} codelet.
 @end deftypefun
 
+@deftypefun struct starpu_data_interface_ops* starpu_data_get_interface_ops (starpu_data_handle_t @var{handle})
+Get a pointer to the structure describing the different methods used
+to manipulate the given data. See @ref{struct starpu_data_interface_ops} for more details on this structure.
+@end deftypefun
+
+@deftypefun unsigned starpu_data_get_sequential_consistency_flag (starpu_data_handle_t @var{handle})
+Return the sequential consistency flag of the given data.
+@end deftypefun
+
 @node Access registered data from the application
 @subsection Access registered data from the application
 
@@ -3345,6 +3354,16 @@ to the world size. Communications statistics must be enabled
 (@pxref{STARPU_COMM_STATS}).
 @end deftypefun
 
+@deftypefun void starpu_mpi_set_communication_tag (int @var{tag})
+@anchor{starpu_mpi_set_communication_tag}
+Tell StarPU-MPI which MPI tag to use for all its communications.
+@end deftypefun
+
+@deftypefun int starpu_mpi_get_communication_tag (void)
+@anchor{starpu_mpi_get_communication_tag}
+Returns the MPI tag which will be used for all StarPU-MPI communications.
+@end deftypefun
+
 @node Communication
 @subsection Communication
 
@@ -3475,6 +3494,10 @@ to it.
 Returns the last value set by @code{starpu_data_set_rank}.
 @end deftypefun
 
+@deftypefun starpu_data_handle_t starpu_data_get_data_handle_from_tag (int @var{tag})
+Returns the data handle associated to the MPI tag, or NULL if there is not.
+@end deftypefun
+
 @defmac STARPU_EXECUTE_ON_NODE
 this macro is used when calling @code{starpu_mpi_insert_task}, and
 must be followed by a integer value which specified the node on which

+ 26 - 1
doc/chapters/mpi-support.texi

@@ -148,7 +148,23 @@ creation of a StarPU-MPI request, the function
 @code{starpu_data_acquire_cb} is then called to asynchronously request
 StarPU to fetch the data in main memory; when the data is available in
 main memory, a StarPU-MPI function is called to put the new request in
-the list of the ready requests.
+the list of the ready requests if it is a send request, or in an
+hashmap if it is a receive request.
+
+Internally, all MPI communications submitted by StarPU uses a unique
+tag which has a default value, and can be accessed with the functions
+@ref{starpu_mpi_get_communication_tag} and
+@ref{starpu_mpi_set_communication_tag}.
+
+The matching of tags with corresponding requests is done into StarPU-MPI. 
+To handle this, any communication is a double-communication based on a 
+envelope + data system. Every data which will be sent needs to send an 
+envelope which describes the data (particularly its tag) before sending 
+the data, so the receiver can get the matching pending receive request 
+from the hashmap, and submit it to recieve the data correctly.
+
+To this aim, the StarPU-MPI progression thread has a permanent-submitted 
+request destined to receive incoming envelopes from all sources.
 
 The StarPU-MPI progression thread regularly polls this list of ready
 requests. For each new ready request, the appropriate function is
@@ -162,6 +178,15 @@ requests. For each detached request, it regularly tests the completion
 of the MPI request by calling @code{MPI_Test}. On completion, the data
 handle is released, and if a callback was defined, it is called.
 
+Finally, the StarPU-MPI progression thread checks if an envelope has 
+arrived. If it is, it'll check if the corresponding receive has already
+been submitted by the application. If it is, it'll submit the request
+just as like as it does with those on the list of ready requests.
+If it is not, it'll allocate a temporary handle to store the data that
+will arrive just after, so as when the corresponding receive request
+will be submitted by the application, it'll copy this temporary handle
+into its one instead of submitting a new StarPU-MPI request.
+
 @ref{Communication} gives the list of all the point to point
 communications defined in StarPU-MPI.
 

+ 3 - 0
include/starpu_data.h

@@ -117,6 +117,7 @@ enum starpu_node_kind starpu_node_get_kind(unsigned node);
 void starpu_data_set_wt_mask(starpu_data_handle_t handle, uint32_t wt_mask);
 
 void starpu_data_set_sequential_consistency_flag(starpu_data_handle_t handle, unsigned flag);
+unsigned starpu_data_get_sequential_consistency_flag(starpu_data_handle_t handle);
 unsigned starpu_data_get_default_sequential_consistency_flag(void);
 void starpu_data_set_default_sequential_consistency_flag(unsigned flag);
 
@@ -132,6 +133,8 @@ int starpu_data_get_rank(starpu_data_handle_t handle);
 
 int starpu_data_set_tag(starpu_data_handle_t handle, int tag);
 int starpu_data_get_tag(starpu_data_handle_t handle);
+starpu_data_handle_t starpu_data_get_data_handle_from_tag(int tag);
+struct starpu_data_interface_ops* starpu_data_get_interface_ops(starpu_data_handle_t handle);
 
 unsigned starpu_data_test_if_allocated_on_node(starpu_data_handle_t handle, unsigned memory_node);
 

+ 4 - 0
mpi/include/starpu_mpi.h

@@ -70,6 +70,10 @@ void starpu_mpi_comm_amounts_retrieve(size_t *comm_amounts);
 void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle);
 void starpu_mpi_cache_flush_all_data(MPI_Comm comm);
 
+/* getter/setter for communication tag used for all communications in StarPU-MPI. */
+int starpu_mpi_get_communication_tag(void);
+void starpu_mpi_set_communication_tag(int tag);
+
 #ifdef __cplusplus
 }
 #endif

+ 389 - 82
mpi/src/starpu_mpi.c

@@ -57,6 +57,124 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 
+struct _starpu_mpi_envelope
+{
+	ssize_t psize;
+	int mpi_tag;
+};
+
+struct _starpu_mpi_copy_handle
+{
+	starpu_data_handle_t handle;
+	struct _starpu_mpi_envelope *env;
+	int mpi_tag;
+	UT_hash_handle hh;
+};
+
+ /********************************************************/
+ /*                                                      */
+ /*  Hashmap's requests functionalities                  */
+ /*                                                      */
+ /********************************************************/
+
+static struct _starpu_mpi_req *_starpu_mpi_req_hashmap = NULL;
+static struct _starpu_mpi_copy_handle *_starpu_mpi_copy_handle_hashmap = NULL;
+
+static struct _starpu_mpi_req* find_req(int mpi_tag)
+{
+	struct _starpu_mpi_req* req; // = malloc(sizeof(struct _starpu_mpi_req));
+
+	HASH_FIND_INT(_starpu_mpi_req_hashmap, &mpi_tag, req);
+
+	return req;
+}
+
+static void add_req(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_req *test_req;
+
+	test_req = find_req(req->mpi_tag);
+
+	if (test_req == NULL)
+	{
+		HASH_ADD_INT(_starpu_mpi_req_hashmap, mpi_tag, req);
+		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the hashmap. \n", req, req->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Error add_req : request %p with tag %d already in the hashmap. \n", req, req->mpi_tag);
+		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
+		if (seq_const)
+		{
+			STARPU_ASSERT_MSG(!test_req, "Error add_req : request %p with tag %d wanted to be added to the hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, test_req);
+		}
+		else
+		{
+			STARPU_ASSERT_MSG(!test_req, "Error add_req : request %p with tag %d wanted to be added to the hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, test_req);
+		}
+	}
+}
+
+static void delete_req(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_req *test_req;
+
+	test_req = find_req(req->mpi_tag);
+
+	if (test_req != NULL)
+	{
+		HASH_DEL(_starpu_mpi_req_hashmap, req);
+		_STARPU_MPI_DEBUG(3, "Deleting request %p with tag %d from the hashmap. \n", req, req->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Warning delete_req : request %p with tag %d isn't in the hashmap. \n", req, req->mpi_tag);
+	}
+}
+
+static struct _starpu_mpi_copy_handle* find_chandle(int mpi_tag)
+{
+	struct _starpu_mpi_copy_handle* chandle;
+
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap, &mpi_tag, chandle);
+
+	return chandle;
+}
+
+static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
+{
+	struct _starpu_mpi_copy_handle *test_chandle;
+
+	test_chandle = find_chandle(chandle->mpi_tag);
+
+	if (test_chandle == NULL)
+	{
+		HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap, mpi_tag, chandle);
+		_STARPU_MPI_DEBUG(3, "Adding copied handle %p with tag %d in the hashmap. \n", chandle, chandle->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Error add_chandle : copied handle %p with tag %d already in the hashmap. \n", chandle, chandle->mpi_tag);
+		STARPU_ASSERT(test_chandle != NULL);
+	}
+}
+
+static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
+{
+	struct _starpu_mpi_copy_handle *test_chandle;
+
+	test_chandle = find_chandle(chandle->mpi_tag);
+
+	if (test_chandle != NULL)
+	{
+		HASH_DEL(_starpu_mpi_copy_handle_hashmap, chandle);
+		_STARPU_MPI_DEBUG(3, "Deleting copied handle %p with tag %d from the hashmap. \n", chandle, chandle->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Warning delete_chandle : copied handle %p with tag %d isn't in the hashmap. \n", chandle, chandle->mpi_tag);
+	}
+}
 
 /********************************************************/
 /*                                                      */
@@ -118,13 +236,13 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
 
-	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_handle_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
 	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
-	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
 
 	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
@@ -143,43 +261,51 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 {
 	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+
+	struct _starpu_mpi_envelope* env = calloc(1,sizeof(struct _starpu_mpi_envelope));
+
+	env->mpi_tag = req->mpi_tag;
+
 	if (req->user_datatype == 0)
 	{
 		req->count = 1;
 		req->ptr = starpu_handle_get_local_ptr(req->data_handle);
+
+		env->psize = (ssize_t)req->count;
+
+		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_handle_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
+		MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 	}
 	else
 	{
-		ssize_t psize = -1;
 		int ret;
 
-		// Do not pack the data, just try to find out the size
-		starpu_handle_pack_data(req->data_handle, NULL, &psize);
+ 		// Do not pack the data, just try to find out the size
+		starpu_handle_pack_data(req->data_handle, NULL, &(env->psize));
 
-		if (psize != -1)
-		{
-			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
-			req->count = psize;
-			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+		if (env->psize != -1)
+ 		{
+ 			// We already know the size of the data, let's send it to overlap with the packing of the data
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
+			req->count = env->psize;
+			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
-		}
-
-		// Pack the data
-		starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
-		if (psize == -1)
-		{
-			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->count, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
-			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+ 		}
+
+ 		// Pack the data
+ 		starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
+		if (env->psize == -1)
+ 		{
+ 			// We know the size now, let's send it
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
+			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
-		}
-		else
-		{
-			// We check the size returned with the 2 calls to pack is the same
-			STARPU_ASSERT_MSG(req->count == psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, psize);
-		}
-
+ 		}
+ 		else
+ 		{
+ 			// We check the size returned with the 2 calls to pack is the same
+			STARPU_ASSERT_MSG(req->count == env->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, env->psize);
+ 		}
 		// We can send the data now
 	}
 	_starpu_mpi_isend_data_func(req);
@@ -234,7 +360,7 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI
 
 /********************************************************/
 /*                                                      */
-/*  Receive functionalities                             */
+/*  receive functionalities                             */
 /*                                                      */
 /********************************************************/
 
@@ -248,7 +374,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
 
-	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 
 	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
@@ -264,48 +390,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-struct _starpu_mpi_irecv_size_callback
-{
-	starpu_data_handle_t handle;
-	struct _starpu_mpi_req *req;
-};
-
-static void _starpu_mpi_irecv_size_callback(void *arg)
-{
-	struct _starpu_mpi_irecv_size_callback *callback = (struct _starpu_mpi_irecv_size_callback *)arg;
-
-	starpu_data_unregister(callback->handle);
-	callback->req->ptr = malloc(callback->req->count);
-	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld", callback->req->count);
-	_starpu_mpi_irecv_data_func(callback->req);
-	free(callback);
-}
-
-static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
-{
-	_STARPU_MPI_LOG_IN();
-
-	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
-	if (req->user_datatype == 0)
-	{
-		req->count = 1;
-		req->ptr = starpu_handle_get_local_ptr(req->data_handle);
-		_starpu_mpi_irecv_data_func(req);
-	}
-	else
-	{
-		struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
-		callback->req = req;
-		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
-		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
-		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_irecv_size_callback, callback);
-	}
-
-}
-
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -629,7 +716,17 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		}
 		else
 		{
-			_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
+			struct _starpu_mpi_copy_handle *chandle = find_chandle(starpu_data_get_tag(req->data_handle));
+			if (chandle && (req->data_handle != chandle->handle))
+			{
+				_STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
+				delete_chandle(chandle);
+				free(chandle);
+			}
+			else
+			{
+				_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
+			}
 		}
 		starpu_data_release(req->data_handle);
 	}
@@ -647,6 +744,44 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
+struct _starpu_mpi_copy_cb_args
+{
+	starpu_data_handle_t data_handle;
+	starpu_data_handle_t copy_handle;
+	struct _starpu_mpi_req *req;
+};
+
+static void _starpu_mpi_copy_cb(void* arg)
+{
+	struct _starpu_mpi_copy_cb_args *args = arg;
+
+	struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
+	void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
+	void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
+
+	if (!itf->copy_methods->ram_to_ram)
+	{
+		_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
+		itf->copy_methods->any_to_any(itf_src, 0, itf_dst, 0, NULL);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
+		itf->copy_methods->ram_to_ram(itf_src, 0, itf_dst, 0);
+	}
+
+	_STARPU_MPI_DEBUG(3, "Done, handling release of copy_handle..\n");
+	starpu_data_release(args->copy_handle);
+
+	_STARPU_MPI_DEBUG(3, "Done, handling unregister of copy_handle..\n");
+	starpu_data_unregister_submit(args->copy_handle);
+
+	_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
+	_starpu_mpi_handle_request_termination(args->req);
+
+	free(args);
+}
+
 static void _starpu_mpi_submit_new_mpi_request(void *arg)
 {
 	_STARPU_MPI_LOG_IN();
@@ -655,11 +790,76 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	_starpu_mpi_req_list_push_front(new_requests, req);
-	newer_requests = 1;
-	_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+
+	if (req->request_type == RECV_REQ)
+	{
+		/* test whether the receive request has already been submitted internally by StarPU-MPI*/
+		struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
+
+		/* Case : the request has already been submitted internally by StarPU.
+		 * We'll asynchronously ask a Read permission over the temporary handle, so as when
+		 * the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
+		 * bring the data back to the original data handle associated to the request.*/
+		if (chandle && (req->data_handle != chandle->handle))
+		{
+			_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
+
+			struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
+			cb_args->data_handle = req->data_handle;
+			cb_args->copy_handle = chandle->handle;
+			cb_args->req = req;
+
+			_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
+			starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
+		}
+		else
+		{
+			/* Case : the request is the internal receive request submitted by StarPU-MPI to receive
+			 * incoming data without a matching pending receive already submitted by the application.
+			 * We immediately allocate the pointer associated to the data_handle, and pushing it into
+			 * the list of new_requests, so as the real MPI request can be submitted before the next
+			 * submission of the envelope-catching request. */
+			if (chandle && (req->data_handle == chandle->handle))
+			{
+				_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+				if (req->user_datatype == 0)
+				{
+					req->count = 1;
+					req->ptr = starpu_handle_get_local_ptr(req->data_handle);
+				}
+				else
+				{
+					req->count = chandle->env->psize;
+					req->ptr = malloc(req->count);
+
+					STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
+				}
+
+				_starpu_mpi_req_list_push_front(new_requests, req);
+
+				_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			}
+			/* Case : a classic receive request with no send received earlier than expected.
+			 * We just add the pending receive request to the requests' hashmap. */
+			else
+			{
+				add_req(req);
+			}
+
+			newer_requests = 1;
+			STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+		}
+	}
+	else
+	{
+		_starpu_mpi_req_list_push_front(new_requests, req);
+
+		newer_requests = 1;
+		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+				  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+		STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+	}
+
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 	_STARPU_MPI_LOG_OUT();
 }
@@ -700,6 +900,7 @@ static void _starpu_mpi_test_detached_requests(void)
 
 		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
 		req->ret = MPI_Test(&req->request, &flag, &status);
+
 		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
 
 		if (flag)
@@ -830,7 +1031,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 #endif //STARPU_USE_FXT
 	}
 
-
 	/* notify the main thread that the progression thread is ready */
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	running = 1;
@@ -838,10 +1038,17 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+ 	struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
+
+ 	MPI_Request header_req;
+ 	int header_req_submitted = 0;
+
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
 	{
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(new_requests);
+		_STARPU_MPI_DEBUG(3, "HASH_COUNT(_starpu_mpi_req_hashmap) = %d\n",HASH_COUNT(_starpu_mpi_req_hashmap));
+		unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_req_hashmap) == 0);
 
 #ifndef STARPU_MPI_ACTIVITY
 		block = block && _starpu_mpi_req_list_empty(detached_requests);
@@ -861,11 +1068,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			TRACE_MPI_SLEEP_END();
 		}
 
-		/* test whether there are some terminated "detached request" */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		_starpu_mpi_test_detached_requests();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-
 		/* get one request */
 		struct _starpu_mpi_req *req;
 		while (!_starpu_mpi_req_list_empty(new_requests))
@@ -880,11 +1082,114 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			_starpu_mpi_handle_new_request(req);
 			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 		}
+
+		/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
+		 * requests in our side, we resubmit a header request. */
+		if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0) && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
+		{
+			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
+
+			_STARPU_MPI_DEBUG(3, "Submit of header_req OK!\n");
+			header_req_submitted = 1;
+		}
+
+		/* test whether there are some terminated "detached request" */
+		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		_starpu_mpi_test_detached_requests();
+		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+		if (header_req_submitted == 1)
+		{
+			int flag,res;
+			MPI_Status status;
+			_STARPU_MPI_DEBUG(3, "Test of header_req\n");
+
+			/* test whether an envelope has arrived. */
+			res = MPI_Test(&header_req, &flag, &status);
+			STARPU_ASSERT(res == MPI_SUCCESS);
+
+			if (flag)
+			{
+				_STARPU_MPI_DEBUG(3, "header_req received !\n");
+
+				_STARPU_MPI_DEBUG(3, "Searching for request with tag %d, size %ld ..\n",recv_env->mpi_tag, recv_env->psize);
+
+				struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
+
+				/* Case : a data will arrive before the matching receive has been submitted in our side of the application.
+				 * We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
+				 * on this handle, and register this so as the StarPU-MPI layer can remember it.*/
+				if (!found_req)
+				{
+					_STARPU_MPI_DEBUG(3, "Request with tag %d not found, creating a copy_handle to receive incoming data..\n",recv_env->mpi_tag);
+
+					starpu_data_handle_t data_handle = NULL;
+
+					while(!(data_handle))
+					{
+						data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
+					}
+					STARPU_ASSERT(data_handle);
+
+					struct _starpu_mpi_copy_handle* chandle = malloc(sizeof(struct _starpu_mpi_copy_handle));
+					STARPU_ASSERT(chandle);
+
+					chandle->mpi_tag = recv_env->mpi_tag;
+					chandle->env = recv_env;
+					starpu_data_register_same(&chandle->handle, data_handle);
+					add_chandle(chandle);
+
+					_STARPU_MPI_DEBUG(3, "Posting internal starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
+
+					res = starpu_mpi_irecv_detached(chandle->handle,status.MPI_SOURCE,chandle->mpi_tag,MPI_COMM_WORLD,NULL,NULL);
+					STARPU_ASSERT(res == MPI_SUCCESS);
+
+					_STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
+				}
+				/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
+				 * the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
+				else
+				{
+					_STARPU_MPI_DEBUG(3, "Found !\n");
+
+					delete_req(found_req);
+
+					_starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
+					if (found_req->user_datatype == 0)
+					{
+						found_req->count = 1;
+						found_req->ptr = starpu_handle_get_local_ptr(found_req->data_handle);
+					}
+					else
+					{
+						found_req->count = recv_env->psize;
+						found_req->ptr = malloc(found_req->count);
+
+						STARPU_ASSERT_MSG(found_req->ptr, "cannot allocate message of size %ld\n", found_req->count);
+					}
+
+					_STARPU_MPI_DEBUG(3, "Handling new request... \n");
+					/* handling a request is likely to block for a while
+					 * (on a sync_data_with_mem call), we want to let the
+					 * application submit requests in the meantime, so we
+					 * release the lock. */
+					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					_starpu_mpi_handle_new_request(found_req);
+					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+				}
+				header_req_submitted = 0;
+			}
+			else
+			{
+				_STARPU_MPI_DEBUG(3, "Nothing received, continue ..\n");
+			}
+		}
 	}
 
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
 	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
+	STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_req_hashmap) == 0, "Number of receive requests left is not zero");
 
 	if (argc_argv->initialize_mpi)
 	{
@@ -895,6 +1200,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 	free(argc_argv);
+	free(recv_env);
+
 	return NULL;
 }
 

+ 12 - 0
mpi/src/starpu_mpi_private.c

@@ -15,11 +15,23 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
+#include <starpu_mpi_private.h>
+
 int _debug_rank=-1;
 int _debug_level=0;
+int _starpu_mpi_tag = 42;
 
 void _starpu_mpi_set_debug_level(int level)
 {
 	_debug_level = level;
 }
 
+int starpu_mpi_get_communication_tag(void)
+{
+	return _starpu_mpi_tag;
+}
+
+void starpu_mpi_set_communication_tag(int tag)
+{
+	_starpu_mpi_tag = tag;
+}

+ 5 - 0
mpi/src/starpu_mpi_private.h

@@ -20,6 +20,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <common/uthash.h>
 #include "starpu_mpi.h"
 #include "starpu_mpi_fxt.h"
 #include <common/list.h>
@@ -68,6 +69,8 @@ void _starpu_mpi_set_debug_level(int level);
 #  define _STARPU_MPI_LOG_OUT()
 #endif
 
+extern int _starpu_mpi_tag;
+
 enum _starpu_mpi_request_type
 {
 	SEND_REQ=0,
@@ -108,6 +111,8 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned submitted;
 	unsigned completed;
 
+	UT_hash_handle hh;
+
 	/* In the case of a Wait/Test request, we are going to post a request
 	 * to test the completion of another request */
 	struct _starpu_mpi_req *other_request;

+ 85 - 4
src/datawizard/interfaces/data_interface.c

@@ -38,9 +38,22 @@ static struct handle_entry *registered_handles;
 static struct _starpu_spinlock    registered_handles_lock;
 static int _data_interface_number = STARPU_MAX_INTERFACE_ID;
 
+/* Entry in the `registered_tag_handles' hash table.  */
+struct handle_tag_entry
+{
+	UT_hash_handle hh;
+	int tag;
+	starpu_data_handle_t handle;
+};
+
+/* Hash table mapping host tags to data handles.  */
+static struct handle_tag_entry *registered_tag_handles;
+static struct _starpu_spinlock    registered_tag_handles_lock;
+
 void _starpu_data_interface_init(void)
 {
 	_starpu_spin_init(&registered_handles_lock);
+	_starpu_spin_init(&registered_tag_handles_lock);
 }
 
 void _starpu_data_interface_shutdown()
@@ -56,6 +69,18 @@ void _starpu_data_interface_shutdown()
 	}
 
 	registered_handles = NULL;
+
+	struct handle_tag_entry *tag_entry, *tag_tmp;
+
+	_starpu_spin_destroy(&registered_tag_handles_lock);
+
+	HASH_ITER(hh, registered_tag_handles, tag_entry, tag_tmp)
+	{
+		HASH_DEL(registered_tag_handles, tag_entry);
+		free(tag_entry);
+	}
+
+	registered_tag_handles = NULL;
 }
 
 /* Register the mapping from PTR to HANDLE.  If PTR is already mapped to
@@ -329,8 +354,8 @@ int starpu_data_get_rank(starpu_data_handle_t handle)
 
 int starpu_data_set_rank(starpu_data_handle_t handle, int rank)
 {
-        handle->rank = rank;
-        return 0;
+	handle->rank = rank;
+	return 0;
 }
 
 int starpu_data_get_tag(starpu_data_handle_t handle)
@@ -338,10 +363,64 @@ int starpu_data_get_tag(starpu_data_handle_t handle)
 	return handle->tag;
 }
 
+starpu_data_handle_t starpu_data_get_data_handle_from_tag(int tag)
+{
+	struct handle_tag_entry *ret;
+
+	_starpu_spin_lock(&registered_tag_handles_lock);
+	HASH_FIND_INT(registered_tag_handles, &tag, ret);
+	_starpu_spin_unlock(&registered_tag_handles_lock);
+
+	if (ret)
+	{
+		return ret->handle;
+	}
+	else
+	{
+		return NULL;
+	}
+}
+
 int starpu_data_set_tag(starpu_data_handle_t handle, int tag)
 {
-        handle->tag = tag;
-        return 0;
+	struct handle_tag_entry *entry;
+	entry = (struct handle_tag_entry *) malloc(sizeof(*entry));
+	STARPU_ASSERT(entry != NULL);
+
+	STARPU_ASSERT_MSG(!(starpu_data_get_data_handle_from_tag(tag)),"A data handle with tag %d had already been registered.\n",tag);
+
+	entry->tag = tag;
+	entry->handle = handle;
+
+	_starpu_spin_lock(&registered_tag_handles_lock);
+	HASH_ADD_INT(registered_tag_handles, tag, entry);
+	_starpu_spin_unlock(&registered_tag_handles_lock);
+
+	handle->tag = tag;
+	return 0;
+}
+
+int starpu_data_release_tag(starpu_data_handle_t handle)
+{
+	struct handle_tag_entry *tag_entry;
+
+	if (handle->tag != -1)
+	{
+		_starpu_spin_lock(&registered_tag_handles_lock);
+		HASH_FIND_INT(registered_tag_handles, &handle->tag, tag_entry);
+		STARPU_ASSERT_MSG((tag_entry != NULL),"Handle %p with tag %d isn't in the hashmap !",handle,handle->tag);
+
+		HASH_DEL(registered_tag_handles, tag_entry);
+		free(tag_entry);
+
+		_starpu_spin_unlock(&registered_tag_handles_lock);
+	}
+	return 0;
+}
+
+struct starpu_data_interface_ops* starpu_data_get_interface_ops(starpu_data_handle_t handle)
+{
+	return handle->ops;
 }
 
 /*
@@ -602,6 +681,8 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 	STARPU_PTHREAD_COND_DESTROY(&handle->busy_cond);
 	STARPU_PTHREAD_MUTEX_DESTROY(&handle->sequential_consistency_mutex);
 
+	starpu_data_release_tag(handle);
+
 	free(handle);
 }
 

+ 5 - 0
src/datawizard/user_interactions.c

@@ -472,6 +472,11 @@ void starpu_data_set_sequential_consistency_flag(starpu_data_handle_t handle, un
 	_starpu_spin_unlock(&handle->header_lock);
 }
 
+unsigned starpu_data_get_sequential_consistency_flag(starpu_data_handle_t handle)
+{
+	return handle->sequential_consistency;
+}
+
 /* By default, sequential consistency is enabled */
 static unsigned default_sequential_consistency_flag = 1;