Bladeren bron

Modifications of internal handling of tags in StarPU-MPI.

A unique tag called STARPU_MPI_TAG is now used for all
StarPU-MPI communications.

Any communication now works with a envelope + data system.
The envelope contains information about the tag of the data
which will be sent, so as the receiver can get the matching
request from the pending receive requests' hashmap, then 
submit it.
Marc Sergent 12 jaren geleden
bovenliggende
commit
ff83ffc582

+ 9 - 1
ChangeLog

@@ -66,7 +66,15 @@ New features:
 	  the data is the size returned by the pack operation, i.e
 	  data with dynamic size can now be exchanged with StarPU-MPI.
         - New functionality starpu_mpi_irecv_probe_detached which
-  	  first tests if the message is available before calling MPI_Recv.
+  	  first tests if the message is available before calling MPI_Recv
+	  (now deprecated).
+        - New internal communication system : a unique tag called
+	  STARPU_MPI_TAG is now used for all communications, and a system
+	  of hashmaps on each node which stores pending receives has been
+	  implemented. Every message is now coupled with an envelope, sent
+	  before the corresponding data, which allows the receiver to
+	  allocate data correctly, and to submit the matching receive of
+	  the envelope.
   * Add experimental simgrid support, to simulate execution with various
     number of CPUs, GPUs, amount of memory, etc.
   * Add support for OpenCL simulators (which provide simulated execution time)

+ 23 - 1
doc/chapters/mpi-support.texi

@@ -147,7 +147,20 @@ creation of a StarPU-MPI request, the function
 @code{starpu_data_acquire_cb} is then called to asynchronously request
 StarPU to fetch the data in main memory; when the data is available in
 main memory, a StarPU-MPI function is called to put the new request in
-the list of the ready requests.
+the list of the ready requests if it is a send request, or in an
+hashmap if it is a receive request.
+
+Internally, all MPI communications submitted by StarPU uses a unique
+tag called STARPU_MPI_TAG. The matching of tags with corresponding
+requests is done into StarPU-MPI. To handle this, any communication is 
+a double-communication based on a envelope + data system. Every data 
+which will be sent needs to send an envelope which describes the data 
+(particularly its tag) before sending the data, so the receiver can
+get the matching pending receive request from the hashmap, and submit 
+it to recieve the data correctly.
+
+To this aim, the StarPU-MPI progression thread has a permanent-submitted 
+request destined to receive incoming envelopes from all sources.
 
 The StarPU-MPI progression thread regularly polls this list of ready
 requests. For each new ready request, the appropriate function is
@@ -161,6 +174,15 @@ requests. For each detached request, it regularly tests the completion
 of the MPI request by calling @code{MPI_Test}. On completion, the data
 handle is released, and if a callback was defined, it is called.
 
+Finally, the StarPU-MPI progression thread checks if an envelope has 
+arrived. If it is, it'll check if the corresponding receive has already
+been submitted by the application. If it is, it'll submit the request
+just as like as it does with those on the list of ready requests.
+If it is not, it'll allocate a temporary handle to store the data that
+will arrive just after, so as when the corresponding receive request
+will be submitted by the application, it'll copy this temporary handle
+into its one instead of submitting a new StarPU-MPI request.
+
 @ref{Communication} gives the list of all the point to point
 communications defined in StarPU-MPI.
 

+ 3 - 0
include/starpu_data.h

@@ -117,6 +117,7 @@ enum starpu_node_kind starpu_node_get_kind(unsigned node);
 void starpu_data_set_wt_mask(starpu_data_handle_t handle, uint32_t wt_mask);
 
 void starpu_data_set_sequential_consistency_flag(starpu_data_handle_t handle, unsigned flag);
+unsigned starpu_data_get_sequential_consistency_flag(starpu_data_handle_t handle);
 unsigned starpu_data_get_default_sequential_consistency_flag(void);
 void starpu_data_set_default_sequential_consistency_flag(unsigned flag);
 
@@ -132,6 +133,8 @@ int starpu_data_get_rank(starpu_data_handle_t handle);
 
 int starpu_data_set_tag(starpu_data_handle_t handle, int tag);
 int starpu_data_get_tag(starpu_data_handle_t handle);
