Parcourir la source

mpi: Replace req_ready condition with a mutex dedicated to early data

We cannot make _starpu_mpi_submit_ready_request wait for req->req_ready while
_starpu_mpi_receive_early_data is waiting for req->posted: as long as
_starpu_mpi_submit_ready_request is executing, the data acquisitions cannot
proceed.

This replaces it with a mutex dedicated to checking whether an early
request, a sync request, or an early data handle was made, and creating the
dual otherwise. This also makes it possible to move the acquisition of the
progress_mutex further down in _starpu_mpi_submit_ready_request.
Samuel Thibault il y a 4 ans
Parent
commit
403c7e6c8e
2 fichiers modifiés avec 29 ajouts et 22 suppressions
  1. 0 1
      mpi/src/mpi/starpu_mpi_early_data.h
  2. 29 21
      mpi/src/mpi/starpu_mpi_mpi.c

+ 0 - 1
mpi/src/mpi/starpu_mpi_early_data.h

@@ -40,7 +40,6 @@ LIST_TYPE(_starpu_mpi_early_data_handle,
 	  void *buffer;
 	  size_t size;
 	  unsigned buffer_node;
-	  int req_ready;
 	  struct _starpu_mpi_node_tag node_tag;
 	  starpu_pthread_mutex_t req_mutex;
 	  starpu_pthread_cond_t req_cond;

+ 29 - 21
mpi/src/mpi/starpu_mpi_mpi.c

@@ -81,6 +81,11 @@ static starpu_pthread_t progress_thread;
 #endif
 static int running = 0;
 
+/* Provides synchronization between an early request, a sync request, and an early data handle:
+ * we keep it held while checking and posting one to prevent the other.
+ * This is to be taken always before the progress_mutex. */
+static starpu_pthread_mutex_t early_data_mutex;
+
 /* Driver taken by StarPU-MPI to process tasks when there is no requests to
  * handle instead of polling endlessly */
 static struct starpu_driver *mpi_driver = NULL;
@@ -208,7 +213,6 @@ void _starpu_mpi_submit_ready_request(void *arg)
 			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
 					  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
 					  req->datatype_name, (int)req->count, req->registered_datatype);
-
 			_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
 			_STARPU_MPI_INC_READY_REQUESTS(+1);
 
@@ -218,23 +222,19 @@ void _starpu_mpi_submit_ready_request(void *arg)
 		}
 		else
 		{
-			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&early_data_mutex);
 			/* test whether some data with the given tag and source have already been received by StarPU-MPI*/
 			struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(&req->node_tag);
 
 			if (early_data_handle)
 			{
+				/* Got the early_data_handle */
+				STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_mutex);
+
 				/* Case: a receive request for a data with the given tag and source has already been
 				 * posted to MPI by StarPU. Asynchronously requests a Read permission over the temporary handle ,
 				 * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
 				 * will be called to bring the data back to the original data handle associated to the request.*/
-				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
-				STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
-				while (!(early_data_handle->req_ready))
-					STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
-				STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
-				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
-
 				_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %"PRIi64" has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
 				STARPU_ASSERT(req->data_handle != early_data_handle->handle);
 
@@ -251,7 +251,6 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				cb_args->req = req;
 
 				_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
-				STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 				// FIXME: when buffer == NULL, do not hardcode acquiring on early_data_handle->buffer_node, to just acquire where the data happens to have been stored by MPI
 				starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(early_data_handle->handle,early_data_handle->buffer_node,STARPU_R,NULL,_starpu_mpi_early_data_cb,(void*) cb_args,  1, 0, NULL, NULL);
 				STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
@@ -262,6 +261,8 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				_STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %"PRIi64" and src %d = %p\n", req->node_tag.data_tag, req->node_tag.node.rank, sync_req);
 				if (sync_req)
 				{
+					/* Got the sync req */
+					STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_mutex);
 					/* Case: we already received the send envelope, we can proceed with the receive */
 					req->sync = 1;
 					_starpu_mpi_datatype_allocate(req->data_handle, req);
@@ -276,6 +277,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
 						STARPU_ASSERT(req->count);
 						req->ptr = (void *)starpu_malloc_on_node_flags(req->node, req->count, 0);
 					}
+					STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 					_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
 					_STARPU_MPI_INC_READY_REQUESTS(+1);
 					/* Throw away the dumb request that was only used to know that we got the envelope */
@@ -285,7 +287,10 @@ void _starpu_mpi_submit_ready_request(void *arg)
 				{
 					/* Case: no matching data has been received. Store the receive request as an early_request. */
 					_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %"PRIi64") into the request hashmap\n", req, req->node_tag.node.rank, req->node_tag.data_tag);
+					STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 					_starpu_mpi_early_request_enqueue(req);
+					/* We have queued our early request, we can let the progression thread look at it */
+					STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_mutex);
 				}
 			}
 		}
@@ -1187,22 +1192,15 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
 							  early_data_handle->node_tag.data_tag, comm, 1, 0,
 							  NULL, NULL, 1, 1, envelope->size);
-	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+	/* The early data handle is ready, we can let _starpu_mpi_submit_ready_request
+	 * proceed with acquiring it */
+	STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_mutex);
 
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	// We wait until the request is pushed in the
 	// ready_request list
 	while (!(early_data_handle->req->posted))
 		STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->backend->posted_cond), &progress_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
-
-#ifdef STARPU_DEVEL
-#warning check if req_ready is still necessary
-#endif
-	STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
-	early_data_handle->req_ready = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
-	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	// Handle the request immediatly to make sure the mpi_irecv is
 	// posted before receiving an other envelope
@@ -1415,6 +1413,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				{
 					_STARPU_MPI_DEBUG(3, "Searching for application request with tag %"PRIi64" and source %d (size %ld)\n", envelope->data_tag, envelope_status.MPI_SOURCE, envelope->size);
 
+					STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&early_data_mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 					struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_dequeue(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
 
 					/* Case: a data will arrive before a matching receive is
@@ -1447,9 +1448,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 							new_req->backend->is_internal_req = 0; // ????
 							new_req->count = envelope->size;
 							_starpu_mpi_sync_data_add(new_req);
+							/* We have queued our sync request, we can let _starpu_mpi_submit_ready_request find it */
+							STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_mutex);
 						}
 						else
 						{
+							/* This will release early_data_mutex when appropriate */
 							_starpu_mpi_receive_early_data(envelope, envelope_status, envelope_comm);
 						}
 					}
@@ -1460,6 +1464,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					 * _starpu_mpi_handle_ready_request. */
 					else
 					{
+						/* Got the early request */
+						STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_mutex);
 						_STARPU_MPI_DEBUG(2000, "A matching application request has been found for the incoming data with tag %"PRIi64"\n", envelope->data_tag);
 						_STARPU_MPI_DEBUG(2000, "Request sync %d\n", envelope->sync);
 
@@ -1615,6 +1621,7 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
 int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 {
         STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
+        STARPU_PTHREAD_MUTEX_INIT(&early_data_mutex, NULL);
         STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
         STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
 	_starpu_mpi_req_list_init(&ready_recv_requests);
@@ -1682,6 +1689,7 @@ void _starpu_mpi_progress_shutdown(void **value)
         STARPU_PTHREAD_MUTEX_DESTROY(&mutex_posted_requests);
         STARPU_PTHREAD_MUTEX_DESTROY(&mutex_ready_requests);
         STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
+        STARPU_PTHREAD_MUTEX_DESTROY(&early_data_mutex);
         STARPU_PTHREAD_COND_DESTROY(&barrier_cond);
 }