Pārlūkot izejas kodu

mpi: new function starpu_mpi_irecv_detached_sequential_consistency
which allows to enable or disable the sequential consistency for the
given data handle (sequential consistency will be enabled or disabled
based on the value of the function parameter and the value of the
sequential consistency defined for the given data)

Nathalie Furmento 12 gadi atpakaļ
vecāks
revīzija
fff1cb4c47

+ 6 - 0
ChangeLog

@@ -34,6 +34,12 @@ New features:
     let starpu commute write accesses.
   * Out-of-core support, through registration of disk areas as additional memory
     nodes.
+  * StarPU-MPI: new function
+    starpu_mpi_irecv_detached_sequential_consistency which allows to
+    enable or disable the sequential consistency for the given data
+    handle (sequential consistency will be enabled or disabled based
+    on the value of the function parameter and the value of the
+    sequential consistency defined for the given data)
 
 Small features:
   * Add cl_arg_free field to enable automatic free(cl_arg) on task

+ 16 - 0
doc/doxygen/chapters/api/mpi.doxy

@@ -98,6 +98,22 @@ communication completes, its resources are automatically released back
 to the system, there is no need to test or to wait for the completion
 of the request.
 
+\fn int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
+\ingroup API_MPI_Support
+Posts a nonblocking receive in \p data_handle from the node \p source
+using the message tag \p mpi_tag within the communicator \p comm. On
+completion, the \p callback function is called with the argument \p
+arg.
+The parameter \p sequential_consistency allows to enable or disable
+the sequential consistency for \p data handle (sequential consistency
+will be enabled or disabled based on the value of the parameter \p
+sequential_consistency and the value of the sequential consistency
+defined for \p data_handle).
+Similarly to the pthread detached functionality, when a detached
+communication completes, its resources are automatically released back
+to the system, there is no need to test or to wait for the completion
+of the request.
+
 \fn int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status)
 \ingroup API_MPI_Support
 Returns when the operation identified by request \p req is complete.

+ 2 - 0
mpi/include/starpu_mpi.h

@@ -40,6 +40,8 @@ int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status);
 int starpu_mpi_test(starpu_mpi_req *req, int *flag, MPI_Status *status);
 int starpu_mpi_barrier(MPI_Comm comm);
 
+int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency);
+
 int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi);
 int starpu_mpi_initialize(void) STARPU_DEPRECATED;
 int starpu_mpi_initialize_extended(int *rank, int *world_size) STARPU_DEPRECATED;

+ 31 - 14
mpi/src/starpu_mpi.c

@@ -32,8 +32,12 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned detached, void (*callback)(void *), void *arg);
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg);
+							unsigned detached, void (*callback)(void *), void *arg,
+							int sequential_consistency);
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
+							int source, int mpi_tag, MPI_Comm comm,
+							unsigned detached, void (*callback)(void *), void *arg,
+							int sequential_consistency);
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
 
 /* The list of requests that have been newly submitted by the application */
@@ -73,6 +77,7 @@ struct _starpu_mpi_copy_handle
  /********************************************************/
 
 static struct _starpu_mpi_req *_starpu_mpi_req_hashmap = NULL;
+/** stores data which have been received by MPI but have not been requested by the application */
 static struct _starpu_mpi_copy_handle *_starpu_mpi_copy_handle_hashmap = NULL;
 
 static struct _starpu_mpi_req* find_req(int mpi_tag)
@@ -99,7 +104,7 @@ static void add_req(struct _starpu_mpi_req *req)
 	{
 		_STARPU_MPI_DEBUG(3, "Error add_req : request %p with tag %d already in the hashmap. \n", req, req->mpi_tag);
 		int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
-		if (seq_const)
+		if (seq_const &&  req->sequential_consistency)
 		{
 			STARPU_ASSERT_MSG(!test_req, "Error add_req : request %p with tag %d wanted to be added to the hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, test_req);
 		}
@@ -213,6 +218,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 	req->internal_req = NULL;
 	req->is_internal_req = 0;
 	req->envelope = NULL;
+	req->sequential_consistency = 1;
  }
 
  /********************************************************/
@@ -225,7 +231,8 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 							       int srcdst, int mpi_tag, MPI_Comm comm,
 							       unsigned detached, void (*callback)(void *), void *arg,
 							       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
-							       enum starpu_data_access_mode mode)
+							       enum starpu_data_access_mode mode,
+							       int sequential_consistency)
  {
 
 	 _STARPU_MPI_LOG_IN();
@@ -245,11 +252,12 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 	 req->callback = callback;
 	 req->callback_arg = arg;
 	 req->func = func;
+	 req->sequential_consistency = sequential_consistency;
 
 	 /* Asynchronously request StarPU to fetch the data in main memory: when
 	  * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
 	  * the request is actually submitted */
-	 starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
+	 starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req, sequential_consistency);
 
 	 _STARPU_MPI_LOG_OUT();
 	 return req;
@@ -343,9 +351,10 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned detached, void (*callback)(void *), void *arg)
+							unsigned detached, void (*callback)(void *), void *arg,
+							int sequential_consistency)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency);
 }
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
@@ -354,7 +363,7 @@ int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL);
+	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL, 1);
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
@@ -367,7 +376,7 @@ int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
 			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
+	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg, 1);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
@@ -420,9 +429,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -438,7 +447,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 		starpu_data_set_tag(data_handle, mpi_tag);
 
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL);
+	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1);
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	*public_req = req;
@@ -458,7 +467,15 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
 	if (tag == -1)
 		starpu_data_set_tag(data_handle, mpi_tag);
 
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1);
+	_STARPU_MPI_LOG_OUT();
+	return 0;
+}
+
+int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
+{
+	_STARPU_MPI_LOG_IN();
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
@@ -1230,7 +1247,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					add_chandle(chandle);
 
 					_STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
-					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL);
+					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1);
 					chandle->req->is_internal_req = 1;
 
 					// We wait until the request is pushed in the

+ 2 - 0
mpi/src/starpu_mpi_private.h

@@ -144,6 +144,8 @@ LIST_TYPE(_starpu_mpi_req,
 
 	int is_internal_req;
 	struct _starpu_mpi_req *internal_req;
+
+	int sequential_consistency;
 );
 
 #ifdef __cplusplus