Bläddra i källkod

mpi/src/starpu_mpi.c: store the size of the request when it is known in advance. This avoids having to look it up in the hashmap tables for internal receive requests.

Nathalie Furmento 12 år sedan
förälder
incheckning
7bda9c53e1
1 ändrade filer med 44 tillägg och 37 borttagningar
  1. 44 37
      mpi/src/starpu_mpi.c

+ 44 - 37
mpi/src/starpu_mpi.c

@@ -37,7 +37,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 							int source, int mpi_tag, MPI_Comm comm,
 							unsigned detached, void (*callback)(void *), void *arg,
-							int sequential_consistency, int is_internal_req);
+							int sequential_consistency, int is_internal_req,
+							ssize_t psize);
 static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
 
 /* The list of requests that have been newly submitted by the application */
@@ -220,7 +221,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 	req->is_internal_req = 0;
 	req->envelope = NULL;
 	req->sequential_consistency = 1;
- }
+}
 
  /********************************************************/
  /*                                                      */
@@ -234,8 +235,9 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 							       enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							       enum starpu_data_access_mode mode,
 							       int sequential_consistency,
-							       int is_internal_req)
- {
+							       int is_internal_req,
+							       ssize_t psize)
+{
 
 	 _STARPU_MPI_LOG_IN();
 	 struct _starpu_mpi_req *req = malloc(sizeof(struct _starpu_mpi_req));
@@ -256,6 +258,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
 	 req->func = func;
 	 req->sequential_consistency = sequential_consistency;
 	 req->is_internal_req = is_internal_req;
+	 req->count = psize;
 
 	 /* Asynchronously request StarPU to fetch the data in main memory: when
 	  * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
@@ -357,7 +360,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 							unsigned detached, void (*callback)(void *), void *arg,
 							int sequential_consistency)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
 }
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
@@ -432,9 +435,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req)
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, ssize_t psize)
 {
-     return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, psize);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
@@ -450,7 +453,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 		starpu_data_set_tag(data_handle, mpi_tag);
 
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1, 0);
+	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1, 0, 0);
 
 	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
 	*public_req = req;
@@ -470,7 +473,7 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
 	if (tag == -1)
 		starpu_data_set_tag(data_handle, mpi_tag);
 
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1, 0);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1, 0, 0);
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
@@ -479,7 +482,7 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 {
 	_STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency, 0);
+	_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency, 0, 0);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
@@ -892,33 +895,12 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 
 	if (req->request_type == RECV_REQ)
 	{
-		/* test whether the receive request has already been submitted internally by StarPU-MPI*/
-		struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
-
-		/* Case : the request has already been submitted internally by StarPU.
-		 * We'll asynchronously ask a Read permission over the temporary handle, so as when
-		 * the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
-		 * bring the data back to the original data handle associated to the request.*/
-		if (chandle && (req->data_handle != chandle->handle))
-		{
-			_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
-
-			req->internal_req = chandle->req;
-
-			struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
-			cb_args->data_handle = req->data_handle;
-			cb_args->copy_handle = chandle->handle;
-			cb_args->req = req;
-
-			_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
-			starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
-		}
 		/* Case : the request is the internal receive request submitted by StarPU-MPI to receive
 		 * incoming data without a matching pending receive already submitted by the application.
 		 * We immediately allocate the pointer associated to the data_handle, and pushing it into
 		 * the list of new_requests, so as the real MPI request can be submitted before the next
 		 * submission of the envelope-catching request. */
-		else if (chandle && (req->data_handle == chandle->handle))
+		if (req->is_internal_req)
 		{
 			_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
 			if (req->user_datatype == 0)
@@ -928,7 +910,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 			}
 			else
 			{
-				req->count = chandle->env->psize;
+				STARPU_ASSERT(req->count);
 				req->ptr = malloc(req->count);
 				STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
 			}
@@ -944,11 +926,36 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
 			STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
 			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
 		}
-		/* Case : a classic receive request with no send received earlier than expected.
-		 * We just add the pending receive request to the requests' hashmap. */
 		else
 		{
-			add_app_req(req);
+			/* test whether the receive request has already been submitted internally by StarPU-MPI*/
+			struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
+
+			/* Case : the request has already been submitted internally by StarPU.
+			 * We'll asynchronously ask a Read permission over the temporary handle, so as when
+			 * the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
+			 * bring the data back to the original data handle associated to the request.*/
+			if (chandle)
+			{
+				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
+				STARPU_ASSERT(req->data_handle != chandle->handle);
+
+				req->internal_req = chandle->req;
+
+				struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
+				cb_args->data_handle = req->data_handle;
+				cb_args->copy_handle = chandle->handle;
+				cb_args->req = req;
+
+				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
+				starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
+			}
+			/* Case : a classic receive request with no send received earlier than expected.
+			 * We just add the pending receive request to the requests' hashmap. */
+			else
+			{
+				add_app_req(req);
+			}
 		}
 	}
 	else
@@ -1249,7 +1256,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					add_chandle(chandle);
 
 					_STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
-					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1);
+					chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->psize);
 
 					// We wait until the request is pushed in the
 					// new_request list, that ensures that the next loop