浏览代码

mpi: rename mpi_tag into data_tag

Nathalie Furmento 10 年之前
父节点
当前提交
53f497f377

+ 180 - 179
mpi/src/starpu_mpi.c

@@ -36,15 +36,16 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
-							int dest, int mpi_tag, MPI_Comm comm,
+							int dest, int data_tag, MPI_Comm comm,
 							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							int sequential_consistency);
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
-							int source, int mpi_tag, MPI_Comm comm,
+							int source, int data_tag, MPI_Comm comm,
 							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							int sequential_consistency, int is_internal_req,
 							starpu_ssize_t count);
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
+static void _starpu_mpi_early_data_cb(void* arg);
 
 /* The list of ready requests */
 static struct _starpu_mpi_req_list *ready_requests;
@@ -81,7 +82,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	(*req)->user_datatype = -1;
 
 	(*req)->srcdst = -1;
-	(*req)->mpi_tag = -1;
+	(*req)->data_tag = -1;
 	(*req)->comm = 0;
 
 	(*req)->func = NULL;
@@ -122,14 +123,122 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
  /*                                                      */
  /********************************************************/
 
- static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
-							       int srcdst, int mpi_tag, MPI_Comm comm,
-							       unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
-							       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
-							       enum starpu_data_access_mode mode,
-							       int sequential_consistency,
-							       int is_internal_req,
-							       starpu_ssize_t count)
+struct _starpu_mpi_early_data_cb_args
+{
+	starpu_data_handle_t data_handle;
+	starpu_data_handle_t early_handle;
+	struct _starpu_mpi_req *req;
+	void *buffer;
+};
+
+static void _starpu_mpi_submit_ready_request(void *arg)
+{
+	_STARPU_MPI_LOG_IN();
+	struct _starpu_mpi_req *req = arg;
+
+	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
+
+	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->data_tag, _starpu_mpi_request_type(req->request_type));
+
+	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+	if (req->request_type == RECV_REQ)
+	{
+		/* Case : the request is the internal receive request submitted
+		 * by StarPU-MPI to receive incoming data without a matching
+		 * early_request from the application. We immediately allocate the
+		 * pointer associated to the data_handle, and push it into the
+		 * ready_requests list, so as the real MPI request can be submitted
+		 * before the next submission of the envelope-catching request. */
+		if (req->is_internal_req)
+		{
+			_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
+			if (req->user_datatype == 0)
+			{
+				req->count = 1;
+				req->ptr = starpu_data_get_local_ptr(req->data_handle);
+			}
+			else
+			{
+				STARPU_ASSERT(req->count);
+				req->ptr = malloc(req->count);
+				STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
+			}
+
+			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+					  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr,
+					  _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			_starpu_mpi_req_list_push_front(ready_requests, req);
+
+			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
+			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
+			req->posted = 1;
+			STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		}
+		else
+		{
+			/* test whether the receive request has already been submitted internally by StarPU-MPI*/
+			struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->data_tag, req->srcdst);
+
+			/* Case: a receive request for a data with the given tag and source has already been
+			 * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
+			 * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
+			 * will be called to bring the data back to the original data handle associated to the request.*/
+			if (early_data_handle)
+			{
+				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+				STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
+				while (!(early_data_handle->req_ready))
+					STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
+				STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
+				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+
+				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->data_tag);
+				STARPU_ASSERT(req->data_handle != early_data_handle->handle);
+
+				req->internal_req = early_data_handle->req;
+
+				struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
+				cb_args->data_handle = req->data_handle;
+				cb_args->early_handle = early_data_handle->handle;
+				cb_args->buffer = early_data_handle->buffer;
+				cb_args->req = req;
+
+				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
+				starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
+			}
+			/* Case: no matching data has been received. Store the receive request as an early_request. */
+			else
+			{
+				_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->data_tag);
+				_starpu_mpi_early_request_add(req);
+			}
+		}
+	}
+	else
+	{
+		_starpu_mpi_req_list_push_front(ready_requests, req);
+		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
+				  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	}
+
+	newer_requests = 1;
+	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	_STARPU_MPI_LOG_OUT();
+}
+
+static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
+							      int srcdst, int data_tag, MPI_Comm comm,
+							      unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
+							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
+							      enum starpu_data_access_mode mode,
+							      int sequential_consistency,
+							      int is_internal_req,
+							      starpu_ssize_t count)
 {
 	struct _starpu_mpi_req *req;
 
@@ -141,7 +250,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	req->request_type = request_type;
 	req->data_handle = data_handle;
 	req->srcdst = srcdst;
-	req->mpi_tag = mpi_tag;
+	req->data_tag = data_tag;
 	req->comm = comm;
 	req->detached = detached;
 	req->sync = sync;
@@ -171,11 +280,11 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
  {
 	 _STARPU_MPI_LOG_IN();
 
-	 _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	 _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
 	 _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
 
-	 _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
+	 _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->data_tag, 0);
 
 	 if (req->sync == 0)
 	 {
@@ -188,7 +297,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 		 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %d", req->ret);
 	 }
 
-	 _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
+	 _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->data_tag, 0);
 
 	 /* somebody is perhaps waiting for the MPI request to be posted */
 	 STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -206,7 +315,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
 
 	req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
