Просмотр исходного кода

mpi: put all implementation specific functions in the backend interface

Nathalie Furmento, 5 лет назад
Родитель
Коммит
d78d15b29e

+ 2 - 0
mpi/src/Makefile.am

@@ -68,6 +68,7 @@ noinst_HEADERS =					\
 	starpu_mpi_cache_stats.h			\
 	starpu_mpi_task_insert.h			\
 	starpu_mpi_init.h				\
+	mpi/starpu_mpi_mpi.h				\
 	mpi/starpu_mpi_early_data.h			\
 	mpi/starpu_mpi_early_request.h			\
 	mpi/starpu_mpi_sync_data.h			\
@@ -77,6 +78,7 @@ noinst_HEADERS =					\
 	mpi/starpu_mpi_mpi_backend.h			\
 	nmad/starpu_mpi_nmad_backend.h			\
 	nmad/starpu_mpi_nmad_unknown_datatype.h		\
+	nmad/starpu_mpi_nmad.h				\
 	load_balancer/policy/data_movements_interface.h	\
 	load_balancer/policy/load_data_interface.h	\
 	load_balancer/policy/load_balancer_policy.h

+ 70 - 0
mpi/src/mpi/starpu_mpi_mpi.h

@@ -0,0 +1,70 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_MPI_H__
+#define __STARPU_MPI_MPI_H__
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <mpi.h>
+#include <common/config.h>
+#include <common/list.h>
+/* Provides (directly or through its own includes) the request and argc/argv types used in the prototypes below. */
+#include <starpu_mpi_private.h>
+
+#ifdef STARPU_USE_MPI_MPI
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+/*
+ * Entry points of the MPI backend.  starpu_mpi_mpi_backend.c stores them in
+ * the _starpu_mpi_backend_* members of _mpi_backend, and the generic code
+ * reaches them through that structure.
+ */
+
+/* Hook _starpu_mpi_backend_progress_init: its return value is returned by _starpu_mpi_initialize(). */
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
+/* Hook _starpu_mpi_backend_progress_shutdown: called by starpu_mpi_shutdown() to kill the progression thread. */
+void _starpu_mpi_progress_shutdown(void **value);
+
+#ifdef STARPU_SIMGRID
+/* Hook _starpu_mpi_backend_wait_for_initialization (simgrid builds only); takes no argument. */
+void _starpu_mpi_wait_for_initialization(void);
+#endif
+
+/*
+ * Backend side of starpu_mpi_barrier(), starpu_mpi_wait_for_all(),
+ * starpu_mpi_wait() and starpu_mpi_test(): those public functions forward
+ * their arguments unchanged and return the backend's result.
+ */
+int _starpu_mpi_barrier(MPI_Comm comm);
+int _starpu_mpi_wait_for_all(MPI_Comm comm);
+int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
+int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
+
+/* Functions handed to _starpu_mpi_request_fill() for SEND_REQ and RECV_REQ requests respectively. */
+void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
+void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* STARPU_USE_MPI_MPI */
+#endif /* __STARPU_MPI_MPI_H__ */

+ 16 - 1
mpi/src/mpi/starpu_mpi_mpi_backend.c

@@ -27,6 +27,7 @@
 #include <mpi/starpu_mpi_comm.h>
 #include <mpi/starpu_mpi_tag.h>
 #include <mpi/starpu_mpi_driver.h>
