Browse Source

nmad,mpi: another step towards code merge

Nathalie Furmento 7 years ago
parent
commit
4c63e4c66a

+ 2 - 2
mpi/src/mpi/starpu_mpi_mpi.c

@@ -561,7 +561,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 /*                                                      */
 /********************************************************/
 
-void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
+void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
 
@@ -1309,7 +1309,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 							new_req->sync = 1;
 							new_req->callback = NULL;
 							new_req->callback_arg = NULL;
-							new_req->func = _starpu_mpi_irecv_data_func;
+							new_req->func = _starpu_mpi_irecv_size_func;
 							new_req->sequential_consistency = 1;
 							new_req->is_internal_req = 0; // ????
 							new_req->count = envelope->size;

+ 1 - 6
mpi/src/starpu_mpi.c

@@ -25,12 +25,7 @@
 #include <starpu_profiling.h>
 #include <starpu_mpi_stats.h>
 #include <starpu_mpi_cache.h>
-#include <mpi/starpu_mpi_sync_data.h>
-#include <mpi/starpu_mpi_early_data.h>
-#include <mpi/starpu_mpi_early_request.h>
 #include <starpu_mpi_select_node.h>
-#include <mpi/starpu_mpi_tag.h>
-#include <mpi/starpu_mpi_comm.h>
 #include <starpu_mpi_init.h>
 #include <common/config.h>
 #include <common/thread.h>
@@ -147,7 +142,7 @@ int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int d
 
 struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W, sequential_consistency, is_internal_req, count);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int data_tag, MPI_Comm comm)

+ 2 - 6
mpi/src/starpu_mpi_init.c

@@ -24,12 +24,7 @@
 #include <starpu_profiling.h>
 #include <starpu_mpi_stats.h>
 #include <starpu_mpi_cache.h>
-#include <mpi/starpu_mpi_sync_data.h>
-#include <mpi/starpu_mpi_early_data.h>
-#include <mpi/starpu_mpi_early_request.h>
 #include <starpu_mpi_select_node.h>
-#include <mpi/starpu_mpi_tag.h>
-#include <mpi/starpu_mpi_comm.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -182,8 +177,10 @@ int starpu_mpi_shutdown(void)
 	_starpu_mpi_comm_amounts_display(stderr, rank);
 	_starpu_mpi_comm_amounts_shutdown();
 	_starpu_mpi_cache_shutdown(world_size);
+#if defined(STARPU_MPI_MPI)
 	_starpu_mpi_tag_shutdown();
 	_starpu_mpi_comm_shutdown();
+#endif
 
 	return 0;
 }
@@ -233,4 +230,3 @@ int starpu_mpi_world_rank(void)
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
 	return rank;
 }
-

+ 44 - 22
mpi/src/starpu_mpi_private.h

@@ -26,6 +26,11 @@
 #include <common/list.h>
 #include <common/prio_list.h>
 #include <core/simgrid.h>
+#if defined(STARPU_MPI_NMAD)
+#include <pioman.h>
+#include <nm_sendrecv_interface.h>
+#include <nm_session_interface.h>
+#endif
 
 #ifdef __cplusplus
 extern "C"
@@ -152,22 +157,12 @@ int _starpu_debug_rank;
 #  define _STARPU_MPI_LOG_OUT()
 #endif
 
+#if defined(STARPU_MPI_MPI)
 extern int _starpu_mpi_tag;
 #define _STARPU_MPI_TAG_ENVELOPE  _starpu_mpi_tag
 #define _STARPU_MPI_TAG_DATA      _starpu_mpi_tag+1
 #define _STARPU_MPI_TAG_SYNC_DATA _starpu_mpi_tag+2
 
-enum _starpu_mpi_request_type
-{
-	SEND_REQ=0,
-	RECV_REQ=1,
-	WAIT_REQ=2,
-	TEST_REQ=3,
-	BARRIER_REQ=4,
-	PROBE_REQ=5,
-	UNKNOWN_REQ=6,
-};
-
 #define _STARPU_MPI_ENVELOPE_DATA       0
 #define _STARPU_MPI_ENVELOPE_SYNC_READY 1
 
@@ -178,8 +173,18 @@ struct _starpu_mpi_envelope
 	int data_tag;
 	unsigned sync;
 };
+#endif /* STARPU_MPI_MPI */
 
