瀏覽代碼

nmad,mpi: yet another step towards code merge

Nathalie Furmento 7 年之前
父節點
當前提交
db7a99b5ae
共有 6 個文件被更改,包括 149 次插入120 次删除
  1. 109 0
      mpi/src/mpi/starpu_mpi_mpi.c
  2. 11 104
      mpi/src/starpu_mpi.c
  3. 2 2
      mpi/src/starpu_mpi_private.h
  4. 13 0
      nmad/src/nmad/starpu_mpi_nmad.c
  5. 12 12
      nmad/src/starpu_mpi.c
  6. 2 2
      nmad/src/starpu_mpi_private.h

+ 109 - 0
mpi/src/mpi/starpu_mpi_mpi.c

@@ -640,6 +640,52 @@ void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	_STARPU_MPI_LOG_OUT();
 }
 
+int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
+{
+	int ret;
+	struct _starpu_mpi_req *req = *public_req;
+	struct _starpu_mpi_req *waiting_req;
+
+	_STARPU_MPI_LOG_IN();
+
+	/* We cannot try to complete a MPI request that was not actually posted
+	 * to MPI yet. */
+	STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
+	while (!(req->submitted))
+		STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
+	STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
+
+	/* Initialize the request structure */
+	 _starpu_mpi_request_init(&waiting_req);
+	waiting_req->prio = INT_MAX;
+	waiting_req->status = status;
+	waiting_req->other_request = req;
+	waiting_req->func = _starpu_mpi_wait_func;
+	waiting_req->request_type = WAIT_REQ;
+
+	_starpu_mpi_submit_ready_request_inc(waiting_req);
+
+	/* We wait for the MPI request to finish */
+	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	while (!req->completed)
+		STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+
+	ret = req->ret;
+
+	/* The internal request structure was automatically allocated */
+	*public_req = NULL;
+	if (req->internal_req)
+	{
+		_starpu_mpi_request_destroy(req->internal_req);
+	}
+	_starpu_mpi_request_destroy(req);
+	_starpu_mpi_request_destroy(waiting_req);
+
+	_STARPU_MPI_LOG_OUT();
+	return ret;
+}
+
 /********************************************************/
 /*                                                      */
 /*  Test functionalities                                */
@@ -682,6 +728,69 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	_STARPU_MPI_LOG_OUT();
 }
 
+int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
+{
+	_STARPU_MPI_LOG_IN();
+	int ret = 0;
+
+	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
+
+	struct _starpu_mpi_req *req = *public_req;
+
+	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
+
+	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
+	unsigned submitted = req->submitted;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
+
+	if (submitted)
+	{
+		struct _starpu_mpi_req *testing_req;
+
+		/* Initialize the request structure */
+		_starpu_mpi_request_init(&testing_req);
+		testing_req->prio = INT_MAX;
+		testing_req->flag = flag;
+		testing_req->status = status;
+		testing_req->other_request = req;
+		testing_req->func = _starpu_mpi_test_func;
+		testing_req->completed = 0;
+		testing_req->request_type = TEST_REQ;
+
+		_starpu_mpi_submit_ready_request_inc(testing_req);
+
+		/* We wait for the test request to finish */
+		STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
+		while (!(testing_req->completed))
+			STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
+		STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
+
+		ret = testing_req->ret;
+
+		if (*(testing_req->flag))
+		{
+			/* The request was completed so we free the internal
+			 * request structure which was automatically allocated
+			 * */
+			*public_req = NULL;
+			if (req->internal_req)
+			{
+				_starpu_mpi_request_destroy(req->internal_req);
+			}
+			_starpu_mpi_request_destroy(req);
+		}
+
+		_starpu_mpi_request_destroy(testing_req);
+	}
+	else
+	{
+		*flag = 0;
+	}
+
+	_STARPU_MPI_LOG_OUT();
+	return ret;
+}
+
 /********************************************************/
 /*                                                      */
 /*  Barrier functionalities                             */

+ 11 - 104
mpi/src/starpu_mpi.c