+#include <mpi/starpu_mpi_mpi.h>
 
 void _starpu_mpi_mpi_backend_init(struct starpu_conf *conf)
 {
@@ -109,7 +110,21 @@ struct _starpu_mpi_backend _mpi_backend =
 	._starpu_mpi_backend_request_destroy = _starpu_mpi_mpi_backend_request_destroy,
 	._starpu_mpi_backend_data_clear = _starpu_mpi_mpi_backend_data_clear,
 	._starpu_mpi_backend_data_register = _starpu_mpi_mpi_backend_data_register,
-	._starpu_mpi_backend_comm_register = _starpu_mpi_mpi_backend_comm_register
+	._starpu_mpi_backend_comm_register = _starpu_mpi_mpi_backend_comm_register,
+
+	._starpu_mpi_backend_progress_init = _starpu_mpi_progress_init,
+	._starpu_mpi_backend_progress_shutdown = _starpu_mpi_progress_shutdown,
+#ifdef STARPU_SIMGRID
+	._starpu_mpi_backend_wait_for_initialization = _starpu_mpi_wait_for_initialization,
+#endif
+
+	._starpu_mpi_backend_barrier = _starpu_mpi_barrier,
+	._starpu_mpi_backend_wait_for_all = _starpu_mpi_wait_for_all,
+	._starpu_mpi_backend_wait = _starpu_mpi_wait,
+	._starpu_mpi_backend_test = _starpu_mpi_test,
+
+	._starpu_mpi_backend_isend_size_func = _starpu_mpi_isend_size_func,
+	._starpu_mpi_backend_irecv_size_func = _starpu_mpi_irecv_size_func,
 };
 
 #endif /* STARPU_USE_MPI_MPI*/

+ 72 - 0
mpi/src/nmad/starpu_mpi_nmad.h

@@ -0,0 +1,72 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef __STARPU_MPI_NMAD_H__
+#define __STARPU_MPI_NMAD_H__
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <mpi.h>
+#include <common/config.h>
+#include <common/list.h>
+/* Provides (directly or through its own includes) the request and argc/argv types used in the prototypes below. */
+#include <starpu_mpi_private.h>
+
+#ifdef STARPU_USE_MPI_NMAD
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+/*
+ * Entry points of the NMAD backend.  starpu_mpi_nmad_backend.c stores them in
+ * the _starpu_mpi_backend_* members of _mpi_backend, and the generic code
+ * reaches them through that structure.
+ */
+
+/* Hook _starpu_mpi_backend_progress_init: its return value is returned by _starpu_mpi_initialize(). */
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
+/* Hook _starpu_mpi_backend_progress_shutdown: called by starpu_mpi_shutdown() to kill the progression thread. */
+void _starpu_mpi_progress_shutdown(void **value);
+
+/*
+ * No _starpu_mpi_wait_for_initialization() is declared for NMAD: the backend
+ * table in starpu_mpi_nmad_backend.c has the matching initializer commented out.
+ * NOTE(review): _starpu_mpi_backend_check() asserts the hook is non-NULL when
+ * STARPU_SIMGRID is defined -- confirm NMAD is never built with simgrid.
+ */
+
+/*
+ * Backend side of starpu_mpi_barrier(), starpu_mpi_wait_for_all(),
+ * starpu_mpi_wait() and starpu_mpi_test(): those public functions forward
+ * their arguments unchanged and return the backend's result.
+ */
+int _starpu_mpi_barrier(MPI_Comm comm);
+int _starpu_mpi_wait_for_all(MPI_Comm comm);
+int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
+int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
+
+/* Functions handed to _starpu_mpi_request_fill() for SEND_REQ and RECV_REQ requests respectively. */
+void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
+void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* STARPU_USE_MPI_NMAD */
+#endif /* __STARPU_MPI_NMAD_H__ */

+ 16 - 1
mpi/src/nmad/starpu_mpi_nmad_backend.c

@@ -17,6 +17,7 @@
 #include <stdlib.h>
 #include "starpu_mpi_nmad_backend.h"
 #include <starpu_mpi_private.h>
+#include "starpu_mpi_nmad.h"
 
 #ifdef STARPU_USE_MPI_NMAD
 
@@ -91,7 +92,21 @@ struct _starpu_mpi_backend _mpi_backend =
 	._starpu_mpi_backend_request_destroy = _starpu_mpi_nmad_backend_request_destroy,
 	._starpu_mpi_backend_data_clear = _starpu_mpi_nmad_backend_data_clear,
 	._starpu_mpi_backend_data_register = _starpu_mpi_nmad_backend_data_register,