+starpu_data_handle_t starpu_get_data_handle_from_tag(int tag);
+struct starpu_data_interface_ops* starpu_handle_get_interface(starpu_data_handle_t handle);
 
 unsigned starpu_data_test_if_allocated_on_node(starpu_data_handle_t handle, unsigned memory_node);
 

+ 387 - 142
mpi/src/starpu_mpi.c

@@ -57,6 +57,124 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
 
 #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { _STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
 
+struct _starpu_mpi_envelope
+{
+	ssize_t psize;
+	int mpi_tag;
+};
+ 
+struct _starpu_mpi_copy_handle
+{
+	starpu_data_handle_t handle;
+	struct _starpu_mpi_envelope *env;
+	int mpi_tag;
+	UT_hash_handle hh;
+};
+
+ /********************************************************/
+ /*                                                      */
+ /*  Hashmap's requests functionalities                  */
+ /*                                                      */
+ /********************************************************/
+ 
+static struct _starpu_mpi_req *_starpu_mpi_req_hashmap = NULL;
+static struct _starpu_mpi_copy_handle *_starpu_mpi_copy_handle_hashmap = NULL;
+
+static struct _starpu_mpi_req* find_req(int mpi_tag)
+{
+	struct _starpu_mpi_req* req; // = malloc(sizeof(struct _starpu_mpi_req));
+
+	HASH_FIND_INT(_starpu_mpi_req_hashmap, &mpi_tag, req);
+
+	return req;
+}
+
+static void add_req(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_req *test_req;
+
+	test_req = find_req(req->mpi_tag); 
+
+	if (test_req == NULL)
+	{
+		HASH_ADD_INT(_starpu_mpi_req_hashmap, mpi_tag, req);
+		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the hashmap. \n", req, req->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Error add_req : request %p with tag %d already in the hashmap. \n", req, req->mpi_tag);
+		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
+		if (seq_const)
+		{
+			STARPU_ASSERT_MSG(!test_req, "Error add_req : request %p with tag %d wanted to be added to the hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, test_req);
+		}
+		else
+		{
+			STARPU_ASSERT_MSG(!test_req, "Error add_req : request %p with tag %d wanted to be added to the hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, test_req);
+		}
+	}
+}
+
+static void delete_req(struct _starpu_mpi_req *req)
+{
+	struct _starpu_mpi_req *test_req;
+
+	test_req = find_req(req->mpi_tag); 
+
+	if (test_req != NULL)
+	{
+		HASH_DEL(_starpu_mpi_req_hashmap, req);
+		_STARPU_MPI_DEBUG(3, "Deleting request %p with tag %d from the hashmap. \n", req, req->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Warning delete_req : request %p with tag %d isn't in the hashmap. \n", req, req->mpi_tag);
+	}
+}
+
+static struct _starpu_mpi_copy_handle* find_chandle(int mpi_tag)
+{
+	struct _starpu_mpi_copy_handle* chandle;
+
+	HASH_FIND_INT(_starpu_mpi_copy_handle_hashmap, &mpi_tag, chandle);
+
+	return chandle;
+}
+
+static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
+{
+	struct _starpu_mpi_copy_handle *test_chandle;
+
+	test_chandle = find_chandle(chandle->mpi_tag); 
+
+	if (test_chandle == NULL)
+	{
+		HASH_ADD_INT(_starpu_mpi_copy_handle_hashmap, mpi_tag, chandle);
+		_STARPU_MPI_DEBUG(3, "Adding copied handle %p with tag %d in the hashmap. \n", chandle, chandle->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Error add_chandle : copied handle %p with tag %d already in the hashmap. \n", chandle, chandle->mpi_tag);
+		STARPU_ASSERT(test_chandle != NULL);
+	}
+}
+
+static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
+{
+	struct _starpu_mpi_copy_handle *test_chandle;
+
+	test_chandle = find_chandle(chandle->mpi_tag); 
+
+	if (test_chandle != NULL)
+	{
+		HASH_DEL(_starpu_mpi_copy_handle_hashmap, chandle);
+		_STARPU_MPI_DEBUG(3, "Deleting copied handle %p with tag %d from the hashmap. \n", chandle, chandle->mpi_tag);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Warning delete_chandle : copied handle %p with tag %d isn't in the hashmap. \n", chandle, chandle->mpi_tag);
+	}
+}
 
 /********************************************************/
 /*                                                      */
@@ -118,13 +236,13 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
 
-	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_handle_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
 	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
-	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+	req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, STARPU_MPI_TAG, req->comm, &req->request);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
 
 	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
@@ -143,43 +261,54 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 {
 	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+
+	struct _starpu_mpi_envelope* env = calloc(1,sizeof(struct _starpu_mpi_envelope));
+
+	env->mpi_tag = req->mpi_tag;
+
 	if (req->user_datatype == 0)
 	{
 		req->count = 1;
 		req->ptr = starpu_handle_get_local_ptr(req->data_handle);
+		
+		env->psize = (ssize_t)req->count;
+		
+		int type_size = 0;
+		MPI_Type_size(req->datatype,&type_size);
+		
+		_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %d request to %d with tag %d\n",req->count,type_size,req->srcdst,STARPU_MPI_TAG);
+		MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, STARPU_MPI_TAG, req->comm, &req->size_req);
 	}
 	else
 	{
-		ssize_t psize = -1;
 		int ret;
-
-		// Do not pack the data, just try to find out the size
-		starpu_handle_pack_data(req->data_handle, NULL, &psize);
-
-		if (psize != -1)
-		{
-			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
-			req->count = psize;
-			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+ 
+ 		// Do not pack the data, just try to find out the size
+		starpu_handle_pack_data(req->data_handle, NULL, &(env->psize));
+ 
+		if (env->psize != -1)
+ 		{
+ 			// We already know the size of the data, let's send it to overlap with the packing of the data
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), STARPU_MPI_TAG, req->srcdst);
+			req->count = env->psize;
+			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, STARPU_MPI_TAG, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
-		}
-
-		// Pack the data
-		starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
-		if (psize == -1)
-		{
-			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->count, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
-			ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+ 		}
+ 
+ 		// Pack the data
+ 		starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
+		if (env->psize == -1)
+ 		{
+ 			// We know the size now, let's send it
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), STARPU_MPI_TAG, req->srcdst);
+			ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, STARPU_MPI_TAG, req->comm, &req->size_req);
 			STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
