Przeglądaj źródła

Opportunistically gather sends whenever possible, for implementing diffusion tree later

Samuel Thibault 7 lat temu
rodzic
commit
e6082a8f8e

+ 1 - 0
mpi/src/Makefile.am

@@ -79,6 +79,7 @@ noinst_HEADERS =					\
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
 	starpu_mpi.c					\
 	starpu_mpi_req.c				\
+	starpu_mpi_coop_sends.c				\
 	starpu_mpi_helper.c				\
 	starpu_mpi_datatype.c				\
 	starpu_mpi_task_insert.c			\

+ 1 - 2
mpi/src/mpi/starpu_mpi_mpi.c

@@ -806,8 +806,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
 		_STARPU_MPI_TRACE_TERMINATED(req, req->node_tag.rank, req->node_tag.data_tag);
 	}
 
-	if (req->data_handle)
-		starpu_data_release(req->data_handle);
+	_starpu_mpi_release_req_data(req);
 
 	if (req->envelope)
 	{

+ 1 - 1
mpi/src/nmad/starpu_mpi_nmad.c

@@ -388,7 +388,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 		        nm_mpi_nmad_data_release(req->datatype);
 			_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
 		}
-		starpu_data_release(req->data_handle);
+		_starpu_mpi_release_req_data(req);
 	}
 
 	/* Execute the specified callback, if any */

+ 18 - 11
mpi/src/starpu_mpi.c

@@ -59,18 +59,26 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 		return NULL;
 	}
 
+#ifdef STARPU_MPI_PEDANTIC_ISEND
+	enum starpu_data_access_mode mode = STARPU_RW;
+#else
+	enum starpu_data_access_mode mode = STARPU_R;
+#endif
+
 	struct _starpu_mpi_req *req = _starpu_mpi_request_fill(
 	                                      data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func,
 					      sequential_consistency, 0, 0);
 	_starpu_mpi_req_willpost(req);
-	_starpu_mpi_isend_irecv_common(req,
-#ifdef STARPU_MPI_PEDANTIC_ISEND
-					      STARPU_RW,
-#else
-					      STARPU_R,
-#endif
-					      sequential_consistency
-			);
+
+	if (_starpu_mpi_use_coop_sends && detached == 1 && sync == 0 && callback == NULL)
+	{
+		/* It's a send & forget send, we can perhaps optimize its distribution over several nodes */
+		_starpu_mpi_coop_send(data_handle, req, mode, sequential_consistency);
+		return req;
+	}
+
+	/* Post normally */
+	_starpu_mpi_isend_irecv_common(req, mode, sequential_consistency);
 	return req;
 }
 
@@ -249,6 +257,7 @@ void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
 #endif
 	_starpu_mpi_cache_data_clear(data_handle);
 	free(data_handle->mpi_data);
+	data_handle->mpi_data = NULL;
 }
 
 struct _starpu_mpi_data *_starpu_mpi_data_get(starpu_data_handle_t data_handle) {
@@ -264,6 +273,7 @@ struct _starpu_mpi_data *_starpu_mpi_data_get(starpu_data_handle_t data_handle)
 		mpi_data->node_tag.data_tag = -1;
 		mpi_data->node_tag.rank = -1;
 		mpi_data->node_tag.comm = MPI_COMM_WORLD;
+		_starpu_spin_init(&mpi_data->coop_lock);
 		data_handle->mpi_data = mpi_data;
 		_starpu_mpi_cache_data_init(data_handle);
 		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_data_clear);