@@ -184,6 +184,7 @@ int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_h
 int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, MPI_Status *status)
 {
 	starpu_mpi_req req;
+
 	_STARPU_MPI_LOG_IN();
 
 	starpu_mpi_irecv(data_handle, &req, source, data_tag, comm);
@@ -195,122 +196,24 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int data_tag,
 
 int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
-	int ret;
-	struct _starpu_mpi_req *req = *public_req;
-	struct _starpu_mpi_req *waiting_req;
-
-	_STARPU_MPI_LOG_IN();
-
-	/* We cannot try to complete a MPI request that was not actually posted
-	 * to MPI yet. */
-	STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
-	while (!(req->submitted))
-		STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
-	STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
-
-	/* Initialize the request structure */
-	 _starpu_mpi_request_init(&waiting_req);
-	waiting_req->prio = INT_MAX;
-	waiting_req->status = status;
-	waiting_req->other_request = req;
-	waiting_req->func = _starpu_mpi_wait_func;
-	waiting_req->request_type = WAIT_REQ;
-
-	_starpu_mpi_submit_ready_request_inc(waiting_req);
-
-	/* We wait for the MPI request to finish */
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	while (!req->completed)
-		STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
-
-	ret = req->ret;
-
-	/* The internal request structure was automatically allocated */
-	*public_req = NULL;
-	if (req->internal_req)
-	{
-		_starpu_mpi_request_destroy(req->internal_req);
-	}
-	_starpu_mpi_request_destroy(req);
-	_starpu_mpi_request_destroy(waiting_req);
-
-	_STARPU_MPI_LOG_OUT();
-	return ret;
+	return _starpu_mpi_wait(public_req, status);
 }
 
 int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 {
-	_STARPU_MPI_LOG_IN();
-	int ret = 0;
-
-	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
-
-	struct _starpu_mpi_req *req = *public_req;
-
-	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
-
-	STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
-	unsigned submitted = req->submitted;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
-
-	if (submitted)
-	{
-		struct _starpu_mpi_req *testing_req;
-
-		/* Initialize the request structure */
-		_starpu_mpi_request_init(&testing_req);
-		testing_req->prio = INT_MAX;
-		testing_req->flag = flag;
-		testing_req->status = status;
-		testing_req->other_request = req;
-		testing_req->func = _starpu_mpi_test_func;
-		testing_req->completed = 0;
-		testing_req->request_type = TEST_REQ;
-
-		_starpu_mpi_submit_ready_request_inc(testing_req);
-
-		/* We wait for the test request to finish */
-		STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
-		while (!(testing_req->completed))
-			STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
-		STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
-
-		ret = testing_req->ret;
-
-		if (*(testing_req->flag))
-		{
-			/* The request was completed so we free the internal
-			 * request structure which was automatically allocated
-			 * */
-			*public_req = NULL;
-			if (req->internal_req)
-			{
-				_starpu_mpi_request_destroy(req->internal_req);
-			}
-			_starpu_mpi_request_destroy(req);
-		}
-
-		_starpu_mpi_request_destroy(testing_req);
-	}
-	else
-	{
-		*flag = 0;
-	}
-
-	_STARPU_MPI_LOG_OUT();
-	return ret;
+	return _starpu_mpi_test(public_req, flag, status);
 }
 
 int starpu_mpi_barrier(MPI_Comm comm)
 {
-	_starpu_mpi_barrier(comm);
-	return 0;
+	return _starpu_mpi_barrier(comm);
 }
 
 void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
 {
+#if defined(STARPU_MPI_MPI)
 	_starpu_mpi_tag_data_release(data_handle);
+#endif
 	_starpu_mpi_cache_data_clear(data_handle);
 	free(data_handle->mpi_data);
 }
@@ -330,8 +233,10 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
 		mpi_data->node_tag.rank = -1;
 		mpi_data->node_tag.comm = MPI_COMM_WORLD;
 		data_handle->mpi_data = mpi_data;
-		_starpu_mpi_cache_data_init(data_handle);
+#if defined(STARPU_MPI_MPI)
 		_starpu_mpi_tag_data_register(data_handle, tag);