-	req->envelope->mpi_tag = req->mpi_tag;
+	req->envelope->data_tag = req->data_tag;
 
 	if (req->user_datatype == 0)
 	{
@@ -255,22 +364,22 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 }
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
-							int dest, int mpi_tag, MPI_Comm comm,
+							int dest, int data_tag, MPI_Comm comm,
 							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							int sequential_consistency)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
 }
 
-int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
+int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
-	_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, 0);
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 0, NULL, NULL, 1);
-	_STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, 0);
+	_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
+	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, NULL, NULL, 1);
+	_STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, data_tag, 0);
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
@@ -280,15 +389,15 @@ int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 }
 
 int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
-			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+			      int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 0, callback, arg, 1);
+	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, callback, arg, 1);
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
-int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm)
+int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
 {
 	starpu_mpi_req req;
 	MPI_Status status;
@@ -296,20 +405,20 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI
 	_STARPU_MPI_LOG_IN();
 	memset(&status, 0, sizeof(MPI_Status));
 
-	starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
+	starpu_mpi_isend(data_handle, &req, dest, data_tag, comm);
 	starpu_mpi_wait(&req, &status);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
-int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
+int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 1, NULL, NULL, 1);
+	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, NULL, NULL, 1);
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
@@ -318,11 +427,11 @@ int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_r
 	return 0;
 }
 
-int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 1, callback, arg, 1);
+	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, callback, arg, 1);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
@@ -338,14 +447,14 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
-	_STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
-	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->srcdst, req->data_tag);
 
 	req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
 
-	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->srcdst, req->data_tag);
 
 	/* somebody is perhaps waiting for the MPI request to be posted */
 	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
@@ -358,12 +467,12 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
 }
 
-int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
+int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int data_tag, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
@@ -373,12 +482,12 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 //	// A tag is necessary for the internal mpi engine.
 //	int tag = starpu_data_get_tag(data_handle);
 //	if (tag == -1)
-//		starpu_data_set_tag(data_handle, mpi_tag);
+//		starpu_data_set_tag(data_handle, data_tag);
 
 	struct _starpu_mpi_req *req;
-	_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, mpi_tag);
-	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, 0, NULL, NULL, 1, 0, 0);
-	_STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, mpi_tag);
+	_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, data_tag);
+	req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 0, 0, NULL, NULL, 1, 0, 0);
+	_STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, data_tag);
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	*public_req = req;
 
@@ -386,7 +495,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 	return 0;
 }
 
-int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
 
@@ -395,14 +504,14 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
 //	// A tag is necessary for the internal mpi engine.
 //	int tag = starpu_data_get_tag(data_handle);
 //	if (tag == -1)
-//		starpu_data_set_tag(data_handle, mpi_tag);
+//		starpu_data_set_tag(data_handle, data_tag);
 
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg, 1, 0, 0);
+	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, 1, 0, 0);
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
-int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
+int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
 {
 	_STARPU_MPI_LOG_IN();
 
@@ -411,15 +520,15 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 //	// A tag is necessary for the internal mpi engine.
 //	int tag = starpu_data_get_tag(data_handle);
 //	if (tag == -1)
-//		starpu_data_set_tag(data_handle, mpi_tag);
+//		starpu_data_set_tag(data_handle, data_tag);
 
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0);
+	_starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
-int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
+int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, MPI_Status *status)
 {
 	starpu_mpi_req req;
 	_STARPU_MPI_LOG_IN();
@@ -429,9 +538,9 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
 //	// A tag is necessary for the internal mpi engine.
 //	int tag = starpu_data_get_tag(data_handle);
 //	if (tag == -1)
-//		starpu_data_set_tag(data_handle, mpi_tag);
+//		starpu_data_set_tag(data_handle, data_tag);
 
-	starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
+	starpu_mpi_irecv(data_handle, &req, source, data_tag, comm);
 	starpu_mpi_wait(&req, status);
 
 	_STARPU_MPI_LOG_OUT();
@@ -450,12 +559,12 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	/* Which is the mpi request we are waiting for ? */
 	struct _starpu_mpi_req *req = waiting_req->other_request;
 
-	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->srcdst, req->data_tag);
 
 	req->ret = MPI_Wait(&req->request, waiting_req->status);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %d", req->ret);
 