@@ -404,9 +414,6 @@ void starpu_mpi_get_data_on_all_nodes_detached(MPI_Comm comm, starpu_data_handle
 {
 	int size, i;
 	starpu_mpi_comm_size(comm, &size);
-#ifdef STARPU_DEVEL
-#warning TODO: use binary communication tree to optimize broadcast
-#endif
 	for (i = 0; i < size; i++)
 		starpu_mpi_get_data_on_node_detached(comm, data_handle, i, NULL, NULL);
 }

+ 218 - 0
mpi/src/starpu_mpi_coop_sends.c

@@ -0,0 +1,218 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2012-2013,2016-2017                      Inria
+ * Copyright (C) 2009-2018                                Université de Bordeaux
+ * Copyright (C) 2010-2017                                CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu_mpi.h>
+#include <starpu_mpi_private.h>
+#include <datawizard/coherency.h>
+
+/*
+ * One node sends the same data to several nodes. Gather them into a
+ * "coop_sends", which then has a global view of all the required sends, and can
+ * establish a diffusion tree by telling receiving nodes to retransmit what they
+ * received (forwards) to others, and to others that they will receive from the
+ * former (redirects).
+ */
+
+/* This is called after a request is finished processing, to release the data */
+void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req) {
+	if (!req->data_handle)
+		return;
+
+	if (_starpu_mpi_req_multilist_queued_coop_sends(req)) {
+		struct _starpu_mpi_coop_sends *coop_sends = req->coop_sends_head;
+		struct _starpu_mpi_data *mpi_data = coop_sends->mpi_data;
+		int last;
+		_starpu_spin_lock(&mpi_data->coop_lock);
+		/* Part of a cooperative send, dequeue ourself from others */
+		_starpu_mpi_req_multilist_erase_coop_sends(&coop_sends->reqs, req);
+		last = _starpu_mpi_req_multilist_empty_coop_sends(&coop_sends->reqs);
+		_starpu_spin_unlock(&mpi_data->coop_lock);
+		if (last) {
+			/* We were last, release data */
+			starpu_data_release(req->data_handle);
+			free(coop_sends);
+		}
+	} else {
+		/* Trivial request */
+		starpu_data_release(req->data_handle);
+	}
+}
+
+/* This is called on completion of acquisition of data for a cooperative send */
+static void _starpu_mpi_submit_coop_sends(void *arg)
+{
+	_STARPU_MPI_LOG_IN();
+	struct _starpu_mpi_coop_sends *coop_sends = arg;
+	struct _starpu_mpi_data *mpi_data = coop_sends->mpi_data;
+
+	/* Take the cooperative send bag out from more submissions */
+	if (mpi_data->coop_sends == coop_sends)
+	{
+		_starpu_spin_lock(&mpi_data->coop_lock);
+		if (mpi_data->coop_sends == coop_sends)
+			mpi_data->coop_sends = NULL;
+		_starpu_spin_unlock(&mpi_data->coop_lock);
+	}
+
+	struct _starpu_mpi_req *cur;
+	unsigned n;
+
+	/* Count their number */
+	n = 0;
+	for (cur  = _starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs);
+	     cur != _starpu_mpi_req_multilist_end_coop_sends(&coop_sends->reqs);
+	     cur  = _starpu_mpi_req_multilist_next_coop_sends(cur))
+		n++;
+
+	_STARPU_MPI_DEBUG(0, "handling cooperative sends %p for %u neighbours\n", coop_sends, n);
+
+	/* Store them in an array */
+	struct _starpu_mpi_req *reqs[n];
+	unsigned i;
+	for (cur  = _starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs), i = 0;
+	     cur != _starpu_mpi_req_multilist_end_coop_sends(&coop_sends->reqs);
+	     cur  = _starpu_mpi_req_multilist_next_coop_sends(cur), i++)
+		reqs[i] = cur;
+
+	/* TODO: sort reqs by priority */
+
+	/* TODO: turn them into redirects & forwards */
+
+	/* And submit them */
+	for (i = 0; i < n; i++) {
+		/* Prefetch next request, since once we submit cur we may not be able to read it.  */
+		_STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, reqs[i]->node_tag.rank);
+		_starpu_mpi_submit_ready_request(reqs[i]);
+	}
+	_STARPU_MPI_LOG_OUT();
+}
+
+/* This is called when we want to stop including new members in a cooperative send,
+ * either because we know there won't be any other members due to the algorithm
+ * or because the value has changed.  */
+static void _starpu_mpi_coop_send_flush(struct _starpu_mpi_coop_sends *coop_sends)
+{
+	if (!coop_sends)
+		return;
+	/* TODO: send the redirects already */
+}
+
+/* This is called when a write to the data was just submitted, which means we
+ * can't make future sends cooperate with past sends since it's not the same value
+ */
+void _starpu_mpi_data_flush(starpu_data_handle_t data_handle)
+{
+	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
+	struct _starpu_mpi_coop_sends *coop_sends;
+	if (!mpi_data)
+		return;
+
+	_starpu_spin_lock(&mpi_data->coop_lock);
+	coop_sends = mpi_data->coop_sends;
+	if (coop_sends)
+		mpi_data->coop_sends = NULL;
+	_starpu_spin_unlock(&mpi_data->coop_lock);
+	if (coop_sends)
+	{
+		_STARPU_MPI_DEBUG(0, "%p: data written to, flush cooperative sends %p\n", data_handle, coop_sends);
+		_starpu_mpi_coop_send_flush(coop_sends);
+	}
+}
+
+/* Test whether a request is compatible with a cooperative send */
+static int _starpu_mpi_coop_send_compatible(struct _starpu_mpi_req *req, struct _starpu_mpi_coop_sends *coop_sends)
+{
+	struct _starpu_mpi_req *prevreq;
+
+	prevreq = _starpu_mpi_req_multilist_begin_coop_sends(&coop_sends->reqs);
+	return /* we can cope with tag being different */
+	          prevreq->node_tag.comm == req->node_tag.comm
+	       && prevreq->sequential_consistency == req->sequential_consistency;
+}
+
+void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency) {
+	struct _starpu_mpi_data *mpi_data = _starpu_mpi_data_get(data_handle);
+	struct _starpu_mpi_coop_sends *coop_sends = NULL, *tofree = NULL;
+	int done = 0, queue, first = 1;
+
+	/* Try to add ourself to something existing, otherwise create one.  */
+	while (!done)
+	{
+		_starpu_spin_lock(&mpi_data->coop_lock);
+		if (mpi_data->coop_sends)
+		{
+			/* Already something, check we are coherent with it */
+			queue = _starpu_mpi_coop_send_compatible(req, mpi_data->coop_sends);
+			if (queue)
+			{
+				/* Yes, queue ourself there */
+				if (coop_sends)
+				{
+					/* Remove ourself from what we created for ourself first */
+					_starpu_mpi_req_multilist_erase_coop_sends(&coop_sends->reqs, req);
+					tofree = coop_sends;
+				}
+				coop_sends = mpi_data->coop_sends;
+				_STARPU_MPI_DEBUG(0, "%p: add to cooperative sends %p, dest %d\n", data_handle, coop_sends, req->node_tag.rank);
+				_starpu_mpi_req_multilist_push_back_coop_sends(&coop_sends->reqs, req);
+				req->coop_sends_head = coop_sends;
+				first = 0;
+				done = 1;
+			} else {
+				/* Nope, incompatible, put ours instead */
+				_STARPU_MPI_DEBUG(0, "%p: new cooperative sends %p, dest %d\n", data_handle, coop_sends, req->node_tag.rank);
+				mpi_data->coop_sends = coop_sends;
+				first = 1;
+				_starpu_spin_unlock(&mpi_data->coop_lock);
+				/* and flush it */
+				_starpu_mpi_coop_send_flush(coop_sends);
+				break;
+			}
+		}
+		else if (coop_sends)
+		{
+			/* Nobody else and we have allocated one, we're first! */
+			_STARPU_MPI_DEBUG(0, "%p: new cooperative sends %p, dest %d\n", data_handle, coop_sends, req->node_tag.rank);
+			mpi_data->coop_sends = coop_sends;
+			first = 1;
+			done = 1;
+		}
+		_starpu_spin_unlock(&mpi_data->coop_lock);
+
+		if (!done && !coop_sends)
+		{
+			/* Didn't find something to join, create one out of critical section */
+			_STARPU_MPI_CALLOC(coop_sends, 1, sizeof(*coop_sends));
+			_starpu_mpi_req_multilist_head_init_coop_sends(&coop_sends->reqs);
+			_starpu_mpi_req_multilist_push_back_coop_sends(&coop_sends->reqs, req);
+			req->coop_sends_head = coop_sends;
+			coop_sends->mpi_data = mpi_data;
+		}
+		/* We at worse do two iteration */
+		STARPU_ASSERT(done || coop_sends);
+	}
+
+	/* In case we created one for nothing after all */
+	free(tofree);
+
+	if (first) {
+		/* We were first, we are responsible for acquiring the data for everybody */
+		starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(req->data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_submit_coop_sends, coop_sends, sequential_consistency, &req->pre_sync_jobid, NULL);
+	}
+}
+