-		}
-		else
-		{
-			// We check the size returned with the 2 calls to pack is the same
-			STARPU_ASSERT_MSG(req->count == psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, psize);
-		}
-
+ 		}
+ 		else
+ 		{
+ 			// We check the size returned with the 2 calls to pack is the same
+			STARPU_ASSERT_MSG(req->count == env->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, env->psize);
+ 		}
 		// We can send the data now
 	}
 	_starpu_mpi_isend_data_func(req);
@@ -234,7 +363,7 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI
 
 /********************************************************/
 /*                                                      */
-/*  Receive functionalities                             */
+/*  receive functionalities                             */
 /*                                                      */
 /********************************************************/
 
@@ -248,7 +377,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
 
-	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
+	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, STARPU_MPI_TAG, req->comm, &req->request);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 
 	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
@@ -264,48 +393,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-struct _starpu_mpi_irecv_size_callback
-{
-	starpu_data_handle_t handle;
-	struct _starpu_mpi_req *req;
-};
-
-static void _starpu_mpi_irecv_size_callback(void *arg)
-{
-	struct _starpu_mpi_irecv_size_callback *callback = (struct _starpu_mpi_irecv_size_callback *)arg;
-
-	starpu_data_unregister(callback->handle);
-	callback->req->ptr = malloc(callback->req->count);
-	STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld", callback->req->count);
-	_starpu_mpi_irecv_data_func(callback->req);
-	free(callback);
-}
-
-static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
-{
-	_STARPU_MPI_LOG_IN();
-
-	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
-	if (req->user_datatype == 0)
-	{
-		req->count = 1;
-		req->ptr = starpu_handle_get_local_ptr(req->data_handle);
-		_starpu_mpi_irecv_data_func(req);
-	}
-	else
-	{
-		struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
-		callback->req = req;
-		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
-		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
-		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_irecv_size_callback, callback);
-	}
-
-}
-
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -343,39 +433,6 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
 	return 0;
 }
 