-struct _starpu_mpi_req;
+enum _starpu_mpi_request_type
+{
+	SEND_REQ=0,
+	RECV_REQ=1,
+	WAIT_REQ=2,
+	TEST_REQ=3,
+	BARRIER_REQ=4,
+	PROBE_REQ=5,
+	UNKNOWN_REQ=6,
+};
 
 struct _starpu_mpi_node_tag
 {
@@ -196,6 +201,7 @@ struct _starpu_mpi_data
 	int cache_received;
 };
 
+struct _starpu_mpi_req;
 LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
@@ -211,20 +217,36 @@ LIST_TYPE(_starpu_mpi_req,
 
 	/* who are we talking to ? */
 	struct _starpu_mpi_node_tag node_tag;
+#if defined(STARPU_MPI_NMAD)
+	nm_gate_t gate;
+	nm_session_t session;
+#endif
 
 	void (*func)(struct _starpu_mpi_req *);
 
 	MPI_Status *status;
+#if defined(STARPU_MPI_NMAD)
+	nm_sr_request_t data_request;
+	int waited;
+#elif defined(STARPU_MPI_MPI)
 	MPI_Request data_request;
+#endif
+
 	int *flag;
 	unsigned sync;
 
 	int ret;
+#if defined(STARPU_MPI_NMAD)
+	piom_cond_t req_cond;
+#elif defined(STARPU_MPI_MPI)
 	starpu_pthread_mutex_t req_mutex;
 	starpu_pthread_cond_t req_cond;
-
 	starpu_pthread_mutex_t posted_mutex;
 	starpu_pthread_cond_t posted_cond;
+	/* In the case of a Wait/Test request, we are going to post a request
+	 * to test the completion of another request */
+	struct _starpu_mpi_req *other_request;
+#endif
 
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 
@@ -232,38 +254,38 @@ LIST_TYPE(_starpu_mpi_req,
 	unsigned completed;
 	unsigned posted;
 
-	/* In the case of a Wait/Test request, we are going to post a request
-	 * to test the completion of another request */
-	struct _starpu_mpi_req *other_request;
-
 	/* in the case of detached requests */
 	int detached;
 	void *callback_arg;
 	void (*callback)(void *);
 
         /* in the case of user-defined datatypes, we need to send the size of the data */
+#if defined(STARPU_MPI_NMAD)
+	nm_sr_request_t size_req;
+#elif defined(STARPU_MPI_MPI)
 	MPI_Request size_req;
+#endif
 
-        struct _starpu_mpi_envelope* envelope;
+#if defined(STARPU_MPI_MPI)
+	struct _starpu_mpi_envelope* envelope;
 
 	unsigned is_internal_req:1;
 	unsigned to_destroy:1;
 	struct _starpu_mpi_req *internal_req;
 	struct _starpu_mpi_early_data_handle *early_data_handle;
+     	UT_hash_handle hh;
+#endif
 
 	int sequential_consistency;
 
 	long pre_sync_jobid;
 	long post_sync_jobid;
 
-     	UT_hash_handle hh;
-
 #ifdef STARPU_SIMGRID
         MPI_Status status_store;
 	starpu_pthread_queue_t queue;
 	unsigned done;
 #endif
-
 );
 PRIO_LIST_TYPE(_starpu_mpi_req, prio)
 
@@ -280,7 +302,7 @@ void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
 void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
 void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
 void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req);
-void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req);
+void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req);
 void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req);
 void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req);
 int _starpu_mpi_barrier(MPI_Comm comm);

+ 18 - 10
nmad/src/nmad/starpu_mpi_nmad.c

@@ -211,18 +211,18 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
 
 	struct nm_data_s data;
 	nm_mpi_nmad_data(&data, (void*)req->ptr, req->datatype, req->count);
-	nm_sr_send_init(req->session, &(req->request));
-	nm_sr_send_pack_data(req->session, &(req->request), &data);
-	nm_sr_send_set_priority(req->session, &req->request, req->prio);
+	nm_sr_send_init(req->session, &(req->data_request));
+	nm_sr_send_pack_data(req->session, &(req->data_request), &data);
+	nm_sr_send_set_priority(req->session, &req->data_request, req->prio);
 
 	if (req->sync == 0)
 	{
-		req->ret = nm_sr_send_isend(req->session, &(req->request), req->gate, req->node_tag.data_tag);
+		req->ret = nm_sr_send_isend(req->session, &(req->data_request), req->gate, req->node_tag.data_tag);
 		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Isend returning %d", req->ret);
 	}
 	else
 	{
-		req->ret = nm_sr_send_issend(req->session, &(req->request), req->gate, req->node_tag.data_tag);
+		req->ret = nm_sr_send_issend(req->session, &(req->data_request), req->gate, req->node_tag.data_tag);
 		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Issend returning %d", req->ret);
 	}
 