+ 1 - 0
mpi/src/starpu_mpi_init.c

@@ -109,6 +109,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm
 	argc_argv->argc = argc;
 	argc_argv->argv = argv;
 	argc_argv->comm = comm;
+	_starpu_implicit_data_deps_write_hook(_starpu_mpi_data_flush);
 
 #ifdef STARPU_SIMGRID
 	/* Call MPI_Init_thread as early as possible, to initialize simgrid

+ 2 - 0
mpi/src/starpu_mpi_private.c

@@ -27,6 +27,7 @@ int _starpu_mpi_thread_cpuid = -1;
 int _starpu_mpi_use_prio = 1;
 int _starpu_mpi_fake_world_size = -1;
 int _starpu_mpi_fake_world_rank = -1;
+int _starpu_mpi_use_coop_sends = 1;
 
 void _starpu_mpi_set_debug_level_min(int level)
 {
@@ -63,4 +64,5 @@ void _starpu_mpi_env_init(void)
 	_starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
 	_starpu_mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
 	_starpu_mpi_use_prio = starpu_get_env_number_default("STARPU_MPI_PRIORITIES", 1);
+	_starpu_mpi_use_coop_sends = starpu_get_env_number_default("STARPU_MPI_COOP_SENDS", 1);
 }

+ 26 - 1
mpi/src/starpu_mpi_private.h

@@ -26,6 +26,7 @@
 #include <starpu_mpi_fxt.h>
 #include <common/list.h>
 #include <common/prio_list.h>
+#include <common/starpu_spinlock.h>
 #include <core/simgrid.h>
 #if defined(STARPU_USE_MPI_NMAD)
 #include <pioman.h>
@@ -68,6 +69,7 @@ extern int _starpu_mpi_fake_world_size;
 extern int _starpu_mpi_fake_world_rank;
 extern int _starpu_mpi_use_prio;
 extern int _starpu_mpi_thread_cpuid;
+extern int _starpu_mpi_use_coop_sends;
 void _starpu_mpi_env_init(void);
 
 #ifdef STARPU_NO_ASSERT
@@ -197,6 +199,15 @@ struct _starpu_mpi_node_tag
 	starpu_mpi_tag_t data_tag;
 };
 
+MULTILIST_CREATE_TYPE(_starpu_mpi_req, coop_sends)
+/* One bag of cooperative sends */
+struct _starpu_mpi_coop_sends
+{
+	/* List of send requests */
+	struct _starpu_mpi_req_multilist_coop_sends reqs;
+	struct _starpu_mpi_data *mpi_data;
+};
+
 /* Initialized in starpu_mpi_data_register_comm */
 struct _starpu_mpi_data
 {
@@ -204,6 +215,10 @@ struct _starpu_mpi_data
 	struct _starpu_mpi_node_tag node_tag;
 	int *cache_sent;
 	int cache_received;
+
+	/* Rendez-vous data for opportunistic cooperative sends */
+	struct _starpu_spinlock coop_lock; /* Needed to synchronize between submit thread and workers */
+	struct _starpu_mpi_coop_sends *coop_sends; /* Current cooperative send bag */
 };
 
 struct _starpu_mpi_data *_starpu_mpi_data_get(starpu_data_handle_t data_handle);
