Browse Source

backport branches/starpu-1.1@13961: mpi: new function starpu_mpi_issend

Nathalie Furmento 10 years ago
parent
commit
3abfac9166
5 changed files with 50 additions and 17 deletions
  1. 2 0
      ChangeLog
  2. 6 0
      doc/doxygen/chapters/api/mpi.doxy
  3. 1 0
      mpi/include/starpu_mpi.h
  4. 40 17
      mpi/src/starpu_mpi.c
  5. 1 0
      mpi/src/starpu_mpi_private.h

+ 2 - 0
ChangeLog

@@ -137,6 +137,8 @@ New features:
   * Fix and actually enable the cache allocation.
   * Fix and actually enable the cache allocation.
   * Enable allocation cache in main RAM when STARPU_LIMIT_CPU_MEM is set by
   * Enable allocation cache in main RAM when STARPU_LIMIT_CPU_MEM is set by
     the user.
     the user.
+  * New MPI function starpu_mpi_issend to send data using a synchronous
+    and non-blocking mode (internally uses MPI_Issend)
 
 
 Changes:
 Changes:
   * Fix complexity of implicit task/data dependency, from quadratic to linear.
   * Fix complexity of implicit task/data dependency, from quadratic to linear.

+ 6 - 0
doc/doxygen/chapters/api/mpi.doxy

@@ -120,6 +120,12 @@ communication completes, its resources are automatically released back
 to the system, there is no need to test or to wait for the completion
 to the system, there is no need to test or to wait for the completion
 of the request.
 of the request.
 
 
+\fn int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, MPI_Comm comm)
+\ingroup API_MPI_Support
+Performs a synchronous-mode, non-blocking send of \p data_handle to the node
+\p dest using the message tag \p mpi_tag within the communicator \p
+comm.
+
 \fn int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status)
 \fn int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status)
 \ingroup API_MPI_Support
 \ingroup API_MPI_Support
 Returns when the operation identified by request \p req is complete.
 Returns when the operation identified by request \p req is complete.

+ 1 - 0
mpi/include/starpu_mpi.h

@@ -36,6 +36,7 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI
 int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
 int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
 int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status);
 int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status);
 int starpu_mpi_test(starpu_mpi_req *req, int *flag, MPI_Status *status);
 int starpu_mpi_test(starpu_mpi_req *req, int *flag, MPI_Status *status);
 int starpu_mpi_barrier(MPI_Comm comm);
 int starpu_mpi_barrier(MPI_Comm comm);

+ 40 - 17
mpi/src/starpu_mpi.c

@@ -37,11 +37,11 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 #endif
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
 							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned detached, void (*callback)(void *), void *arg,
+							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							int sequential_consistency);
 							int sequential_consistency);
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 							int source, int mpi_tag, MPI_Comm comm,
 							int source, int mpi_tag, MPI_Comm comm,
-							unsigned detached, void (*callback)(void *), void *arg,
+							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							int sequential_consistency, int is_internal_req,
 							int sequential_consistency, int is_internal_req,
 							starpu_ssize_t count);
 							starpu_ssize_t count);
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
@@ -104,6 +104,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 
 
 	(*req)->other_request = NULL;
 	(*req)->other_request = NULL;
 
 
+	(*req)->sync = 0;
 	(*req)->detached = -1;
 	(*req)->detached = -1;
 	(*req)->callback = NULL;
 	(*req)->callback = NULL;
 	(*req)->callback_arg = NULL;
 	(*req)->callback_arg = NULL;
@@ -123,7 +124,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 
 
  static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
  static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
 							       int srcdst, int mpi_tag, MPI_Comm comm,
 							       int srcdst, int mpi_tag, MPI_Comm comm,
-							       unsigned detached, void (*callback)(void *), void *arg,
+							       unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							       enum starpu_data_access_mode mode,
 							       enum starpu_data_access_mode mode,
 							       int sequential_consistency,
 							       int sequential_consistency,