-	_STARPU_MPI_TRACE_UWAIT_END(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_UWAIT_END(req->srcdst, req->data_tag);
 
 	_starpu_mpi_handle_request_termination(req);
 	_STARPU_MPI_LOG_OUT();
@@ -516,14 +625,14 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	struct _starpu_mpi_req *req = testing_req->other_request;
 
 	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 
-	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->srcdst, req->data_tag);
 
 	req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
 	STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
 
-	_STARPU_MPI_TRACE_UTESTING_END(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_UTESTING_END(req->srcdst, req->data_tag);
 
 	if (*testing_req->flag)
 	{
@@ -700,13 +809,13 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_IN();
 
 	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d internal_req %p\n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
+			  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr,
 			  _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->internal_req);
 
 	if (req->internal_req)
 	{
-		struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
-		STARPU_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
+		struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->data_tag, req->srcdst);
+		STARPU_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->data_tag, req->srcdst);
 		_STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
 		_starpu_mpi_early_data_delete(early_data_handle);
 		free(early_data_handle);
@@ -761,14 +870,6 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-struct _starpu_mpi_early_data_cb_args
-{
-	starpu_data_handle_t data_handle;
-	starpu_data_handle_t early_handle;
-	struct _starpu_mpi_req *req;
-	void *buffer;
-};
-
 static void _starpu_mpi_early_data_cb(void* arg)
 {
 	struct _starpu_mpi_early_data_cb_args *args = arg;
@@ -822,106 +923,6 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	free(args);
 }
 
-static void _starpu_mpi_submit_ready_request(void *arg)
-{
-	_STARPU_MPI_LOG_IN();
-	struct _starpu_mpi_req *req = arg;
-
-	_STARPU_MPI_INC_POSTED_REQUESTS(-1);
-
-	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
-
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-
-	if (req->request_type == RECV_REQ)
-	{
-		/* Case : the request is the internal receive request submitted
-		 * by StarPU-MPI to receive incoming data without a matching
-		 * early_request from the application. We immediately allocate the
-		 * pointer associated to the data_handle, and push it into the
-		 * ready_requests list, so as the real MPI request can be submitted
-		 * before the next submission of the envelope-catching request. */
-		if (req->is_internal_req)
-		{
-			_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
-			if (req->user_datatype == 0)
-			{
-				req->count = 1;
-				req->ptr = starpu_data_get_local_ptr(req->data_handle);
-			}
-			else
-			{
-				STARPU_ASSERT(req->count);
-				req->ptr = malloc(req->count);
-				STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
-			}
-
-			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-					  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
-					  _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-			_starpu_mpi_req_list_push_front(ready_requests, req);
-
-			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-			STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
-			req->posted = 1;
-			STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		}
-		else
-		{
-			/* test whether the receive request has already been submitted internally by StarPU-MPI*/
-			struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
-
-			/* Case: a receive request for a data with the given tag and source has already been
-			 * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
-			 * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
-			 * will be called to bring the data back to the original data handle associated to the request.*/
-			if (early_data_handle)
-			{
-				STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-				STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
-				while (!(early_data_handle->req_ready))
-					STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
-				STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
-				STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-
-				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
-				STARPU_ASSERT(req->data_handle != early_data_handle->handle);
-
-				req->internal_req = early_data_handle->req;
-
-				struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
-				cb_args->data_handle = req->data_handle;
-				cb_args->early_handle = early_data_handle->handle;
-				cb_args->buffer = early_data_handle->buffer;
-				cb_args->req = req;
-
-				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
-				starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
-			}
-			/* Case: no matching data has been received. Store the receive request as an early_request. */
-			else
-			{
-				_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
-				_starpu_mpi_early_request_add(req);
-			}
-		}
-	}
-	else
-	{
-		_starpu_mpi_req_list_push_front(ready_requests, req);
-		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-				  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
-	}
-
-	newer_requests = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-	_STARPU_MPI_LOG_OUT();
-}
-
 #ifdef STARPU_MPI_ACTIVITY
 static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
 {
@@ -961,7 +962,7 @@ static void _starpu_mpi_test_detached_requests(void)
 
 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
-		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
+		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->data_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
 		req->ret = MPI_Test(&req->request, &flag, &status);
 
 		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
@@ -970,22 +971,22 @@ static void _starpu_mpi_test_detached_requests(void)
 		{
 			if (req->request_type == RECV_REQ)
 			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
+				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->srcdst, req->data_tag);
 			}
 			else if (req->request_type == SEND_REQ)
 			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
