
StarPU-MPI: New functions starpu_mpi_scatter_detached() and starpu_mpi_gather_detached()

Nathalie Furmento 14 years ago
commit 556b0333e3
5 changed files with 161 additions and 63 deletions
  1. doc/starpu.texi (+74 -0)
  2. mpi/Makefile.am (+2 -1)
  3. mpi/examples/scatter_gather/mpi_scatter_gather.c (+2 -62)
  4. mpi/starpu_mpi.h (+3 -0)
  5. mpi/starpu_mpi_collective.c (+80 -0)

+ 74 - 0
doc/starpu.texi

@@ -1991,6 +1991,7 @@ GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI.
 * The API::                     
 * Simple Example::              
 * MPI Insert Task Utility::         
+* MPI Collective Operations::         
 @end menu
 
 @node The API
@@ -2259,6 +2260,79 @@ steps of the application.
 @end smallexample
 @end cartouche
 
+@node MPI Collective Operations
+@section MPI Collective Operations
+
+@deftypefun int starpu_mpi_scatter_detached (starpu_data_handle *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
+Scatter data among the processes of the communicator based on the ownership
+of the data. For each data handle in the array @var{data_handles}, the
+process @var{root} sends the data to the process owning it. Processes
+receiving data must have registered valid data handles to receive them.
+@end deftypefun
+
+@deftypefun int starpu_mpi_gather_detached (starpu_data_handle *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
+Gather data from the different processes of the communicator onto the
+process @var{root}. Each process owning a data handle in the array
+@var{data_handles} sends it to the process @var{root}. The process
+@var{root} must have registered valid data handles to receive the data.
+@end deftypefun
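+
+The following example scatters a vector among the processes of the
+communicator, lets each process compute on the blocks it owns, and gathers
+the results back on the @var{root} process.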
+
+@page
+@cartouche
+@smallexample
+if (rank == root)
+@{
+    /* Allocate the vector */
+    vector = malloc(nblocks * sizeof(float *));
+    for(x=0 ; x<nblocks ; x++)
+    @{
+        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
+    @}
+@}
+
+/* Allocate data handles and register data to StarPU */
+data_handles = malloc(nblocks*sizeof(starpu_data_handle));
+for(x = 0; x < nblocks ;  x++)
+@{
+    int mpi_rank = my_distrib(x, nodes);
+    if (rank == root)
+        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
+                                    block_size, sizeof(float));
+    else @{
+        if ((mpi_rank == rank) || (rank == mpi_rank+1 || rank == mpi_rank-1)) @{
+            /* I own this block, or I will need it for my computations */
+            starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
+                                        block_size, sizeof(float));
+        @}
+        else @{
+            /* I know it's useless to allocate anything for this */
+            data_handles[x] = NULL;
+        @}
+    @}
+    if (data_handles[x]) @{
+        starpu_data_set_rank(data_handles[x], mpi_rank);
+    @}
+@}
+
+/* Scatter the vector among the nodes */
+starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
+
+/* Calculation */
+for(x = 0; x < nblocks ;  x++) @{
+    if (data_handles[x]) @{
+        int owner = starpu_data_get_rank(data_handles[x]);
+        if (owner == rank) @{
+            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
+        @}
+    @}
+@}
+
+/* Gather the vector back on the root node */
+starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
+@end smallexample
+@end cartouche
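+
+Both operations are detached: they return as soon as the required
+@code{starpu_mpi_isend_detached} and @code{starpu_mpi_irecv_detached}
+communications have been posted, and the transfers then progress in the
+background.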
+
+
 @c ---------------------------------------------------------------------
 @c Configuration options
 @c ---------------------------------------------------------------------

+ 2 - 1
mpi/Makefile.am

@@ -79,7 +79,8 @@ libstarpumpi_la_SOURCES =				\
 	starpu_mpi.c					\
 	starpu_mpi_helper.c				\
 	starpu_mpi_datatype.c				\
-	starpu_mpi_insert_task.c
+	starpu_mpi_insert_task.c			\
+	starpu_mpi_collective.c
 
 examplebin_PROGRAMS +=				\
 	examples/stencil/stencil5

+ 2 - 62
mpi/examples/scatter_gather/mpi_scatter_gather.c

@@ -52,66 +52,6 @@ static starpu_codelet cl =
 	.nbuffers = 1
 };
 
-int starpu_mpi_scatter(starpu_data_handle *data_handles, int count, int root, MPI_Comm comm)
-{
-	int rank;
-	int x;
-	int mpi_tag = 0;
-
-	MPI_Comm_rank(comm, &rank);
-
-	for(x = 0; x < count ;  x++)
-	{
-		if (data_handles[x])
-		{
-			int owner = starpu_data_get_rank(data_handles[x]);
-			if ((rank == root) && (owner != root))
-			{
-				//fprintf(stderr, "[%d] Sending data[%d] to %d\n", rank, x, owner);
-				starpu_mpi_isend_detached(data_handles[x], owner, mpi_tag, comm, NULL, NULL);
-			}
-			if ((rank != root) && (owner == rank))
-			{
-				//fprintf(stderr, "[%d] Receiving data[%d] from %d\n", rank, x, root);
-				//MPI_Status status;
-				starpu_mpi_irecv_detached(data_handles[x], root, mpi_tag, comm, NULL, NULL);
-			}
-		}
-		mpi_tag++;
-	}
-	return 0;
-}
-
-int starpu_mpi_gather(starpu_data_handle *data_handles, int count, int root, MPI_Comm comm)
-{
-	int rank;
-	int x;
-	int mpi_tag = 0;
-
-	MPI_Comm_rank(comm, &rank);
-
-	for(x = 0; x < count ;  x++)
-	{
-		if (data_handles[x])
-		{
-			int owner = starpu_data_get_rank(data_handles[x]);
-			if ((rank == root) && (owner != root))
-			{
-				//fprintf(stderr, "[%d] Receiving data[%d] from %d\n", rank, x, owner);
-				//MPI_Status status;
-				starpu_mpi_irecv_detached(data_handles[x], owner, mpi_tag, comm, NULL, NULL);
-			}
-			if ((rank != root) && (owner == rank))
-			{
-				//fprintf(stderr, "[%d] Sending data[%d] to %d\n", rank, x, root);
-				starpu_mpi_isend_detached(data_handles[x], root, mpi_tag, comm, NULL, NULL);
-			}
-		}
-		mpi_tag ++;
-	}
-	return 0;
-}
-
 int main(int argc, char **argv)
 {
         int rank, nodes;
@@ -208,7 +148,7 @@ int main(int argc, char **argv)
         }
 
 	/* Scatter the matrix among the nodes */
-	starpu_mpi_scatter(data_handles, nblocks*nblocks, 0, MPI_COMM_WORLD);
+	starpu_mpi_scatter_detached(data_handles, nblocks*nblocks, 0, MPI_COMM_WORLD);
 
 	/* Calculation */
 	for(x = 0; x < nblocks*nblocks ;  x++)
@@ -228,7 +168,7 @@ int main(int argc, char **argv)
 	}
 
 	/* Gather the matrix on main node */
-	starpu_mpi_gather(data_handles, nblocks*nblocks, 0, MPI_COMM_WORLD);
+	starpu_mpi_gather_detached(data_handles, nblocks*nblocks, 0, MPI_COMM_WORLD);
 
 	/* Unregister matrix from StarPU */
 	for(x=0 ; x<nblocks*nblocks ; x++)

+ 3 - 0
mpi/starpu_mpi.h

@@ -39,6 +39,9 @@ int starpu_mpi_shutdown(void);
 int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...);
 void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle data_handle, int node);
 
+int starpu_mpi_scatter_detached(starpu_data_handle *data_handles, int count, int root, MPI_Comm comm);
+int starpu_mpi_gather_detached(starpu_data_handle *data_handles, int count, int root, MPI_Comm comm);
+
 /* Some helper functions */
 
 /* When the transfer is completed, the tag is unlocked */

+ 80 - 0
mpi/starpu_mpi_collective.c

@@ -0,0 +1,80 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2011  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <mpi.h>
+#include <starpu.h>
+#include <starpu_mpi.h>
+
+int starpu_mpi_scatter_detached(starpu_data_handle *data_handles, int count, int root, MPI_Comm comm)
+{
+	int rank;
+	int x;
+	int mpi_tag = 0;
+
+	MPI_Comm_rank(comm, &rank);
+
+	for(x = 0; x < count ;  x++)
+	{
+		if (data_handles[x])
+		{
+			int owner = starpu_data_get_rank(data_handles[x]);
+			if ((rank == root) && (owner != root))
+			{
+				//fprintf(stderr, "[%d] Sending data[%d] to %d\n", rank, x, owner);
+				starpu_mpi_isend_detached(data_handles[x], owner, mpi_tag, comm, NULL, NULL);
+			}
+			if ((rank != root) && (owner == rank))
+			{
+				//fprintf(stderr, "[%d] Receiving data[%d] from %d\n", rank, x, root);
+				//MPI_Status status;
+				starpu_mpi_irecv_detached(data_handles[x], root, mpi_tag, comm, NULL, NULL);
+			}
+		}
+		mpi_tag++;
+	}
+	return 0;
+}
+
+int starpu_mpi_gather_detached(starpu_data_handle *data_handles, int count, int root, MPI_Comm comm)
+{
+	int rank;
+	int x;
+	int mpi_tag = 0;
+
+	MPI_Comm_rank(comm, &rank);
+
+	for(x = 0; x < count ;  x++)
+	{
+		if (data_handles[x])
+		{
+			int owner = starpu_data_get_rank(data_handles[x]);
+			if ((rank == root) && (owner != root))
+			{
+				//fprintf(stderr, "[%d] Receiving data[%d] from %d\n", rank, x, owner);
+				//MPI_Status status;
+				starpu_mpi_irecv_detached(data_handles[x], owner, mpi_tag, comm, NULL, NULL);
+			}
+			if ((rank != root) && (owner == rank))
+			{
+				//fprintf(stderr, "[%d] Sending data[%d] to %d\n", rank, x, root);
+				starpu_mpi_isend_detached(data_handles[x], root, mpi_tag, comm, NULL, NULL);
+			}
+		}
+		mpi_tag ++;
+	}
+	return 0;
+}
+