@@ -143,6 +144,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	req->mpi_tag = mpi_tag;
 	req->mpi_tag = mpi_tag;
 	req->comm = comm;
 	req->comm = comm;
 	req->detached = detached;
 	req->detached = detached;
+	req->sync = sync;
 	req->callback = callback;
 	req->callback = callback;
 	req->callback_arg = arg;
 	req->callback_arg = arg;
 	req->func = func;
 	req->func = func;
@@ -175,8 +177,16 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 
 
 	 _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 	 _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
 
 
-	 req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
-	 STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
+	if (req->sync == 0)
+	{
+		req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
+		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
+	}
+	else
+	{
+		req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
+		STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %d", req->ret);
+	}
 
 
 	 _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
 	 _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
 
 
@@ -246,10 +256,10 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
 							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned detached, void (*callback)(void *), void *arg,
+							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
 							int sequential_consistency)
 							int sequential_consistency)
 {
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
 }
 }
 
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
@@ -259,7 +269,7 @@ int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
 	_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, 0);
 	_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, mpi_tag, 0);
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL, 1);
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 0, NULL, NULL, 1);
 	_STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, 0);
 	_STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, mpi_tag, 0);
 
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
@@ -273,8 +283,7 @@ int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
 			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 {
 	_STARPU_MPI_LOG_IN();
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg, 1);
-
+	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 0, callback, arg, 1);
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
 }
 }
@@ -294,6 +303,21 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI
 	return 0;
 	return 0;
 }
 }
 
 
+int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
+{
+	_STARPU_MPI_LOG_IN();
+	STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
+
+	struct _starpu_mpi_req *req;
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 1, NULL, NULL, 1);
+
+	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
+	*public_req = req;
+
+	_STARPU_MPI_LOG_OUT();
+	return 0;
+}
+
 /********************************************************/
 /********************************************************/
 /*                                                      */
 /*                                                      */
 /*  receive functionalities                             */
 /*  receive functionalities                             */
@@ -324,9 +348,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 }
 }
 
 
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
 }
 }
 
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -343,9 +367,8 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 
 
 	struct _starpu_mpi_req *req;
 	struct _starpu_mpi_req *req;
 	_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, mpi_tag);
 	_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, mpi_tag);
-	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1, 0, 0);
+	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, 0, NULL, NULL, 1, 0, 0);
 	_STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, mpi_tag);
 	_STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, mpi_tag);
-
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	*public_req = req;
 	*public_req = req;
 
 
@@ -364,7 +387,7 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
 //	if (tag == -1)
 //	if (tag == -1)
 //		starpu_data_set_tag(data_handle, mpi_tag);
 //		starpu_data_set_tag(data_handle, mpi_tag);
 
 
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1, 0, 0);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg, 1, 0, 0);
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
 }
 }
@@ -380,7 +403,7 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 //	if (tag == -1)
 //	if (tag == -1)
 //		starpu_data_set_tag(data_handle, mpi_tag);
 //		starpu_data_set_tag(data_handle, mpi_tag);
 
 
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency, 0, 0);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0);
 
 
 	_STARPU_MPI_LOG_OUT();
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 	return 0;
@@ -1203,7 +1226,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->mpi_tag, status.MPI_SOURCE);
 					_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->mpi_tag, status.MPI_SOURCE);
 					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
 					early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
 					early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
-											  early_data_handle->mpi_tag, MPI_COMM_WORLD, 1,
+											  early_data_handle->mpi_tag, MPI_COMM_WORLD, 1, 0,
 											  NULL, NULL, 1, 1, recv_env->size);
 											  NULL, NULL, 1, 1, recv_env->size);
 					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 
 

+ 1 - 0
mpi/src/starpu_mpi_private.h

@@ -113,6 +113,7 @@ LIST_TYPE(_starpu_mpi_req,
 	MPI_Status *status;
 	MPI_Status *status;
 	MPI_Request request;
 	MPI_Request request;
 	int *flag;
 	int *flag;
+	unsigned sync;
 
 
 	int ret;
 	int ret;
 	starpu_pthread_mutex_t req_mutex;
 	starpu_pthread_mutex_t req_mutex;