Kaynağa Gözat

To try to get priorities to actually do something useful, do not submit all MPI sends at the same time, only 10 at a time (to be tuned...)

Samuel Thibault 7 yıl önce
ebeveyn
işleme
dc689aaf94
1 değiştirilmiş dosya ile 49 ekleme ve 13 silme
  1. 49 13
      mpi/src/starpu_mpi.c

+ 49 - 13
mpi/src/starpu_mpi.c

@@ -42,6 +42,9 @@
 /* Number of ready requests to process before polling for completed requests */
 #define NREADY_PROCESS 10
 
+/* Number of send requests to submit to MPI at the same time */
+#define NDETACHED_SEND 10
+
 static void _starpu_mpi_add_sync_point_in_fxt(void);
 static void _starpu_mpi_submit_ready_request(void *arg);
 static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req);
@@ -62,10 +65,12 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_early_data_cb(void* arg);
 
 /* The list of ready requests */
-static struct _starpu_mpi_req_prio_list ready_requests;
+static struct _starpu_mpi_req_list ready_recv_requests;
+static struct _starpu_mpi_req_prio_list ready_send_requests;
 
 /* The list of detached requests that have already been submitted to MPI */
 static struct _starpu_mpi_req_list detached_requests;
+static int detached_send_nrequests;
 static starpu_pthread_mutex_t detached_requests_mutex;
 
 /* Condition to wake up progression thread */
@@ -224,7 +229,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
 					  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
 					  req->datatype_name, (int)req->count, req->registered_datatype);
-			_starpu_mpi_req_prio_list_push_front(&ready_requests, req);
+			_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
 
 			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
 			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
@@ -290,7 +295,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 						STARPU_ASSERT(req->count);
 						_STARPU_MPI_MALLOC(req->ptr, req->count);
 					}
-					_starpu_mpi_req_prio_list_push_front(&ready_requests, req);
+					_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
 					_starpu_mpi_request_destroy(sync_req);
 				}
 				else
@@ -303,7 +308,10 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 	}
 	else
 	{
-		_starpu_mpi_req_prio_list_push_front(&ready_requests, req);
+		if (req->request_type == SEND_REQ)
+			_starpu_mpi_req_prio_list_push_front(&ready_send_requests, req);
+		else
+			_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
 		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
 				  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
 				  req->datatype_name, (int)req->count, req->registered_datatype);
@@ -1230,6 +1238,8 @@ static void _starpu_mpi_test_detached_requests(void)
 			_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
 			STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
+			if (req->request_type == SEND_REQ)
+				detached_send_nrequests--;
 			_starpu_mpi_req_list_erase(&detached_requests, req);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 			_starpu_mpi_handle_request_termination(req);
@@ -1259,6 +1269,8 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 		/* put the submitted request into the list of pending requests
 		 * so that it can be handled by the progression mechanisms */
 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
+		if (req->request_type == SEND_REQ)
+			detached_send_nrequests++;
 		_starpu_mpi_req_list_push_back(&detached_requests, req);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
@@ -1343,7 +1355,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 
 	// Handle the request immediatly to make sure the mpi_irecv is
 	// posted before receiving an other envelope
-	_starpu_mpi_req_prio_list_erase(&ready_requests, early_data_handle->req);
+	_starpu_mpi_req_list_erase(&ready_recv_requests, early_data_handle->req);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	_starpu_mpi_handle_ready_request(early_data_handle->req);
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
@@ -1419,13 +1431,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
  	int envelope_request_submitted = 0;
 
-	while (running || posted_requests || !(_starpu_mpi_req_prio_list_empty(&ready_requests)) || !(_starpu_mpi_req_list_empty(&detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
+	while (running || posted_requests || !(_starpu_mpi_req_list_empty(&ready_recv_requests)) || !(_starpu_mpi_req_prio_list_empty(&ready_send_requests)) || !(_starpu_mpi_req_list_empty(&detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
 	{
 #ifdef STARPU_SIMGRID
 		starpu_pthread_wait_reset(&wait);
 #endif
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_prio_list_empty(&ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(&detached_requests);
+		unsigned block = _starpu_mpi_req_list_empty(&ready_recv_requests) && _starpu_mpi_req_prio_list_empty(&ready_send_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(&detached_requests);
 
 		if (block)
 		{
@@ -1440,17 +1452,38 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			_STARPU_MPI_TRACE_SLEEP_END();
 		}
 
-		/* get one request */
+		/* get one recv request */
 		int n = 0;
-		while (!_starpu_mpi_req_prio_list_empty(&ready_requests))
+		while (!_starpu_mpi_req_list_empty(&ready_recv_requests))
+		{
+			struct _starpu_mpi_req *req;
+
+			if (n++ == NREADY_PROCESS)
+				/* Already spent some time on submitting ready recv requests, poll before processing more ready recv requests */
+				break;
+
+			req = _starpu_mpi_req_list_pop_back(&ready_recv_requests);
+
+			/* handling a request is likely to block for a while
+			 * (on a sync_data_with_mem call), we want to let the
+			 * application submit requests in the meantime, so we
+			 * release the lock. */
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+			_starpu_mpi_handle_ready_request(req);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+		}
+
+		/* get one send request */
+		n = 0;
+		while (!_starpu_mpi_req_prio_list_empty(&ready_send_requests) && detached_send_nrequests < NDETACHED_SEND)
 		{
 			struct _starpu_mpi_req *req;
 
 			if (n++ == NREADY_PROCESS)
-				/* Already spent some time on submitting ready requests, poll before processing more ready requests */
+				/* Already spent some time on submitting ready send requests, poll before processing more ready send requests */
 				break;
 
-			req = _starpu_mpi_req_prio_list_pop_back(&ready_requests);
+			req = _starpu_mpi_req_prio_list_pop_back(&ready_send_requests);
 
 			/* handling a request is likely to block for a while
 			 * (on a sync_data_with_mem call), we want to let the
@@ -1612,7 +1645,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 #endif
 
 	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&detached_requests), "List of detached requests not empty");
-	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_prio_list_empty(&ready_requests), "List of ready requests not empty");
+	STARPU_MPI_ASSERT_MSG(detached_send_nrequests == 0, "Number of detached send requests not 0");
+	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&ready_recv_requests), "List of ready requests not empty");
+	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_prio_list_empty(&ready_send_requests), "List of ready requests not empty");
 	STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
 	_starpu_mpi_early_request_check_termination();
 	_starpu_mpi_early_data_check_termination();
@@ -1674,7 +1709,8 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
         STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
         STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
         STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
-	_starpu_mpi_req_prio_list_init(&ready_requests);
+	_starpu_mpi_req_list_init(&ready_recv_requests);
+	_starpu_mpi_req_prio_list_init(&ready_send_requests);
 
         STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
 	_starpu_mpi_req_list_init(&detached_requests);