+#endif
+		_starpu_mpi_cache_data_init(data_handle);
 		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_data_clear);
 	}
 
@@ -344,7 +249,9 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
 		_STARPU_MPI_TRACE_DATA_SET_RANK(data_handle, rank);
 		mpi_data->node_tag.rank = rank;
 		mpi_data->node_tag.comm = comm;
+#if defined(STARPU_MPI_MPI)
 		_starpu_mpi_comm_register(comm);
+#endif
 	}
 }
 

+ 2 - 2
mpi/src/starpu_mpi_private.h

@@ -303,8 +303,8 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
 void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
 void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
-void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req);
-void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req);
+int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
+int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
 int _starpu_mpi_barrier(MPI_Comm comm);
 
 struct _starpu_mpi_argc_argv

+ 13 - 0
nmad/src/nmad/starpu_mpi_nmad.c

@@ -428,6 +428,19 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 /*                                                      */
 /********************************************************/
 
+int _starpu_mpi_barrier(MPI_Comm comm)
+{
+	_STARPU_MPI_LOG_IN();
+	int ret;
+	//	STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
+	ret = MPI_Barrier(comm);
+
+	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
+
+	_STARPU_MPI_LOG_OUT();
+	return ret;
+}
+
 /********************************************************/
 /*                                                      */
 /*  Progression                                         */

+ 12 - 12
nmad/src/starpu_mpi.c

@@ -186,6 +186,7 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int data_tag,
 	starpu_mpi_req req;
 
 	_STARPU_MPI_LOG_IN();
+
 	starpu_mpi_irecv(data_handle, &req, source, data_tag, comm);
 	starpu_mpi_wait(&req, status);
 
@@ -193,13 +194,11 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int data_tag,
 	return 0;
 }
 
-extern int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
 int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 	return _starpu_mpi_wait(public_req, status);
 }
 
-extern int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
 int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 {
 	return _starpu_mpi_test(public_req, flag, status);
@@ -207,19 +206,14 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 int starpu_mpi_barrier(MPI_Comm comm)
 {
-	_STARPU_MPI_LOG_IN();
-	int ret;
-	//	STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
-	ret = MPI_Barrier(comm);
-
-	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
-
-	_STARPU_MPI_LOG_OUT();
-	return ret;
+	return _starpu_mpi_barrier(comm);
 }
 
 void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
 {
+#if defined(STARPU_MPI_MPI)
+	_starpu_mpi_tag_data_release(data_handle);
+#endif
 	_starpu_mpi_cache_data_clear(data_handle);
 	free(data_handle->mpi_data);
 }
@@ -239,6 +233,9 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
 		mpi_data->node_tag.rank = -1;
 		mpi_data->node_tag.comm = MPI_COMM_WORLD;
 		data_handle->mpi_data = mpi_data;
+#if defined(STARPU_MPI_MPI)
+		_starpu_mpi_tag_data_register(data_handle, tag);
+#endif
 		_starpu_mpi_cache_data_init(data_handle);
 		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_data_clear);
 	}
@@ -252,6 +249,9 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
 		_STARPU_MPI_TRACE_DATA_SET_RANK(data_handle, rank);
 		mpi_data->node_tag.rank = rank;
 		mpi_data->node_tag.comm = comm;
+#if defined(STARPU_MPI_MPI)
+		_starpu_mpi_comm_register(comm);
+#endif
 	}
 }
 
@@ -399,7 +399,7 @@ int starpu_mpi_wait_for_all(MPI_Comm comm)
 	while (task || mpi)
 	{
 		task = _starpu_task_wait_for_all_and_return_nb_waited_tasks();
-		mpi = starpu_mpi_barrier(comm);
+		mpi = _starpu_mpi_barrier(comm);
 	}
 	return 0;
 }

+ 2 - 2
nmad/src/starpu_mpi_private.h

@@ -303,8 +303,8 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
 void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
 void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
-void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req);
-void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req);
+int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
+int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
 int _starpu_mpi_barrier(MPI_Comm comm);
 
 struct _starpu_mpi_argc_argv