-	._starpu_mpi_backend_comm_register = _starpu_mpi_nmad_backend_comm_register
+	._starpu_mpi_backend_comm_register = _starpu_mpi_nmad_backend_comm_register,
+
+	._starpu_mpi_backend_progress_init = _starpu_mpi_progress_init,
+	._starpu_mpi_backend_progress_shutdown = _starpu_mpi_progress_shutdown,
+//#ifdef STARPU_SIMGRID
+//	._starpu_mpi_backend_wait_for_initialization = _starpu_mpi_wait_for_initialization,
+//#endif
+
+	._starpu_mpi_backend_barrier = _starpu_mpi_barrier,
+	._starpu_mpi_backend_wait_for_all = _starpu_mpi_wait_for_all,
+	._starpu_mpi_backend_wait = _starpu_mpi_wait,
+	._starpu_mpi_backend_test = _starpu_mpi_test,
+
+	._starpu_mpi_backend_isend_size_func = _starpu_mpi_isend_size_func,
+	._starpu_mpi_backend_irecv_size_func = _starpu_mpi_irecv_size_func,
 };
 
 #endif /* STARPU_USE_MPI_NMAD*/

+ 6 - 6
mpi/src/starpu_mpi.c

@@ -70,7 +70,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 	enum starpu_data_access_mode mode = STARPU_R;
 #endif
 
-	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, sequential_consistency, 0, 0);
+	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _mpi_backend._starpu_mpi_backend_isend_size_func, sequential_consistency, 0, 0);
 	_starpu_mpi_req_willpost(req);
 
 	if (_starpu_mpi_use_coop_sends && detached == 1 && sync == 0 && callback == NULL)
@@ -183,7 +183,7 @@ struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handl
 		return NULL;
 	}
 
-	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, sequential_consistency, is_internal_req, count);
+	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _mpi_backend._starpu_mpi_backend_irecv_size_func, sequential_consistency, is_internal_req, count);
 	_starpu_mpi_req_willpost(req);
 	_starpu_mpi_isend_irecv_common(req, STARPU_W, sequential_consistency);
 	return req;
@@ -240,17 +240,17 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, starpu_mpi_tag
 
 int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
-	return _starpu_mpi_wait(public_req, status);
+	return _mpi_backend._starpu_mpi_backend_wait(public_req, status);
 }
 
 int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 {
-	return _starpu_mpi_test(public_req, flag, status);
+	return _mpi_backend._starpu_mpi_backend_test(public_req, flag, status);
 }
 
 int starpu_mpi_barrier(MPI_Comm comm)
 {
-	return _starpu_mpi_barrier(comm);
+	return _mpi_backend._starpu_mpi_backend_barrier(comm);
 }
 
 void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
@@ -437,5 +437,5 @@ void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_r
 
 int starpu_mpi_wait_for_all(MPI_Comm comm)
 {
-	return _starpu_mpi_wait_for_all(comm);
+	return _mpi_backend._starpu_mpi_backend_wait_for_all(comm);
 }

+ 29 - 3
mpi/src/starpu_mpi_init.c

@@ -92,6 +92,30 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 }
 
 static
+void _starpu_mpi_backend_check(void) /* asserts that the _mpi_backend hooks listed below are non-NULL; run from _starpu_mpi_initialize() before any hook is used */
+{
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_init != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_shutdown != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_reserve_core != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_request_init != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_request_fill != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_request_destroy != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_data_clear != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_data_register != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_comm_register != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_progress_init != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_progress_shutdown != NULL);
+#ifdef STARPU_SIMGRID /* NOTE(review): the NMAD backend table has this initializer commented out -- confirm NMAD is never built with simgrid */
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_wait_for_initialization != NULL);
+#endif
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_barrier != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_wait_for_all != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_wait != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_test != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_isend_size_func != NULL);
+	STARPU_ASSERT(_mpi_backend._starpu_mpi_backend_irecv_size_func != NULL);
+}
+static
 int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
 {
 	struct _starpu_mpi_argc_argv *argc_argv;
@@ -102,13 +126,15 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm
 	argc_argv->comm = comm;
 	_starpu_implicit_data_deps_write_hook(_starpu_mpi_data_flush);
 
+	_starpu_mpi_backend_check();
+
 #ifdef STARPU_SIMGRID
 	/* Call MPI_Init_thread as early as possible, to initialize simgrid
 	 * before working with mutexes etc. */
 	_starpu_mpi_do_initialize(argc_argv);
 #endif
 
-	return _starpu_mpi_progress_init(argc_argv);
+	return _mpi_backend._starpu_mpi_backend_progress_init(argc_argv);
 }
 
 #ifdef STARPU_SIMGRID