+				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->srcdst, req->data_tag, 0);
 			}
 
 			_starpu_mpi_handle_request_termination(req);
 
 			if (req->request_type == RECV_REQ)
 			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
+				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->srcdst, req->data_tag);
 			}
 			else if (req->request_type == SEND_REQ)
 			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
+				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->srcdst, req->data_tag, 0);
 			}
 		}
 
@@ -1032,7 +1033,7 @@ static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
 
 	/* submit the request to MPI */
 	_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+			  req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
 	req->func(req);
 
 	_STARPU_MPI_LOG_OUT();
@@ -1187,9 +1188,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (flag)
 			{
-				_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->mpi_tag, status.MPI_SOURCE, envelope->size);
+				_STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, status.MPI_SOURCE, envelope->size);
 
-				struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->mpi_tag, status.MPI_SOURCE);
+				struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->data_tag, status.MPI_SOURCE);
 
 				/* Case: a data will arrive before a matching receive is
 				 * posted by the application. Create a temporary handle to
@@ -1199,19 +1200,19 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				if (early_request == NULL)
 				{
 
-					_STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->mpi_tag, status.MPI_SOURCE);
+					_STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
 
 					starpu_data_handle_t data_handle = NULL;
 
 					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-					data_handle = _starpu_data_get_data_handle_from_tag(envelope->mpi_tag);
+					data_handle = _starpu_data_get_data_handle_from_tag(envelope->data_tag);
 					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
 					struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
 					STARPU_ASSERT(early_data_handle);
 					STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
 					STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
-					early_data_handle->mpi_tag = envelope->mpi_tag;
+					early_data_handle->data_tag = envelope->data_tag;
 					early_data_handle->env = envelope;
 					early_data_handle->source = status.MPI_SOURCE;
 
@@ -1233,10 +1234,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 						_starpu_mpi_early_data_add(early_data_handle);
 					}
 
-					_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->mpi_tag, status.MPI_SOURCE);
+					_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->data_tag, status.MPI_SOURCE);
 					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 					early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
-											  early_data_handle->mpi_tag, MPI_COMM_WORLD, 1, 0,
+											  early_data_handle->data_tag, MPI_COMM_WORLD, 1, 0,
 											  NULL, NULL, 1, 1, envelope->size);
 					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
@@ -1264,7 +1265,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				 * _starpu_mpi_handle_ready_request. */
 				else
 				{
-					_STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", envelope->mpi_tag);
+					_STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", envelope->data_tag);
 
 					_starpu_mpi_early_request_delete(early_request);
 

+ 13 - 13
mpi/src/starpu_mpi_collective.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2011, 2012, 2013  Centre National de la Recherche Scientifique
+ * Copyright (C) 2011, 2012, 2013, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -64,8 +64,8 @@ int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, i
 			if (data_handles[x])
 			{
 				int owner = starpu_data_get_rank(data_handles[x]);
-				int mpi_tag = starpu_data_get_tag(data_handles[x]);
-				STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
+				int data_tag = starpu_data_get_tag(data_handles[x]);
+				STARPU_ASSERT_MSG(data_tag >= 0, "Invalid tag for data handle");
 				if ((rank == root) && (owner != root))
 				{
 					callback_arg->count ++;
@@ -83,17 +83,17 @@ int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, i
 		if (data_handles[x])
 		{
 			int owner = starpu_data_get_rank(data_handles[x]);
-			int mpi_tag = starpu_data_get_tag(data_handles[x]);
-			STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
+			int data_tag = starpu_data_get_tag(data_handles[x]);
+			STARPU_ASSERT_MSG(data_tag >= 0, "Invalid tag for data handle");
 			if ((rank == root) && (owner != root))
 			{
 				//fprintf(stderr, "[%d] Sending data[%d] to %d\n", rank, x, owner);
-				starpu_mpi_isend_detached(data_handles[x], owner, mpi_tag, comm, callback_func, callback_arg);
+				starpu_mpi_isend_detached(data_handles[x], owner, data_tag, comm, callback_func, callback_arg);
 			}
 			if ((rank != root) && (owner == rank))
 			{
 				//fprintf(stderr, "[%d] Receiving data[%d] from %d\n", rank, x, root);
-				starpu_mpi_irecv_detached(data_handles[x], root, mpi_tag, comm, callback_func, callback_arg);
+				starpu_mpi_irecv_detached(data_handles[x], root, data_tag, comm, callback_func, callback_arg);
 			}
 		}
 	}
@@ -126,8 +126,8 @@ int starpu_mpi_gather_detached(starpu_data_handle_t *data_handles, int count, in
 			if (data_handles[x])
 			{
 				int owner = starpu_data_get_rank(data_handles[x]);
-				int mpi_tag = starpu_data_get_tag(data_handles[x]);
-				STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
+				int data_tag = starpu_data_get_tag(data_handles[x]);
+				STARPU_ASSERT_MSG(data_tag >= 0, "Invalid tag for data handle");
 				if ((rank == root) && (owner != root))
 				{
 					callback_arg->count ++;
@@ -145,17 +145,17 @@ int starpu_mpi_gather_detached(starpu_data_handle_t *data_handles, int count, in
 		if (data_handles[x])
 		{
 			int owner = starpu_data_get_rank(data_handles[x]);
-			int mpi_tag = starpu_data_get_tag(data_handles[x]);
-			STARPU_ASSERT_MSG(mpi_tag >= 0, "Invalid tag for data handle");
+			int data_tag = starpu_data_get_tag(data_handles[x]);
+			STARPU_ASSERT_MSG(data_tag >= 0, "Invalid tag for data handle");
 			if ((rank == root) && (owner != root))
 			{
 				//fprintf(stderr, "[%d] Receiving data[%d] from %d\n", rank, x, owner);
-				starpu_mpi_irecv_detached(data_handles[x], owner, mpi_tag, comm, callback_func, callback_arg);
+				starpu_mpi_irecv_detached(data_handles[x], owner, data_tag, comm, callback_func, callback_arg);
 			}
 			if ((rank != root) && (owner == rank))
 			{
 				//fprintf(stderr, "[%d] Sending data[%d] to %d\n", rank, x, root);
-				starpu_mpi_isend_detached(data_handles[x], root, mpi_tag, comm, callback_func, callback_arg);
+				starpu_mpi_isend_detached(data_handles[x], root, data_tag, comm, callback_func, callback_arg);
 			}
 		}
 	}

+ 16 - 16
mpi/src/starpu_mpi_early_data.c

@@ -25,7 +25,7 @@ struct _starpu_mpi_early_data_handle_hashlist
 {
 	struct _starpu_mpi_early_data_handle_list *list;
 	UT_hash_handle hh;
-	int mpi_tag;
+	int data_tag;
 };
 
 /** stores data which have been received by MPI but have not been requested by the application */
@@ -94,13 +94,13 @@ static void _starpu_mpi_early_data_handle_display_hash(int source, int tag)
 #endif
 
 static
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_pop(int mpi_tag, int source, int delete)
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_pop(int data_tag, int source, int delete)
 {
 	struct _starpu_mpi_early_data_handle_hashlist *hashlist;
 	struct _starpu_mpi_early_data_handle *early_data_handle;
 
-	_STARPU_MPI_DEBUG(60, "Looking for early_data_handle with tag %d in the hashmap[%d]\n", mpi_tag, source);
-	HASH_FIND_INT(_starpu_mpi_early_data_handle_hashmap[source], &mpi_tag, hashlist);
+	_STARPU_MPI_DEBUG(60, "Looking for early_data_handle with tag %d in the hashmap[%d]\n", data_tag, source);
+	HASH_FIND_INT(_starpu_mpi_early_data_handle_hashmap[source], &data_tag, hashlist);
 	if (hashlist == NULL)
 	{
 		early_data_handle = NULL;
@@ -123,46 +123,46 @@ struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_pop(int mpi_tag, in
 			}
 		}
 	}
-	_STARPU_MPI_DEBUG(60, "Found early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, mpi_tag, source);
+	_STARPU_MPI_DEBUG(60, "Found early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, data_tag, source);
 	return early_data_handle;
 }
 
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(int mpi_tag, int source)
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(int data_tag, int source)
 {
-	return _starpu_mpi_early_data_pop(mpi_tag, source, 0);
+	return _starpu_mpi_early_data_pop(data_tag, source, 0);
 }
 
 void _starpu_mpi_early_data_add(struct _starpu_mpi_early_data_handle *early_data_handle)
 {
-	_STARPU_MPI_DEBUG(60, "Trying to add early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, early_data_handle->mpi_tag, early_data_handle->source);
+	_STARPU_MPI_DEBUG(60, "Trying to add early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, early_data_handle->data_tag, early_data_handle->source);
 
 	struct _starpu_mpi_early_data_handle_hashlist *hashlist;
-	HASH_FIND_INT(_starpu_mpi_early_data_handle_hashmap[early_data_handle->source], &early_data_handle->mpi_tag, hashlist);
+	HASH_FIND_INT(_starpu_mpi_early_data_handle_hashmap[early_data_handle->source], &early_data_handle->data_tag, hashlist);
 	if (hashlist == NULL)
 	{
 		hashlist = malloc(sizeof(struct _starpu_mpi_early_data_handle_hashlist));
 		hashlist->list = _starpu_mpi_early_data_handle_list_new();
-		hashlist->mpi_tag = early_data_handle->mpi_tag;
-		HASH_ADD_INT(_starpu_mpi_early_data_handle_hashmap[early_data_handle->source], mpi_tag, hashlist);
+		hashlist->data_tag = early_data_handle->data_tag;
+		HASH_ADD_INT(_starpu_mpi_early_data_handle_hashmap[early_data_handle->source], data_tag, hashlist);
 	}
 	_starpu_mpi_early_data_handle_list_push_back(hashlist->list, early_data_handle);
 	_starpu_mpi_early_data_handle_hashmap_count ++;
 #ifdef STARPU_VERBOSE
-	_starpu_mpi_early_data_handle_display_hash(early_data_handle->source, early_data_handle->mpi_tag);
+	_starpu_mpi_early_data_handle_display_hash(early_data_handle->source, early_data_handle->data_tag);
 #endif
 }
 
 void _starpu_mpi_early_data_delete(struct _starpu_mpi_early_data_handle *early_data_handle)
 {
-	_STARPU_MPI_DEBUG(60, "Trying to delete early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, early_data_handle->mpi_tag, early_data_handle->source);
-	struct _starpu_mpi_early_data_handle *found = _starpu_mpi_early_data_pop(early_data_handle->mpi_tag, early_data_handle->source, 1);
+	_STARPU_MPI_DEBUG(60, "Trying to delete early_data_handle %p with tag %d in the hashmap[%d]\n", early_data_handle, early_data_handle->data_tag, early_data_handle->source);
+	struct _starpu_mpi_early_data_handle *found = _starpu_mpi_early_data_pop(early_data_handle->data_tag, early_data_handle->source, 1);
 
 	STARPU_ASSERT_MSG(found == early_data_handle,
-			  "[_starpu_mpi_early_data_delete][error] early_data_handle %p with tag %d is NOT in the hashmap[%d]\n", early_data_handle, early_data_handle->mpi_tag, early_data_handle->source);
+			  "[_starpu_mpi_early_data_delete][error] early_data_handle %p with tag %d is NOT in the hashmap[%d]\n", early_data_handle, early_data_handle->data_tag, early_data_handle->source);
 
 	_starpu_mpi_early_data_handle_hashmap_count --;
 #ifdef STARPU_VERBOSE
-	_starpu_mpi_early_data_handle_display_hash(early_data_handle->source, early_data_handle->mpi_tag);
+	_starpu_mpi_early_data_handle_display_hash(early_data_handle->source, early_data_handle->data_tag);
 #endif
 }
 

+ 2 - 2
mpi/src/starpu_mpi_early_data.h

@@ -33,7 +33,7 @@ LIST_TYPE(_starpu_mpi_early_data_handle,
 	  struct _starpu_mpi_envelope *env;
 	  struct _starpu_mpi_req *req;
 	  void *buffer;
-	  int mpi_tag;
+	  int data_tag;
 	  int source;
 	  int req_ready;
 	  starpu_pthread_mutex_t req_mutex;
@@ -44,7 +44,7 @@ void _starpu_mpi_early_data_init(int world_size);
 void _starpu_mpi_early_data_check_termination();
 void _starpu_mpi_early_data_free(int world_size);
 
-struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(int mpi_tag, int source);
+struct _starpu_mpi_early_data_handle *_starpu_mpi_early_data_find(int data_tag, int source);
 void _starpu_mpi_early_data_add(struct _starpu_mpi_early_data_handle *early_data_handle);
 void _starpu_mpi_early_data_delete(struct _starpu_mpi_early_data_handle *early_data_handle);
 

+ 11 - 11
mpi/src/starpu_mpi_early_request.c

@@ -48,11 +48,11 @@ void _starpu_mpi_early_request_check_termination()
 	STARPU_ASSERT_MSG(_starpu_mpi_early_request_count() == 0, "Number of receive requests left is not zero");
 }
 
-struct _starpu_mpi_req* _starpu_mpi_early_request_find(int mpi_tag, int source)
+struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source)
 {
 	struct _starpu_mpi_req* req;
 
-	HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &mpi_tag, req);
+	HASH_FIND_INT(_starpu_mpi_app_req_hashmap[source], &data_tag, req);
 
 	return req;
 }
@@ -61,25 +61,25 @@ void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req)
 {
 	struct _starpu_mpi_req *test_req;
 
-	test_req = _starpu_mpi_early_request_find(req->mpi_tag, req->srcdst);
+	test_req = _starpu_mpi_early_request_find(req->data_tag, req->srcdst);
 
 	if (test_req == NULL)
 	{
-		HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], mpi_tag, req);
+		HASH_ADD_INT(_starpu_mpi_app_req_hashmap[req->srcdst], data_tag, req);
 		_starpu_mpi_app_req_hashmap_count ++;
-		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+		_STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+		_STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
 		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
 		if (seq_const &&  req->sequential_consistency)
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, req->srcdst, test_req);
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->data_tag, req->srcdst, test_req);
 		}
 		else
 		{
-			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, req->srcdst, test_req);
+			STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap[%d], while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->data_tag, req->srcdst, test_req);
 		}
 	}
 }