@@ -301,9 +301,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 	//req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
 	struct nm_data_s data;
 	nm_mpi_nmad_data(&data, (void*)req->ptr, req->datatype, req->count);
-	nm_sr_recv_init(req->session, &(req->request));
-	nm_sr_recv_unpack_data(req->session, &(req->request), &data);
-	nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
+	nm_sr_recv_init(req->session, &(req->data_request));
+	nm_sr_recv_unpack_data(req->session, &(req->data_request), &data);
+	nm_sr_recv_irecv(req->session, &(req->data_request), req->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
 
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);
 
@@ -357,6 +357,14 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 /*                                                      */
 /********************************************************/
 
+#define _starpu_mpi_req_status(PUBLIC_REQ,STATUS) do {			\
+	STATUS->MPI_SOURCE=PUBLIC_REQ->node_tag.rank; /**< field name mandatory by spec */ \
+	STATUS->MPI_TAG=PUBLIC_REQ->node_tag.data_tag;    /**< field name mandatory by spec */ \
+	STATUS->MPI_ERROR=PUBLIC_REQ->ret;  /**< field name mandatory by spec */ \
+	STATUS->size=PUBLIC_REQ->count;       /**< size of data received */ \
+	STATUS->cancelled=0;  /**< whether request was cancelled */	\
+} while(0)
+
 int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 	_STARPU_MPI_LOG_IN();
@@ -524,9 +532,9 @@ static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
 	}
 	/* the if must be before, because the first callback can directly free
 	* a detached request (the second callback free if req->waited>1). */
-	nm_sr_request_set_ref(&(req->request), req);
+	nm_sr_request_set_ref(&(req->data_request), req);
 