-static void _starpu_mpi_probe_func(struct _starpu_mpi_req *req)
-{
-	_STARPU_MPI_LOG_IN();
-
-	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
-#ifdef STARPU_DEVEL
-#warning TODO: release that assert
-#endif
-	assert(req->user_datatype == 0);
-	req->count = 1;
-	req->ptr = starpu_handle_get_local_ptr(req->data_handle);
-
-	_STARPU_MPI_DEBUG(2, "MPI probe request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-
-	/* somebody is perhaps waiting for the MPI request to be posted */
-	_STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	req->submitted = 1;
-	_STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
-
-	_starpu_mpi_handle_detached_request(req);
-
-	_STARPU_MPI_LOG_OUT();
-}
-
-int starpu_mpi_irecv_probe_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
-{
-	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, PROBE_REQ, _starpu_mpi_probe_func, STARPU_W);
-	_STARPU_MPI_LOG_OUT();
-	return 0;
-}
-
 /********************************************************/
 /*                                                      */
 /*  Wait functionalities                                */
@@ -627,7 +684,6 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 		case WAIT_REQ: return "WAIT_REQ";
 		case TEST_REQ: return "TEST_REQ";
 		case BARRIER_REQ: return "BARRIER_REQ";
-		case PROBE_REQ: return "PROBE_REQ";
 		default: return "unknown request type";
 		}
 }
@@ -641,18 +697,8 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 
 	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
 			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-	if (req->request_type == PROBE_REQ)
-	{
-#ifdef STARPU_DEVEL
-#warning TODO: instead of calling MPI_Recv, we should post a starpu mpi recv request
-#endif
-		MPI_Status status;
-		memset(&status, 0, sizeof(MPI_Status));
-		req->ret = MPI_Recv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &status);
-		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Recv returning %d", req->ret);
-	}
 
-	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ || req->request_type == PROBE_REQ)
+	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
 	{
 		if (req->user_datatype == 1)
 		{
@@ -673,7 +719,17 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		}
 		else
 		{
-			_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
+			struct _starpu_mpi_copy_handle *chandle = find_chandle(starpu_data_get_tag(req->data_handle));
+			if (chandle && (req->data_handle != chandle->handle))
+			{
+				_STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
+				delete_chandle(chandle);
+				free(chandle);
+			}
+			else
+			{
+				_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
+			}
 		}
 		starpu_data_release(req->data_handle);
 	}
@@ -691,6 +747,44 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
+struct _starpu_mpi_copy_cb_args
+{
+	starpu_data_handle_t data_handle;
+	starpu_data_handle_t copy_handle;
+	struct _starpu_mpi_req *req;
+};
+
+static void _starpu_mpi_copy_cb(void* arg)
+{
+	struct _starpu_mpi_copy_cb_args *args = arg;
+
+	struct starpu_data_interface_ops *itf = starpu_handle_get_interface(args->copy_handle);
+	void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
+	void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
+
+	if (!itf->copy_methods->ram_to_ram)
+	{
+		_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
+		itf->copy_methods->any_to_any(itf_src, 0, itf_dst, 0, NULL);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
+		itf->copy_methods->ram_to_ram(itf_src, 0, itf_dst, 0);
+	}
+
+	_STARPU_MPI_DEBUG(3, "Done, handling release of copy_handle..\n");
+	starpu_data_release(args->copy_handle);
+
+	_STARPU_MPI_DEBUG(3, "Done, handling unregister of copy_handle..\n");
+	starpu_data_unregister_submit(args->copy_handle);
+
+	_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
+	_starpu_mpi_handle_request_termination(args->req);
+
+	free(args);
+}
+
 static void _starpu_mpi_submit_new_mpi_request(void *arg)
 {
 	_STARPU_MPI_LOG_IN();
@@ -699,13 +793,73 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-	_starpu_mpi_req_list_push_front(new_requests, req);
-	newer_requests = 1;
-	_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-	_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-	_STARPU_MPI_LOG_OUT();
+
+	if (req->request_type == RECV_REQ)
+	{
+		struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
+		if (chandle && (req->data_handle != chandle->handle))
+		{
+			_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
+
+			struct _starpu_mpi_copy_cb_args *arg = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
+			arg->data_handle = req->data_handle;
+			arg->copy_handle = chandle->handle;
+			arg->req = req;
+
+			_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
+			starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) arg);
+
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			_STARPU_MPI_LOG_OUT();
+		}
+		else if (chandle && (req->data_handle == chandle->handle))
+		{
+			_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+			if (req->user_datatype == 0)
+			{
+				req->count = 1;
+				req->ptr = starpu_handle_get_local_ptr(req->data_handle);
+			}
+			else
+			{
+				req->count = chandle->env->psize;
+				req->ptr = malloc(req->count);
+
+				STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
+			}
+
+			_starpu_mpi_req_list_push_front(new_requests, req);
+
+			newer_requests = 1;
+			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+					  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			_STARPU_MPI_LOG_OUT();
+		}
+		else
+		{
+			add_req(req);
+
+			newer_requests = 1;
+			_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+					  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			_STARPU_MPI_LOG_OUT();
+		}
+	}
+	else
+	{
+		_starpu_mpi_req_list_push_front(new_requests, req);
+
+		newer_requests = 1;
+		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+				  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+		_STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		_STARPU_MPI_LOG_OUT();
+	}
 }
 
 #ifdef STARPU_MPI_ACTIVITY