@@ -88,17 +88,17 @@ void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req)
 {
 	struct _starpu_mpi_req *test_req;
 
-	test_req = _starpu_mpi_early_request_find(req->mpi_tag, req->srcdst);
+	test_req = _starpu_mpi_early_request_find(req->data_tag, req->srcdst);
 
 	if (test_req != NULL)
 	{
 		HASH_DEL(_starpu_mpi_app_req_hashmap[req->srcdst], req);
 		_starpu_mpi_app_req_hashmap_count --;
-		_STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+		_STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->mpi_tag, req->srcdst);
+		_STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap[%d]\n", req, req->data_tag, req->srcdst);
 	}
 }
 

+ 1 - 1
mpi/src/starpu_mpi_early_request.h

@@ -34,7 +34,7 @@ int _starpu_mpi_early_request_count();
 void _starpu_mpi_early_request_check_termination();
 
 void _starpu_mpi_early_request_add(struct _starpu_mpi_req *req);
-struct _starpu_mpi_req* _starpu_mpi_early_request_find(int mpi_tag, int source);
+struct _starpu_mpi_req* _starpu_mpi_early_request_find(int data_tag, int source);
 void _starpu_mpi_early_request_delete(struct _starpu_mpi_req *req);
 
 #ifdef __cplusplus