@@ -127,7 +153,7 @@ int starpu_mpi_init_comm(int *argc, char ***argv, int initialize_mpi, MPI_Comm c
 	(void)argv;
 	(void)initialize_mpi;
 	(void)comm;
-	_starpu_mpi_wait_for_initialization();
+	_mpi_backend._starpu_mpi_backend_wait_for_initialization();
 	return 0;
 #else
 	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
@@ -207,7 +233,7 @@ int starpu_mpi_shutdown(void)
 	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
 
 	/* kill the progression thread */
-	_starpu_mpi_progress_shutdown(&value);
+	_mpi_backend._starpu_mpi_backend_progress_shutdown(&value);
 
 #ifdef STARPU_USE_FXT
 	if (starpu_fxt_is_enabled())

+ 16 - 15
mpi/src/starpu_mpi_private.h

@@ -306,10 +306,7 @@ struct _starpu_mpi_req * _starpu_mpi_request_fill(starpu_data_handle_t data_hand
 						  starpu_ssize_t count);
 
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
-void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
-void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
-int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status);
-int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
+void _starpu_mpi_data_flush(starpu_data_handle_t data_handle);
 
 struct _starpu_mpi_argc_argv
 {
@@ -323,17 +320,7 @@ struct _starpu_mpi_argc_argv
 	int world_size;
 };
 
-void _starpu_mpi_progress_shutdown(void **value);
-int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
-#ifdef STARPU_SIMGRID
-void _starpu_mpi_wait_for_initialization();
-#endif
-void _starpu_mpi_data_flush(starpu_data_handle_t data_handle);
-
-int _starpu_mpi_barrier(MPI_Comm comm);
-int _starpu_mpi_wait_for_all(MPI_Comm comm);
-
-/*
+/**
  * Specific functions to backend implementation
  */
 struct _starpu_mpi_backend
@@ -347,6 +334,20 @@ struct _starpu_mpi_backend
 	void (*_starpu_mpi_backend_data_clear)(starpu_data_handle_t data_handle);
 	void (*_starpu_mpi_backend_data_register)(starpu_data_handle_t data_handle, starpu_mpi_tag_t data_tag);
 	void (*_starpu_mpi_backend_comm_register)(MPI_Comm comm);
+
+	int (*_starpu_mpi_backend_progress_init)(struct _starpu_mpi_argc_argv *argc_argv);
+	void (*_starpu_mpi_backend_progress_shutdown)(void **value);
+#ifdef STARPU_SIMGRID
+	void (*_starpu_mpi_backend_wait_for_initialization)();
+#endif
+
+	int (*_starpu_mpi_backend_barrier)(MPI_Comm comm);
+	int (*_starpu_mpi_backend_wait_for_all)(MPI_Comm comm);
+	int (*_starpu_mpi_backend_wait)(starpu_mpi_req *public_req, MPI_Status *status);
+	int (*_starpu_mpi_backend_test)(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
+
+	void (*_starpu_mpi_backend_isend_size_func)(struct _starpu_mpi_req *req);
+	void (*_starpu_mpi_backend_irecv_size_func)(struct _starpu_mpi_req *req);
 };
 
 extern struct _starpu_mpi_backend _mpi_backend;