
Document how to perform MPI data migration

Samuel Thibault, 11 years ago
commit c437b85f8b
2 changed files with 116 additions and 1 deletion
  1. doc/doxygen/chapters/08mpi_support.doxy (+58 -1)
  2. mpi/examples/stencil/stencil5.c (+58 -0)

doc/doxygen/chapters/08mpi_support.doxy (+58 -1)

@@ -271,7 +271,7 @@ data which will be needed by the tasks that we will execute.
     for(x = 0; x < X; x++) {
         for (y = 0; y < Y; y++) {
             int mpi_rank = my_distrib(x, y, size);
-             if (mpi_rank == my_rank)
+            if (mpi_rank == my_rank)
                 /* Owning data */
                 starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
                                               (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
@@ -318,6 +318,63 @@ application can prune the task for loops according to the data distribution,
 so as to only submit tasks on nodes which have to care about them (either to
 execute them, or to send the required data).
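+
+A minimal sketch of such pruning, reusing <c>my_distrib</c> and assuming the
+stencil codelet <c>stencil5_cl</c> of <c>mpi/examples/stencil/stencil5.c</c>:
+a node submits a task only if it executes it (it owns the piece of data
+written to) or has to send one of the pieces of data read by it.
+
+\code{.c}
+    for (x = 1; x < X-1; x++) {
+        for (y = 1; y < Y-1; y++) {
+            /* Submit only if this node executes the task or sends data for it */
+            if (my_rank == my_distrib(x, y, size)
+                || my_rank == my_distrib(x-1, y, size) || my_rank == my_distrib(x+1, y, size)
+                || my_rank == my_distrib(x, y-1, size) || my_rank == my_distrib(x, y+1, size))
+                starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
+                                       STARPU_RW, data_handles[x][y],
+                                       STARPU_R, data_handles[x-1][y], STARPU_R, data_handles[x+1][y],
+                                       STARPU_R, data_handles[x][y-1], STARPU_R, data_handles[x][y+1],
+                                       0);
+        }
+    }
+\endcode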
 
+\section MPIMigration MPI Data Migration
+
+The application can dynamically change its mind about the data distribution, for
+instance to balance the load over MPI nodes. This can be done very simply by
+requesting an explicit move and then changing the registered rank. For instance,
+here we switch to a new distribution function <c>my_distrib2</c>: we first
+register any data which was not already registered and will be needed, then
+migrate the data, and register the new location.
+
+\code{.c}
+    for(x = 0; x < X; x++) {
+        for (y = 0; y < Y; y++) {
+            int mpi_rank = my_distrib2(x, y, size);
+            if (!data_handles[x][y] && (mpi_rank == my_rank
+                  || my_rank == my_distrib2(x+1, y, size) || my_rank == my_distrib2(x-1, y, size)
+                  || my_rank == my_distrib2(x, y+1, size) || my_rank == my_distrib2(x, y-1, size)))
+                /* Register newly-needed data */
+                starpu_variable_data_register(&data_handles[x][y], -1,
+                                              (uintptr_t)NULL, sizeof(unsigned));
+            if (data_handles[x][y] && mpi_rank != starpu_data_get_rank(data_handles[x][y])) {
+                /* Migrate the data if its owner actually changes */
+                starpu_mpi_get_data_on_node(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
+                /* And register the new rank of the matrix */
+                starpu_data_set_rank(data_handles[x][y], mpi_rank);
+            }
+        }
+    }
+\endcode
+
+From then on, further task submissions will use the new data distribution,
+which will thus change both MPI communications and task assignments.
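+
+For instance, the very same submission loop as before the migration can simply
+be executed again (this mirrors <c>mpi/examples/stencil/stencil5.c</c>):
+StarPU-MPI maps each task, and the resulting communications, according to the
+ranks now registered on the handles.
+
+\code{.c}
+    /* Same loop as before the migration: tasks now execute on the nodes
+     * given by my_distrib2, without any change to the loop body */
+    for (x = 1; x < X-1; x++)
+        for (y = 1; y < Y-1; y++)
+            starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
+                                   STARPU_RW, data_handles[x][y],
+                                   STARPU_R, data_handles[x-1][y], STARPU_R, data_handles[x+1][y],
+                                   STARPU_R, data_handles[x][y-1], STARPU_R, data_handles[x][y+1],
+                                   0);
+\endcode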
+
+Very importantly, since all nodes have to agree on which node owns which data,
+so as to determine MPI communications and task assignments the same way, all
+nodes have to perform the same data migration, and at the same point in the
+sequence of task submissions. This does not require strict synchronization,
+just a clear separation between the task submissions before and after the data
+redistribution.
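+
+As a sketch of the overall structure, assuming hypothetical helpers
+<c>submit_tasks()</c> and <c>migrate()</c> which wrap the loops shown in this
+section, every node runs the same sequence:
+
+\code{.c}
+    /* All nodes run the same sequence: no barrier is needed, only the same
+     * ordering of task submissions and data migration on every node */
+    submit_tasks(my_distrib);   /* first phase, on the initial distribution */
+    migrate(my_distrib2);       /* the migration loop shown above */
+    submit_tasks(my_distrib2);  /* second phase, on the new distribution */
+    starpu_task_wait_for_all();
+\endcode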
+
+Before a piece of data is unregistered, it has to be migrated back to its
+original home node (the value, at least), since that is where the user-provided
+buffer resides. Otherwise unregistration will complain that the original home
+node does not hold the latest value of the data.
+
+\code{.c}
+    for(x = 0; x < X; x++) {
+        for (y = 0; y < Y; y++) {
+            if (data_handles[x][y]) {
+                int mpi_rank = my_distrib(x, y, size);
+                /* Get the data back to its original place, where the user-provided buffer resides */
+                starpu_mpi_get_data_on_node(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
+                /* And unregister it */
+                starpu_data_unregister(data_handles[x][y]);
+            }
+        }
+    }
+\endcode
+
 \section MPICollective MPI Collective Operations
 
 The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".

mpi/examples/stencil/stencil5.c (+58 -0)

@@ -58,6 +58,12 @@ int my_distrib(int x, int y, int nb_nodes)
 	return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
 }
 
+/* Shifted distribution, for migration example */
+int my_distrib2(int x, int y, int nb_nodes)
+{
+	return (my_distrib(x, y, nb_nodes) + 1) % nb_nodes;
+}
+
 
 static void parse_args(int argc, char **argv)
 {
@@ -91,6 +97,7 @@ int main(int argc, char **argv)
 
 	parse_args(argc, argv);
 
+	/* Initial data values */
 	for(x = 0; x < X; x++)
 	{
 		for (y = 0; y < Y; y++)
@@ -102,6 +109,7 @@ int main(int argc, char **argv)
 	}
 	mean /= value;
 
+	/* Initial distribution */
 	for(x = 0; x < X; x++)
 	{
 		for (y = 0; y < Y; y++)
@@ -132,6 +140,49 @@ int main(int argc, char **argv)
 		}
 	}
 
+	/* First computation with initial distribution */
+	for(loop=0 ; loop<niter; loop++)
+	{
+		for (x = 1; x < X-1; x++)
+		{
+			for (y = 1; y < Y-1; y++)
+			{
+				starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl, STARPU_RW, data_handles[x][y],
+						       STARPU_R, data_handles[x-1][y], STARPU_R, data_handles[x+1][y],
+						       STARPU_R, data_handles[x][y-1], STARPU_R, data_handles[x][y+1],
+						       0);
+			}
+		}
+	}
+	FPRINTF(stderr, "Waiting ...\n");
+	starpu_task_wait_for_all();
+
+	/* Now migrate data to a new distribution */
+
+	/* First register newly needed data */
+	for(x = 0; x < X; x++)
+	{
+		for (y = 0; y < Y; y++)
+		{
+			int mpi_rank = my_distrib2(x, y, size);
+			if (!data_handles[x][y] && (mpi_rank == my_rank
+				 || my_rank == my_distrib2(x+1, y, size) || my_rank == my_distrib2(x-1, y, size)
+				 || my_rank == my_distrib2(x, y+1, size) || my_rank == my_distrib2(x, y-1, size)))
+			{
+				/* Register newly-needed data */
+				starpu_variable_data_register(&data_handles[x][y], -1, (uintptr_t)NULL, sizeof(unsigned));
+			}
+			if (data_handles[x][y] && mpi_rank != starpu_data_get_rank(data_handles[x][y]))
+			{
+				/* Migrate the data */
+				starpu_mpi_get_data_on_node(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
+				/* And register the new rank of the matrix */
+				starpu_data_set_rank(data_handles[x][y], mpi_rank);
+			}
+		}
+	}
+
+	/* Second computation with new distribution */
 	for(loop=0 ; loop<niter; loop++)
 	{
 		for (x = 1; x < X-1; x++)
@@ -148,12 +199,19 @@ int main(int argc, char **argv)
 	FPRINTF(stderr, "Waiting ...\n");
 	starpu_task_wait_for_all();
 
+	/* Unregister data */
 	for(x = 0; x < X; x++)
 	{
 		for (y = 0; y < Y; y++)
 		{
 			if (data_handles[x][y])
 			{
+				int mpi_rank = my_distrib(x, y, size);
+				/* Get the data back to its original place, where the user-provided buffer resides */
+				starpu_mpi_get_data_on_node(MPI_COMM_WORLD, data_handles[x][y], mpi_rank);
+				/* Register the original rank of the matrix (although useless here) */
+				starpu_data_set_rank(data_handles[x][y], mpi_rank);
+				/* And unregister it */
 				starpu_data_unregister(data_handles[x][y]);
 			}
 		}