@@ -743,20 +897,13 @@ static void _starpu_mpi_test_detached_requests(void)
 		_STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
 		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
-		if (req->request_type == PROBE_REQ)
-		{
-			req->ret = MPI_Iprobe(req->srcdst, req->mpi_tag, req->comm, &flag, &status);
-		}
-		else
-		{
-			req->ret = MPI_Test(&req->request, &flag, &status);
-		}
+		req->ret = MPI_Test(&req->request, &flag, &status);
 
-		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Iprobe or MPI_Test returning %d", req->ret);
+		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
 
 		if (flag)
 		{
-			if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
+			if (req->request_type == RECV_REQ)
 			{
 				TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
 			}
@@ -767,7 +914,7 @@ static void _starpu_mpi_test_detached_requests(void)
 
 			_starpu_mpi_handle_request_termination(req);
 
-			if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
+			if (req->request_type == RECV_REQ)
 			{
 				TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
 			}
@@ -882,7 +1029,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 #endif //STARPU_USE_FXT
 	}
 
-
 	/* notify the main thread that the progression thread is ready */
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 	running = 1;
@@ -890,10 +1036,17 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+ 
+ 	struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
+ 
+ 	MPI_Request header_req;
+ 	int header_req_submitted = 0;
+ 
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
 	{
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(new_requests);
+		_STARPU_MPI_DEBUG(3, "HASH_COUNT(_starpu_mpi_req_hashmap) = %d\n",HASH_COUNT(_starpu_mpi_req_hashmap));
+		unsigned block = _starpu_mpi_req_list_empty(new_requests);// && (HASH_COUNT(_starpu_mpi_req_hashmap) == 0);
 
 #ifndef STARPU_MPI_ACTIVITY
 		block = block && _starpu_mpi_req_list_empty(detached_requests);
@@ -913,11 +1066,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			TRACE_MPI_SLEEP_END();
 		}
 
-		/* test whether there are some terminated "detached request" */
-		_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		_starpu_mpi_test_detached_requests();
-		_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-
 		/* get one request */
 		struct _starpu_mpi_req *req;
 		while (!_starpu_mpi_req_list_empty(new_requests))
@@ -932,11 +1080,106 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			_starpu_mpi_handle_new_request(req);
 			_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 		}
