浏览代码

nmad: decrement number of pending requests just after destroying a request

Philippe SWARTVAGHER 4 年之前
父节点
当前提交
72e39acbfe
共有 1 个文件被更改,包括 25 次插入19 次删除
  1. 25 19
      mpi/src/nmad/starpu_mpi_nmad.c

+ 25 - 19
mpi/src/nmad/starpu_mpi_nmad.c

@@ -48,6 +48,7 @@ char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
 #endif
 
 void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
+static inline void _starpu_mpi_request_end(struct _starpu_mpi_req* req, int post_callback_sem);
 
 #ifdef STARPU_USE_FXT
 static void _starpu_mpi_add_sync_point_in_fxt(void);
@@ -229,8 +230,9 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 	if (status!=MPI_STATUS_IGNORE)
 		_starpu_mpi_req_status(req,status);
 
-	_starpu_mpi_request_destroy(req);
+	_starpu_mpi_request_end(req, 1);
 	*public_req = NULL;
+
 	_STARPU_MPI_LOG_OUT();
 	return MPI_SUCCESS;
 }
@@ -264,7 +266,7 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 	if(*flag)
 	{
-		_starpu_mpi_request_destroy(req);
+		_starpu_mpi_request_end(req, 1);
 		*public_req = NULL;
 	}
 	_STARPU_MPI_LOG_OUT();
@@ -334,6 +336,25 @@ char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
 }
 #endif
 
+static inline void _starpu_mpi_request_end(struct _starpu_mpi_req* req, int post_callback_sem)
+{
+	/* Destroying a request and decrementing the number of pending requests
+	 * should be done together, so let's wrap these two things in a
+	 * function. This means instead of calling _starpu_mpi_request_destroy(),
+	 * you should call this function. */
+	_starpu_mpi_request_destroy(req);
+
+	int pending_remaining = STARPU_ATOMIC_ADD(&nb_pending_requests, -1);
+	if (!pending_remaining)
+	{
+		STARPU_PTHREAD_COND_BROADCAST(&mpi_wait_for_all_running_cond);
+		if (post_callback_sem && !running)
+		{
+			starpu_sem_post(&callback_sem);
+		}
+	}
+}
+
 void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req* req)
 {
 	_STARPU_MPI_LOG_IN();
@@ -394,8 +415,8 @@ void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req* req)
 	{
 		if(req->detached)
 		{
-			_starpu_mpi_request_destroy(req);
 			// a detached request wont be wait/test (and freed inside).
+			_starpu_mpi_request_end(req, 1);
 		}
 		else
 		{
@@ -404,13 +425,6 @@ void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req* req)
 			req->completed = 1;
 			piom_cond_signal(&req->backend->req_cond, REQ_FINALIZED);
 		}
-		int pending_remaining = STARPU_ATOMIC_ADD(&nb_pending_requests, -1);
-		if (!pending_remaining)
-		{
-			STARPU_PTHREAD_COND_BROADCAST(&mpi_wait_for_all_running_cond);
-			if (!running)
-				starpu_sem_post(&callback_sem);
-		}
 	}
 	_STARPU_MPI_LOG_OUT();
 }
@@ -555,11 +569,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			}
 		}
 
-
 		c->req->callback(c->req->callback_arg);
 		if (c->req->detached)
 		{
-			_starpu_mpi_request_destroy(c->req);
+			_starpu_mpi_request_end(c->req, 0);
 		}
 		else
 		{
@@ -567,13 +580,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			piom_cond_signal(&(c->req->backend->req_cond), REQ_FINALIZED);
 		}
 
-		/* we signal that the request is completed.*/
-		int pending_remaining = STARPU_ATOMIC_ADD(&nb_pending_requests, -1);
-		if (!pending_remaining)
-		{
-			STARPU_PTHREAD_COND_BROADCAST(&mpi_wait_for_all_running_cond);
-		}
-
 		free(c);
 	}