+ 9 - 16
mpi/src/starpu_mpi_helper.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010  Université de Bordeaux
- * Copyright (C) 2010, 2012  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2012, 2014  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,24 +26,21 @@ static void starpu_mpi_unlock_tag_callback(void *arg)
 	free(tagptr);
 }
 
-int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle,
-				int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
+int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, starpu_tag_t tag)
 {
 	starpu_tag_t *tagptr = malloc(sizeof(starpu_tag_t));
 	*tagptr = tag;
 
-	return starpu_mpi_isend_detached(data_handle, dest, mpi_tag, comm,
-						starpu_mpi_unlock_tag_callback, tagptr);
+	return starpu_mpi_isend_detached(data_handle, dest, data_tag, comm, starpu_mpi_unlock_tag_callback, tagptr);
 }
 
 
-int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
+int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, starpu_tag_t tag)
 {
 	starpu_tag_t *tagptr = malloc(sizeof(starpu_tag_t));
 	*tagptr = tag;
 
-	return starpu_mpi_irecv_detached(data_handle, source, mpi_tag, comm,
-						starpu_mpi_unlock_tag_callback, tagptr);
+	return starpu_mpi_irecv_detached(data_handle, source, data_tag, comm, starpu_mpi_unlock_tag_callback, tagptr);
 }
 
 struct arg_array