+
+		if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0) && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
+		{
+			MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, STARPU_MPI_TAG, MPI_COMM_WORLD, &header_req);
+
+			_STARPU_MPI_DEBUG(3, "Submit of header_req OK!\n");
+			header_req_submitted = 1;
+		}
+
+		/* test whether there are some terminated "detached request" */
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		_starpu_mpi_test_detached_requests();
+		_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+		if (header_req_submitted == 1)
+		{
+			int flag,res;
+			MPI_Status status;
+			_STARPU_MPI_DEBUG(3, "Test of header_req\n");
+
+			res = MPI_Test(&header_req, &flag, &status);
+			STARPU_ASSERT(res == MPI_SUCCESS);
+
+			if (flag)
+			{
+				_STARPU_MPI_DEBUG(3, "Request received !\n");
+
+				_STARPU_MPI_DEBUG(3, "Searching for request with tag %d, size %ld ..\n",recv_env->mpi_tag, recv_env->psize);
+
+				struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
+
+				if (!found_req)
+				{
+					_STARPU_MPI_DEBUG(3, "Request with tag %d not found, creating a copy_handle to receive incoming data..\n",recv_env->mpi_tag);
+
+					starpu_data_handle_t data_handle = NULL;
+
+					while(!(data_handle))
+					{
+						data_handle = starpu_get_data_handle_from_tag(recv_env->mpi_tag);
+					}
+					STARPU_ASSERT(data_handle);
+
+					struct _starpu_mpi_copy_handle* chandle = malloc(sizeof(struct _starpu_mpi_copy_handle));
+					STARPU_ASSERT(chandle);
+
+					chandle->mpi_tag = recv_env->mpi_tag;
+					chandle->env = recv_env;
+					starpu_data_register_same(&chandle->handle, data_handle);
+					add_chandle(chandle);
+
+					_STARPU_MPI_DEBUG(3, "Posting internal starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE); 
+
+					res = starpu_mpi_irecv_detached(chandle->handle,status.MPI_SOURCE,chandle->mpi_tag,MPI_COMM_WORLD,NULL,NULL);
+					STARPU_ASSERT(res == MPI_SUCCESS);
+
+					_STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE); 
+				}
+				else
+				{
+					_STARPU_MPI_DEBUG(3, "Found !\n");
+
+					delete_req(found_req);
+
+					_starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
+					if (found_req->user_datatype == 0)
+					{
+						found_req->count = 1;
+						found_req->ptr = starpu_handle_get_local_ptr(found_req->data_handle);
+					}
+					else
+					{
+						found_req->count = recv_env->psize;
+						found_req->ptr = malloc(found_req->count);
+
+						STARPU_ASSERT_MSG(found_req->ptr, "cannot allocate message of size %ld\n", found_req->count);
+					}
+
+					_STARPU_MPI_DEBUG(3, "Handling new request... \n");
+					/* handling a request is likely to block for a while
+					 * (on a sync_data_with_mem call), we want to let the
+					 * application submit requests in the meantime, so we
+					 * release the lock. */
+					_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					_starpu_mpi_handle_new_request(found_req);
+					_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+				}
+				header_req_submitted = 0;
+			}
+			else
+			{
+				_STARPU_MPI_DEBUG(3, "Nothing received, continue ..\n");
+			}
+		}
 	}
 
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
 	STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
 	STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
+	STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_req_hashmap) == 0, "Number of receive requests left is not zero");
 
 	if (argc_argv->initialize_mpi)
 	{
@@ -947,6 +1190,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 
 	free(argc_argv);
+	free(recv_env);
+
 	return NULL;
 }
 

+ 1 - 1
mpi/src/starpu_mpi_private.c

@@ -16,7 +16,7 @@
  */
 
 int _debug_rank=-1;
