Browse source

Add per-node MPI data

Samuel Thibault 7 years ago
parent
commit
5e3dbcf95a

+ 2 - 0
ChangeLog

@@ -78,6 +78,7 @@ New features:
     STARPU_TASK_BREAK_ON_EXEC environment variables, with the job_id
     of a task. StarPU will raise SIGTRAP when the task is being
     scheduled, pushed, or popped by the scheduler.
+  * Add per-node MPI data.
 
 Small features:
   * New function starpu_worker_get_job_id(struct starpu_task *task)
@@ -89,6 +90,7 @@ Small features:
   * Add starpu_vector_filter_list_long filter.
   * Add starpu_perfmodel_arch_comb_fetch function.
   * Add STARPU_WATCHDOG_DELAY environment variable.
+  * Add starpu_mpi_get_data_on_all_nodes_detached function.
 
 Small changes:
   * Output generated through STARPU_MPI_COMM has been modified to

+ 46 - 0
doc/doxygen/chapters/410_mpi_support.doxy

@@ -613,6 +613,52 @@ starpu_mpi_data_register(data1, 43, 1);
 starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_W, data, STARPU_R, data0, STARPU_R, data1, 0); /* Executes on node 0 */
 \endcode
 
+\section MPIPerNodeData Per-node Data
+
+Beyond temporary data on a single node, one may want per-node data, e.g. to
+replicate some computation on every node when that is less expensive than
+communicating the value over MPI:
+
+\code{.c}
+starpu_data_handle_t pernode, data0, data1;
+starpu_variable_data_register(&pernode, -1, 0, sizeof(val));
+starpu_mpi_data_register(pernode, -1, STARPU_MPI_PER_NODE);
+
+/* Normal data: one on node0, one on node1 */
+if (rank == 0) {
+	starpu_variable_data_register(&data0, STARPU_MAIN_RAM, (uintptr_t) &val0, sizeof(val0));
+	starpu_variable_data_register(&data1, -1, 0, sizeof(val1));
+} else if (rank == 1) {
+	starpu_variable_data_register(&data0, -1, 0, sizeof(val0));
+	starpu_variable_data_register(&data1, STARPU_MAIN_RAM, (uintptr_t) &val1, sizeof(val1));
+}
+starpu_mpi_data_register(data0, 42, 0);
+starpu_mpi_data_register(data1, 43, 1);
+
+starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_W, pernode, 0); /* Will be replicated on all nodes */
+
+starpu_mpi_task_insert(MPI_COMM_WORLD, &cl2, STARPU_RW, data0, STARPU_R, pernode, 0); /* Will execute on node 0, using its own pernode */
+starpu_mpi_task_insert(MPI_COMM_WORLD, &cl2, STARPU_RW, data1, STARPU_R, pernode, 0); /* Will execute on node 1, using its own pernode */
+\endcode
+
+One can turn normal data into per-node data by first broadcasting it to all nodes:
+
+\code{.c}
+starpu_data_handle_t data;
+starpu_variable_data_register(&data, -1, 0, sizeof(val));
+starpu_mpi_data_register(data, 42, 0);
+
+/* Compute some value */
+starpu_mpi_task_insert(MPI_COMM_WORLD, &cl, STARPU_W, data, 0); /* Node 0 computes it */
+
+/* Get it on all nodes */
+starpu_mpi_get_data_on_all_nodes_detached(MPI_COMM_WORLD, data);
+/* And turn it per-node */
+starpu_mpi_data_set_rank(data, STARPU_MPI_PER_NODE);
+\endcode
+
+The data can then be used just like the per-node data above.
+
 \section MPIPriorities Priorities
 
 All send functions have a <c>_prio</c> variant which takes an additional

+ 13 - 0
doc/doxygen/chapters/api/mpi.doxy

@@ -339,6 +339,14 @@ Return the tag of the given data.
 Return the tag of the given data.
 Symbol kept for backward compatibility. Calling function starpu_mpi_data_get_tag()
 
+\def STARPU_MPI_PER_NODE
+\ingroup API_MPI_Support
+Can be used as rank when calling starpu_mpi_data_register() and the like, to
+specify that the data is per-node: each node will have its own value. Tasks
+writing to such data will be replicated on all nodes (and all parameters then
+have to be per-node). Tasks not writing to such data will just take the
+node-local value without any MPI communication.
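+
+For instance, a minimal sketch (assuming an <c>int val</c> visible on each node):
+\code{.c}
+starpu_data_handle_t pernode;
+starpu_variable_data_register(&pernode, -1, 0, sizeof(val));
+/* no tag is needed: this data never travels over MPI */
+starpu_mpi_data_register(pernode, -1, STARPU_MPI_PER_NODE);
+\endcode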
+
 \fn void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t handle, int new_rank)
 \ingroup API_MPI_Support
 Migrate the data onto the \p new_rank MPI node. This means both transferring
@@ -440,6 +448,11 @@ owner if needed. At least the target node and the owner have to call
 the function. On reception, the \p callback function is called with
 the argument \p arg.
 
+\fn void starpu_mpi_get_data_on_all_nodes_detached(MPI_Comm comm, starpu_data_handle_t data_handle)
+\ingroup API_MPI_Support
+Transfer data \p data_handle to all MPI nodes, sending it from its
+owner if needed. All nodes have to call the function.
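+
+A typical use, sketched in the MPI support chapter, is to broadcast a value
+computed on its owner node and then mark it per-node:
+\code{.c}
+starpu_mpi_get_data_on_all_nodes_detached(MPI_COMM_WORLD, data);
+starpu_mpi_data_set_rank(data, STARPU_MPI_PER_NODE);
+\endcode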
+
 @name Node Selection Policy
 \anchor MPINodeSelectionPolicy
 \ingroup API_MPI_Support

+ 3 - 0
mpi/include/starpu_mpi.h

@@ -65,6 +65,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...);
 
 void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle, int node);
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg);
+void starpu_mpi_get_data_on_all_nodes_detached(MPI_Comm comm, starpu_data_handle_t data_handle);
 void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle);
 void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio);
 
@@ -98,6 +99,8 @@ void starpu_mpi_set_communication_tag(int tag);
 void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, int rank, MPI_Comm comm);
 #define starpu_mpi_data_register(data_handle, tag, rank) starpu_mpi_data_register_comm(data_handle, tag, rank, MPI_COMM_WORLD)
 
+#define STARPU_MPI_PER_NODE -2
+
 void starpu_mpi_data_set_rank_comm(starpu_data_handle_t handle, int rank, MPI_Comm comm);
 #define starpu_mpi_data_set_rank(handle, rank) starpu_mpi_data_set_rank_comm(handle, rank, MPI_COMM_WORLD)
 void starpu_mpi_data_set_tag(starpu_data_handle_t handle, int tag);

+ 11 - 0
mpi/src/starpu_mpi.c

@@ -1931,6 +1931,17 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
 	}
 }
 
+void starpu_mpi_get_data_on_all_nodes_detached(MPI_Comm comm, starpu_data_handle_t data_handle)
+{
+	int size, i;
+	starpu_mpi_comm_size(comm, &size);
+#ifdef STARPU_DEVEL
+#warning TODO: use binary communication tree to optimize broadcast
+#endif
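+	/* Plain linear broadcast for now: fetch the data onto each node in turn */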
+	for (i = 0; i < size; i++)
+		starpu_mpi_get_data_on_node_detached(comm, data_handle, i, NULL, NULL);
+}
+
 void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_rank)
 {
 	int old_rank = starpu_mpi_data_get_rank(data);

+ 15 - 6
mpi/src/starpu_mpi_task_insert.c

@@ -82,11 +82,11 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_a
 			// No node has been selected yet
 			*xrank = mpi_rank;
 			_STARPU_MPI_DEBUG(100, "Codelet is going to be executed by node %d\n", *xrank);
-			*do_execute = (mpi_rank == me);
+			*do_execute = mpi_rank == STARPU_MPI_PER_NODE || (mpi_rank == me);
 		}
 		else if (mpi_rank != *xrank)
 		{
-			_STARPU_MPI_DEBUG(100, "Another node %d had already been selected to execute the codelet\n", *xrank);
+			_STARPU_MPI_DEBUG(100, "Another node %d had already been selected to execute the codelet, can't now set %d\n", *xrank, mpi_rank);
 			*inconsistent_execute = 1;
 		}
 	}
@@ -105,7 +105,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 			_STARPU_ERROR("StarPU needs to be told the MPI rank of this data, using starpu_mpi_data_register\n");
 		}
 
-		if (do_execute && mpi_rank != me)
+		if (do_execute && mpi_rank != STARPU_MPI_PER_NODE && mpi_rank != me)
 		{
 			/* The node is going to execute the codelet, but it does not own the data, it needs to receive the data from the owner node */
 			int already_received = _starpu_mpi_cache_received_data_set(data);
@@ -146,9 +146,13 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 		{
 			_STARPU_ERROR("StarPU needs to be told the MPI rank of this data, using starpu_mpi_data_register\n");
 		}
+		if (mpi_rank == STARPU_MPI_PER_NODE)
+		{
+			mpi_rank = me;
+		}
 		if (mpi_rank == me)
 		{
-			if (xrank != -1 && me != xrank)
+			if (xrank != -1 && (xrank != STARPU_MPI_PER_NODE && me != xrank))
 			{
 				_STARPU_MPI_DEBUG(1, "Receive data %p back from the task %d which executed the codelet ...\n", data, xrank);
 				if(data_tag == -1)
@@ -184,6 +188,10 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 		if ((mode & STARPU_R) && do_execute)
 		{
 			int mpi_rank = starpu_mpi_data_get_rank(data);
+			if (mpi_rank == STARPU_MPI_PER_NODE)
+			{
+				mpi_rank = me;
+			}
 			if (mpi_rank != me && mpi_rank != -1)
 			{
 				starpu_data_invalidate_submit(data);
@@ -195,6 +203,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 static
 int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, int *prio_p, va_list varg_list)
 {
+	/* XXX: _fstarpu_mpi_task_decode_v needs to be updated at the same time */
 	va_list varg_list_copy;
 	int inconsistent_execute = 0;
 	int arg_type;
@@ -436,12 +445,12 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		// We need to find out which node is going to execute the codelet.
 		_STARPU_MPI_DEBUG(100, "Different nodes are owning W data. The node to execute the codelet is going to be selected with the current selection node policy. See starpu_mpi_node_selection_set_current_policy() to change the policy, or use STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA to specify the node\n");
 		*xrank = _starpu_mpi_select_node(me, nb_nodes, descrs, nb_data, select_node_policy);
-		*do_execute = (me == *xrank);
+		*do_execute = *xrank == STARPU_MPI_PER_NODE || (me == *xrank);
 	}
 	else
 	{
 		_STARPU_MPI_DEBUG(100, "Inconsistent=%d - xrank=%d\n", inconsistent_execute, *xrank);
-		*do_execute = (me == *xrank);
+		*do_execute = *xrank == STARPU_MPI_PER_NODE || (me == *xrank);
 	}
 	_STARPU_MPI_DEBUG(100, "do_execute=%d\n", *do_execute);
 

+ 2 - 2
mpi/src/starpu_mpi_task_insert_fortran.c

@@ -291,12 +291,12 @@ int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_no
 		// We need to find out which node is going to execute the codelet.
 		_STARPU_MPI_DISP("Different nodes are owning W data. The node to execute the codelet is going to be selected with the current selection node policy. See starpu_mpi_node_selection_set_current_policy() to change the policy, or use STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA to specify the node\n");
 		*xrank = _starpu_mpi_select_node(me, nb_nodes, descrs, nb_data, select_node_policy);
-		*do_execute = (me == *xrank);
+		*do_execute = *xrank == STARPU_MPI_PER_NODE || (me == *xrank);
 	}
 	else
 	{
 		_STARPU_MPI_DEBUG(100, "Inconsistent=%d - xrank=%d\n", inconsistent_execute, *xrank);
-		*do_execute = (me == *xrank);
+		*do_execute = *xrank == STARPU_MPI_PER_NODE || (me == *xrank);
 	}
 	_STARPU_MPI_DEBUG(100, "do_execute=%d\n", *do_execute);
 

+ 3 - 1
mpi/tests/Makefile.am

@@ -126,7 +126,8 @@ starpu_mpi_TESTS +=				\
 	policy_register_many			\
 	policy_selection			\
 	policy_selection2			\
-	ring_async_implicit
+	ring_async_implicit			\
+	temporary
 
 if !STARPU_SIMGRID
 starpu_mpi_TESTS +=				\
@@ -182,6 +183,7 @@ noinst_PROGRAMS =				\
 	ring_sync_detached			\
 	ring_async				\
 	ring_async_implicit			\
+	temporary				\
 	block_interface				\
 	block_interface_pinned			\
 	cache					\

+ 135 - 0
mpi/tests/temporary.c

@@ -0,0 +1,135 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2015, 2016, 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/* This tests that one can register temporary data on each MPI node, which can mix with common data */
+
+#include <starpu_mpi.h>
+#include "helper.h"
+
+static void func_add(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+{
+	int *a = (void*) STARPU_VARIABLE_GET_PTR(descr[0]);
+	const int *b = (void*) STARPU_VARIABLE_GET_PTR(descr[1]);
+	const int *c = (void*) STARPU_VARIABLE_GET_PTR(descr[2]);
+
+	*a = *b + *c;
+	FPRINTF_MPI(stderr, "%d + %d = %d\n", *b, *c, *a);
+}
+
+/* Dummy cost function for simgrid */
+static double cost_function(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, unsigned nimpl STARPU_ATTRIBUTE_UNUSED)
+{
+	return 0.000001;
+}
+static struct starpu_perfmodel dumb_model =
+{
+	.type          = STARPU_COMMON,
+	.cost_function = cost_function
+};
+
+static struct starpu_codelet codelet_add =
+{
+	.cpu_funcs = {func_add},
+	.nbuffers = 3,
+	.modes = {STARPU_W, STARPU_R, STARPU_R},
+	.model = &dumb_model
+};
+
+int main(int argc, char **argv)
+{
+	int rank, size, n;
+	int ret;
+	int val0, val1;
+	starpu_data_handle_t data0, data1, tmp0, tmp, tmp2;
+
+	ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+	ret = starpu_mpi_init(&argc, &argv, 1);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init");
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+
+	if (starpu_mpi_cache_is_enabled() == 0) goto skip;
+
+	if (rank == 0)
+	{
+		val0 = 1;
+		starpu_variable_data_register(&data0, STARPU_MAIN_RAM, (uintptr_t)&val0, sizeof(val0));
+		starpu_variable_data_register(&data1, -1, (uintptr_t)NULL, sizeof(val1));
+		starpu_variable_data_register(&tmp0, -1, (uintptr_t)NULL, sizeof(val0));
+		starpu_mpi_data_register(tmp0, -1, 0);
+	}
+	else if (rank == 1)
+	{
+		starpu_variable_data_register(&data0, -1, (uintptr_t)NULL, sizeof(val0));
+		starpu_variable_data_register(&data1, STARPU_MAIN_RAM, (uintptr_t)&val1, sizeof(val1));
+		tmp0 = NULL;
+	}
+	else
+	{
+		starpu_variable_data_register(&data0, -1, (uintptr_t)NULL, sizeof(val0));
+		starpu_variable_data_register(&data1, -1, (uintptr_t)NULL, sizeof(val1));
+		tmp0 = NULL;
+	}
+	starpu_variable_data_register(&tmp, -1, (uintptr_t)NULL, sizeof(val0));
+	starpu_variable_data_register(&tmp2, -1, (uintptr_t)NULL, sizeof(val0));
+
+	starpu_mpi_data_register(data0, 42, 0);
+	starpu_mpi_data_register(data1, 43, 1);
+	starpu_mpi_data_register(tmp, 44, 0);
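+	/* tmp2 is per-node from the start: each node keeps its own value, no tag needed */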
+	starpu_mpi_data_register(tmp2, -1, STARPU_MPI_PER_NODE);
+
+	/* Test temporary data tmp0 on node 0 only */
+	starpu_mpi_task_insert(MPI_COMM_WORLD, &codelet_add, STARPU_W, tmp0, STARPU_R, data0, STARPU_R, data0, 0);
+
+	starpu_mpi_task_insert(MPI_COMM_WORLD, &codelet_add, STARPU_W, data0, STARPU_R, tmp0, STARPU_R, tmp0, 0);
+
+	starpu_mpi_task_insert(MPI_COMM_WORLD, &codelet_add, STARPU_W, tmp, STARPU_R, data0, STARPU_R, data0, 0);
+
+	/* Now make some tmp per-node, so that each node replicates the computation */
+	for (n = 0; n < size; n++)
+		if (n != 0)
+			/* Get the value on all nodes */
+			starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, tmp, n, NULL, NULL);
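+	/* (equivalent to starpu_mpi_get_data_on_all_nodes_detached(MPI_COMM_WORLD, tmp)) */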
+	starpu_mpi_data_set_rank(tmp, STARPU_MPI_PER_NODE);
+
+	/* This task writes to a per-node data, so will be executed by all nodes */
+	starpu_mpi_task_insert(MPI_COMM_WORLD, &codelet_add, STARPU_W, tmp2, STARPU_R, tmp, STARPU_R, tmp, 0);
+
+	/* All MPI nodes have computed the value (no MPI communication here!) */
+	starpu_data_acquire_on_node(tmp2, STARPU_MAIN_RAM, STARPU_R);
+	STARPU_ASSERT(*(int*)starpu_data_handle_to_pointer(tmp2, STARPU_MAIN_RAM) == 16);
+	starpu_data_release_on_node(tmp2, STARPU_MAIN_RAM);
+
+	/* And nodes 0 and 1 do something with it */
+	starpu_mpi_task_insert(MPI_COMM_WORLD, &codelet_add, STARPU_W, data0, STARPU_R, tmp, STARPU_R, tmp2, 0);
+	starpu_mpi_task_insert(MPI_COMM_WORLD, &codelet_add, STARPU_W, data1, STARPU_R, tmp, STARPU_R, tmp2, 0);
+
+	starpu_task_wait_for_all();
+
+	starpu_data_unregister(data0);
+	starpu_data_unregister(data1);
+
+skip:
+	starpu_mpi_shutdown();
+	starpu_shutdown();
+
+	if (rank == 0)
+		STARPU_ASSERT_MSG(val0 == 24, "%d should be 24\n", val0);
+	if (rank == 1)
+		STARPU_ASSERT_MSG(val1 == 24, "%d should be 24\n", val1);
+	return 0;
+}