@@ -66,7 +63,7 @@ static void starpu_mpi_array_unlock_callback(void *_arg)
 }
 
 int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
-		starpu_data_handle_t *data_handle, int *dest, int *mpi_tag,
+		starpu_data_handle_t *data_handle, int *dest, int *data_tag,
 		MPI_Comm *comm, starpu_tag_t tag)
 {
 	struct arg_array *arg = malloc(sizeof(struct arg_array));
@@ -77,16 +74,14 @@ int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
 	unsigned elem;
 	for (elem = 0; elem < array_size; elem++)
 	{
-		starpu_mpi_isend_detached(data_handle[elem], dest[elem],
-				mpi_tag[elem], comm[elem],
-				starpu_mpi_array_unlock_callback, arg);
+		starpu_mpi_isend_detached(data_handle[elem], dest[elem], data_tag[elem], comm[elem], starpu_mpi_array_unlock_callback, arg);
 	}
 
 	return 0;
 }
 
 
-int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag)
+int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *data_tag, MPI_Comm *comm, starpu_tag_t tag)
 {
 	struct arg_array *arg = malloc(sizeof(struct arg_array));
 
@@ -96,9 +91,7 @@ int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_
 	unsigned elem;
 	for (elem = 0; elem < array_size; elem++)
 	{
-		starpu_mpi_irecv_detached(data_handle[elem], source[elem],
-				mpi_tag[elem], comm[elem],
-				starpu_mpi_array_unlock_callback, arg);
+		starpu_mpi_irecv_detached(data_handle[elem], source[elem], data_tag[elem], comm[elem], starpu_mpi_array_unlock_callback, arg);
 	}
 
 	return 0;

+ 1 - 1
mpi/src/starpu_mpi_private.h

@@ -88,7 +88,7 @@ enum _starpu_mpi_request_type
 struct _starpu_mpi_envelope
 {
 	starpu_ssize_t size;
-	int mpi_tag;
+	int data_tag;
 };
 
 struct _starpu_mpi_req;

+ 12 - 12
mpi/src/starpu_mpi_task_insert.c

@@ -31,11 +31,11 @@
 #include <starpu_mpi_cache.h>
 #include <starpu_mpi_select_node.h>
 
-#define _SEND_DATA(data, mode, dest, mpi_tag, comm, callback, arg)     \
-	if (mode & STARPU_SSEND)					\
-		starpu_mpi_issend_detached(data, dest, mpi_tag, comm, callback, arg); \
+#define _SEND_DATA(data, mode, dest, data_tag, comm, callback, arg)     \
+	if (mode & STARPU_SSYNC)					\
+		starpu_mpi_issend_detached(data, dest, data_tag, comm, callback, arg); \
 	else								\
-		starpu_mpi_isend_detached(data, dest, mpi_tag, comm, callback, arg);
+		starpu_mpi_isend_detached(data, dest, data_tag, comm, callback, arg);
 
 static
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *xrank)
@@ -102,13 +102,13 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 	if (data && mode & STARPU_R)
 	{
 		int mpi_rank = starpu_data_get_rank(data);
-		int mpi_tag = starpu_data_get_tag(data);
+		int data_tag = starpu_data_get_tag(data);
 		if(mpi_rank == -1)
 		{
 			fprintf(stderr,"StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank\n");
 			STARPU_ABORT();
 		}
-		if(mpi_tag == -1)
+		if(data_tag == -1)
 		{
 			fprintf(stderr,"StarPU needs to be told the MPI tag of this data, using starpu_data_set_tag\n");
 			STARPU_ABORT();
@@ -121,7 +121,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 			if (already_received == NULL)
 			{
 				_STARPU_MPI_DEBUG(1, "Receive data %p from %d\n", data, mpi_rank);
-				starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
+				starpu_mpi_irecv_detached(data, mpi_rank, data_tag, comm, NULL, NULL);
 			}
 		}
 		if (!do_execute && mpi_rank == me)
@@ -131,7 +131,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 			if (already_sent == NULL)
 			{
 				_STARPU_MPI_DEBUG(1, "Send data %p to %d\n", data, xrank);
-				_SEND_DATA(data, mode, xrank, mpi_tag, comm, NULL, NULL);
+				_SEND_DATA(data, mode, xrank, data_tag, comm, NULL, NULL);
 			}
 		}
 	}