-	nm_sr_request_monitor(req->session, &(req->request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
+	nm_sr_request_monitor(req->session, &(req->data_request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
 }
 
 static void _starpu_mpi_handle_new_request(void *arg)

+ 6 - 4
nmad/src/starpu_mpi.c

@@ -1,7 +1,8 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009, 2010-2014, 2017  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
+ * Copyright (C) 2009, 2010-2017  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
+ * Copyright (C) 2016  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -28,11 +29,12 @@
 #include <starpu_mpi_init.h>
 #include <common/config.h>
 #include <common/thread.h>
+#include <datawizard/interfaces/data_interface.h>
 #include <datawizard/coherency.h>
-#include <nm_sendrecv_interface.h>
-#include <nm_mpi_nmad.h>
+#include <core/simgrid.h>
 #include <core/task.h>
 #include <core/topology.h>
+#include <core/workers.h>
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int data_tag, MPI_Comm comm,

+ 4 - 1
nmad/src/starpu_mpi_init.c

@@ -177,6 +177,10 @@ int starpu_mpi_shutdown(void)
 	_starpu_mpi_comm_amounts_display(stderr, rank);
 	_starpu_mpi_comm_amounts_shutdown();
 	_starpu_mpi_cache_shutdown(world_size);
+#if defined(STARPU_MPI_MPI)
+	_starpu_mpi_tag_shutdown();
+	_starpu_mpi_comm_shutdown();
+#endif
 
 	return 0;
 }
@@ -226,4 +230,3 @@ int starpu_mpi_world_rank(void)
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
 	return rank;
 }
-

+ 56 - 13
nmad/src/starpu_mpi_private.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2015, 2017  Université de Bordeaux
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
+ * Copyright (C) 2010, 2012-2017  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -26,9 +26,11 @@
 #include <common/list.h>
 #include <common/prio_list.h>
 #include <core/simgrid.h>
+#if defined(STARPU_MPI_NMAD)
 #include <pioman.h>
 #include <nm_sendrecv_interface.h>
 #include <nm_session_interface.h>
+#endif
 
 #ifdef __cplusplus
 extern "C"
@@ -155,6 +157,24 @@ int _starpu_debug_rank;
 #  define _STARPU_MPI_LOG_OUT()
 #endif
 
+#if defined(STARPU_MPI_MPI)
+extern int _starpu_mpi_tag;
+#define _STARPU_MPI_TAG_ENVELOPE  _starpu_mpi_tag
+#define _STARPU_MPI_TAG_DATA      _starpu_mpi_tag+1
+#define _STARPU_MPI_TAG_SYNC_DATA _starpu_mpi_tag+2
+
+#define _STARPU_MPI_ENVELOPE_DATA       0
+#define _STARPU_MPI_ENVELOPE_SYNC_READY 1
+
+struct _starpu_mpi_envelope
+{
+	int mode;
+	starpu_ssize_t size;
+	int data_tag;
+	unsigned sync;
+};
+#endif /* STARPU_MPI_MPI */
+
 enum _starpu_mpi_request_type
 {
 	SEND_REQ=0,
@@ -181,6 +201,7 @@ struct _starpu_mpi_data
 	int cache_received;
 };
 
+struct _starpu_mpi_req;
 LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
@@ -196,18 +217,36 @@ LIST_TYPE(_starpu_mpi_req,
 
 	/* who are we talking to ? */
 	struct _starpu_mpi_node_tag node_tag;
+#if defined(STARPU_MPI_NMAD)
 	nm_gate_t gate;
 	nm_session_t session;
+#endif
 
 	void (*func)(struct _starpu_mpi_req *);
 
 	MPI_Status *status;
-	nm_sr_request_t request;
+#if defined(STARPU_MPI_NMAD)
+	nm_sr_request_t data_request;
+	int waited;
+#elif defined(STARPU_MPI_MPI)
+	MPI_Request data_request;
+#endif
+
 	int *flag;
 	unsigned sync;
 
 	int ret;
+#if defined(STARPU_MPI_NMAD)
 	piom_cond_t req_cond;
+#elif defined(STARPU_MPI_MPI)
+	starpu_pthread_mutex_t req_mutex;
+	starpu_pthread_cond_t req_cond;
+	starpu_pthread_mutex_t posted_mutex;
+	starpu_pthread_cond_t posted_cond;
+	/* In the case of a Wait/Test request, we are going to post a request
+	 * to test the completion of another request */
+	struct _starpu_mpi_req *other_request;
+#endif
 
 	enum _starpu_mpi_request_type request_type; /* 0 send, 1 recv */
 
@@ -221,15 +260,27 @@ LIST_TYPE(_starpu_mpi_req,
 	void (*callback)(void *);
 
         /* in the case of user-defined datatypes, we need to send the size of the data */
+#if defined(STARPU_MPI_NMAD)
 	nm_sr_request_t size_req;
+#elif defined(STARPU_MPI_MPI)
+	MPI_Request size_req;
+#endif
+
+#if defined(STARPU_MPI_MPI)
+	struct _starpu_mpi_envelope* envelope;
+
+	unsigned is_internal_req:1;
+	unsigned to_destroy:1;
+	struct _starpu_mpi_req *internal_req;
+	struct _starpu_mpi_early_data_handle *early_data_handle;
+     	UT_hash_handle hh;
+#endif
 
 	int sequential_consistency;
 
 	long pre_sync_jobid;
 	long post_sync_jobid;
 
-	int waited;
-
 #ifdef STARPU_SIMGRID
         MPI_Status status_store;
 	starpu_pthread_queue_t queue;
@@ -274,14 +325,6 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
 void _starpu_mpi_wait_for_initialization();
 #endif
 
-#define _starpu_mpi_req_status(PUBLIC_REQ,STATUS) do {			\
-	STATUS->MPI_SOURCE=PUBLIC_REQ->node_tag.rank; /**< field name mandatory by spec */ \
-	STATUS->MPI_TAG=PUBLIC_REQ->node_tag.data_tag;    /**< field name mandatory by spec */ \
-	STATUS->MPI_ERROR=PUBLIC_REQ->ret;  /**< field name mandatory by spec */ \
-	STATUS->size=PUBLIC_REQ->count;       /**< size of data received */ \
-	STATUS->cancelled=0;  /**< whether request was cancelled */	\
-} while(0)
-
 #ifdef __cplusplus
 }
 #endif