@@ -238,6 +253,8 @@ LIST_TYPE(_starpu_mpi_req,
 #elif defined(STARPU_USE_MPI_MPI)
 	MPI_Request data_request;
 #endif
+	struct _starpu_mpi_req_multilist_coop_sends coop_sends;
+	struct _starpu_mpi_coop_sends *coop_sends_head;
 
 	int *flag;
 	unsigned sync;
@@ -296,10 +313,17 @@ LIST_TYPE(_starpu_mpi_req,
 );
 PRIO_LIST_TYPE(_starpu_mpi_req, prio)
 
+MULTILIST_CREATE_INLINES(struct _starpu_mpi_req, _starpu_mpi_req, coop_sends)
+
 /* To be called before actually queueing a request, so the communication layer knows it has something to look at */
 void _starpu_mpi_req_willpost(struct _starpu_mpi_req *req);
-
+/* To be called to actually submit the request */
 void _starpu_mpi_submit_ready_request(void *arg);
+/* To be called when request is completed */
+void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req);
+
+/* Try to merge with send request with other send requests */
+void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency);
 
 void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req);
 void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
@@ -336,6 +360,7 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv);
 #ifdef STARPU_SIMGRID
 void _starpu_mpi_wait_for_initialization();
 #endif
+void _starpu_mpi_data_flush(starpu_data_handle_t data_handle);
 
 #ifdef __cplusplus
 }

+ 1 - 0
mpi/src/starpu_mpi_req.c

@@ -51,6 +51,7 @@ void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 	(*req)->data_request = 0;
 #endif
 	(*req)->flag = NULL;
+	_starpu_mpi_req_multilist_init_coop_sends(*req);
 
 	(*req)->ret = -1;
 #ifdef STARPU_USE_MPI_NMAD