@@ -143,13 +143,13 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 	if (mode & STARPU_W)
 	{
 		int mpi_rank = starpu_data_get_rank(data);
-		int mpi_tag = starpu_data_get_tag(data);
+		int data_tag = starpu_data_get_tag(data);
 		if(mpi_rank == -1)
 		{
 			fprintf(stderr,"StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank\n");
 			STARPU_ABORT();
 		}
-		if(mpi_tag == -1)
+		if(data_tag == -1)
 		{
 			fprintf(stderr,"StarPU needs to be told the MPI tag of this data, using starpu_data_set_tag\n");
 			STARPU_ABORT();
@@ -159,13 +159,13 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 			if (xrank != -1 && me != xrank)
 			{
 				_STARPU_MPI_DEBUG(1, "Receive data %p back from the task %d which executed the codelet ...\n", data, xrank);
-				starpu_mpi_irecv_detached(data, xrank, mpi_tag, comm, NULL, NULL);
+				starpu_mpi_irecv_detached(data, xrank, data_tag, comm, NULL, NULL);
 			}
 		}
 		else if (do_execute)
 		{
 			_STARPU_MPI_DEBUG(1, "Send data %p back to its owner %d...\n", data, mpi_rank);
-			_SEND_DATA(data, mode, mpi_rank, mpi_tag, comm, NULL, NULL);
+			_SEND_DATA(data, mode, mpi_rank, data_tag, comm, NULL, NULL);
 		}
 	}
 }