Parcourir la source

mpi: Fix allocating req->ptr

The unpack method will use starpu_free_on_node, and
_starpu_mpi_handle_request_termination uses it as well, so allocation
should also use that.
Samuel Thibault il y a 4 ans
Parent
commit
227554faa2
2 fichiers modifiés avec 13 ajouts et 9 suppressions
  1. 12 8
      mpi/src/mpi/starpu_mpi_mpi.c
  2. 1 1
      mpi/src/nmad/starpu_mpi_nmad.c

+ 12 - 8
mpi/src/mpi/starpu_mpi_mpi.c

@@ -203,7 +203,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 			else
 			{
 				STARPU_ASSERT(req->count);
-				_STARPU_MPI_MALLOC(req->ptr, req->count);
+				req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
 			}
 
 			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
@@ -225,12 +225,12 @@ void _starpu_mpi_submit_ready_request(void *arg)
 			/* test whether some data with the given tag and source have already been received by StarPU-MPI*/
 			struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(&req->node_tag);
 
-			/* Case: a receive request for a data with the given tag and source has already been
-			 * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
-			 * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
-			 * will be called to bring the data back to the original data handle associated to the request.*/
 			if (early_data_handle)
 			{
+				/* Case: a receive request for a data with the given tag and source has already been
+				 * posted to MPI by StarPU. Asynchronously requests a Read permission over the temporary handle ,
+				 * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
+				 * will be called to bring the data back to the original data handle associated to the request.*/
 				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 				STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
 				while (!(early_data_handle->req_ready))
@@ -257,13 +257,13 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				starpu_data_acquire_on_node_cb(early_data_handle->handle,STARPU_MAIN_RAM,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
 				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 			}
-			/* Case: no matching data has been received. Store the receive request as an early_request. */
 			else
 			{
 				struct _starpu_mpi_req *sync_req = _starpu_mpi_sync_data_find(req->node_tag.data_tag, req->node_tag.node.rank, req->node_tag.node.comm);
 				_STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %"PRIi64" and src %d = %p\n", req->node_tag.data_tag, req->node_tag.node.rank, sync_req);
 				if (sync_req)
 				{
+					/* Case: we already received the send envelope, we can proceed with the receive */
 					req->sync = 1;
 					_starpu_mpi_datatype_allocate(req->data_handle, req);
 					if (req->registered_datatype == 1)
@@ -275,14 +275,16 @@ void _starpu_mpi_submit_ready_request(void *arg)
 					{
 						req->count = sync_req->count;
 						STARPU_ASSERT(req->count);
-						_STARPU_MPI_MALLOC(req->ptr, req->count);
+						req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
 					}
 					_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
 					_STARPU_MPI_INC_READY_REQUESTS(+1);
+					/* Throw away the dumb request that was only used to know that we got the envelope */
 					_starpu_mpi_request_destroy(sync_req);
 				}
 				else
 				{
+					/* Case: no matching data has been received. Store the receive request as an early_request. */
 					_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %"PRIi64") into the request hashmap\n", req, req->node_tag.node.rank, req->node_tag.data_tag);
 					_starpu_mpi_early_request_enqueue(req);
 				}
@@ -908,6 +910,8 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
+/* This is called when the data is now received in the early data handle, we can
+ * now copy it over to the real handle. */
 static void _starpu_mpi_early_data_cb(void* arg)
 {
 	struct _starpu_mpi_early_data_cb_args *args = arg;
@@ -1448,7 +1452,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 						else
 						{
 							early_request->count = envelope->size;
-							_STARPU_MPI_MALLOC(early_request->ptr, early_request->count);
+							early_request->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, early_request->count, 0);
 							starpu_memory_allocate(STARPU_MAIN_RAM, early_request->count, STARPU_MEMORY_OVERFLOW);
 
 							STARPU_MPI_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);

+ 1 - 1
mpi/src/nmad/starpu_mpi_nmad.c

@@ -344,7 +344,7 @@ void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_ev
 				// req->ptr is freed by starpu_data_unpack
 				starpu_data_unpack(req->data_handle, req->ptr, req->count);
 			else
-				free(req->ptr);
+				starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t) req->ptr, req->count, 0);
 		}
 		else
 		{