-int _debug_level=0;
+int _debug_level=3;
 
 void _starpu_mpi_set_debug_level(int level)
 {

+ 5 - 0
mpi/src/starpu_mpi_private.h

@@ -20,6 +20,7 @@
 
 #include <starpu.h>
 #include <common/config.h>
+#include <common/uthash.h>
 #include "starpu_mpi.h"
 #include "starpu_mpi_fxt.h"
 #include <common/list.h>
@@ -68,6 +69,8 @@ void _starpu_mpi_set_debug_level(int level);
 #  define _STARPU_MPI_LOG_OUT()
 #endif
 
+#define STARPU_MPI_TAG 42
+
 enum _starpu_mpi_request_type
 {
 	SEND_REQ=0,
@@ -108,6 +111,8 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned submitted;
 	unsigned completed;
 
+	UT_hash_handle hh;
+
 	/* In the case of a Wait/Test request, we are going to post a request
 	 * to test the completion of another request */
 	struct _starpu_mpi_req *other_request;

+ 85 - 4
src/datawizard/interfaces/data_interface.c

@@ -38,9 +38,22 @@ static struct handle_entry *registered_handles;
 static struct _starpu_spinlock    registered_handles_lock;
 static int _data_interface_number = STARPU_MAX_INTERFACE_ID;
 
+/* Entry in the `registered_tag_handles' hash table.  */
+struct handle_tag_entry
+{
+	UT_hash_handle hh;
+	int tag;
+	starpu_data_handle_t handle;
+};
+
+/* Hash table mapping host tags to data handles.  */
+static struct handle_tag_entry *registered_tag_handles;
+static struct _starpu_spinlock    registered_tag_handles_lock;
+
 void _starpu_data_interface_init(void)
 {
 	_starpu_spin_init(&registered_handles_lock);
+	_starpu_spin_init(&registered_tag_handles_lock);
 }
 
 void _starpu_data_interface_shutdown()
@@ -56,6 +69,18 @@ void _starpu_data_interface_shutdown()
 	}
 
 	registered_handles = NULL;
+
+	struct handle_tag_entry *tag_entry, *tag_tmp;
+
+	_starpu_spin_destroy(&registered_tag_handles_lock);
+
+	HASH_ITER(hh, registered_tag_handles, tag_entry, tag_tmp)
+	{
+		HASH_DEL(registered_tag_handles, tag_entry);
+		free(tag_entry);
+	}
+
+	registered_tag_handles = NULL;
 }
 
 /* Register the mapping from PTR to HANDLE.  If PTR is already mapped to
@@ -329,8 +354,8 @@ int starpu_data_get_rank(starpu_data_handle_t handle)
 
 int starpu_data_set_rank(starpu_data_handle_t handle, int rank)
 {
-        handle->rank = rank;
-        return 0;
+	handle->rank = rank;
+	return 0;
 }
 
 int starpu_data_get_tag(starpu_data_handle_t handle)
@@ -338,10 +363,64 @@ int starpu_data_get_tag(starpu_data_handle_t handle)
 	return handle->tag;
 }
 
+starpu_data_handle_t starpu_get_data_handle_from_tag(int tag)
+{
+	struct handle_tag_entry *ret;
+
+	_starpu_spin_lock(&registered_tag_handles_lock);
+	HASH_FIND_INT(registered_tag_handles, &tag, ret);
+	_starpu_spin_unlock(&registered_tag_handles_lock);
+
+	if (ret)
+	{
+		return ret->handle;
+	}
+	else
+	{
+		return NULL;
+	}
+}
+
 int starpu_data_set_tag(starpu_data_handle_t handle, int tag)
 {
-        handle->tag = tag;
-        return 0;
+	struct handle_tag_entry *entry;
+	entry = (struct handle_tag_entry *) malloc(sizeof(*entry));
+	STARPU_ASSERT(entry != NULL);
+	
+	STARPU_ASSERT_MSG(!(starpu_get_data_handle_from_tag(tag)),"A data handle with tag %d had already been registered.\n",tag);
+
+	entry->tag = tag;
+	entry->handle = handle;
+
+	_starpu_spin_lock(&registered_tag_handles_lock);
+	HASH_ADD_INT(registered_tag_handles, tag, entry);
+	_starpu_spin_unlock(&registered_tag_handles_lock);
+
+	handle->tag = tag;
+	return 0;
+}
+
+int starpu_data_release_tag(starpu_data_handle_t handle)
+{
+	struct handle_tag_entry *tag_entry;
+
+	if (handle->tag != -1)
+	{
+		_starpu_spin_lock(&registered_tag_handles_lock);
+		HASH_FIND_INT(registered_tag_handles, &handle->tag, tag_entry);
+		STARPU_ASSERT_MSG((tag_entry != NULL),"Handle %p with tag %d isn't in the hashmap !",handle,handle->tag);
+
+		HASH_DEL(registered_tag_handles, tag_entry);
+		free(tag_entry);
+
+		_starpu_spin_unlock(&registered_tag_handles_lock);
+	}
+	return 0;
+}
+
+struct starpu_data_interface_ops* starpu_handle_get_interface(starpu_data_handle_t handle)
+{
+	return handle->ops;
 }
 
 /*
@@ -602,6 +681,8 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 	_STARPU_PTHREAD_COND_DESTROY(&handle->busy_cond);
 	_STARPU_PTHREAD_MUTEX_DESTROY(&handle->sequential_consistency_mutex);
 
+	starpu_data_release_tag(handle);
+
 	free(handle);
 }
 

+ 5 - 0
src/datawizard/user_interactions.c

@@ -472,6 +472,11 @@ void starpu_data_set_sequential_consistency_flag(starpu_data_handle_t handle, un
 	_starpu_spin_unlock(&handle->header_lock);
 }
 
+unsigned starpu_data_get_sequential_consistency_flag(starpu_data_handle_t handle)
+{
+	return handle->sequential_consistency;
+}
+
 /* By default, sequential consistency is enabled */
 static unsigned default_sequential_consistency_flag = 1;