@@ -1877,7 +1877,262 @@ CPUs.
@node StarPU MPI support
@chapter StarPU MPI support
-TODO: document include/starpu_mpi.h and explain a simple example (pingpong?)
+The integration of MPI transfers within task parallelism is done in a
+very natural way by means of asynchronous interactions between the
+application and StarPU.
+
+@menu
+* The API::
+* Simple Example::
+* MPI Insert Task Utility::
+@end menu
+
+@node The API
+@section The API
+
+@subsection Initialisation
+
+@deftypefun int starpu_mpi_initialize (void)
+Initializes the StarPU MPI layer.
+@end deftypefun
+
+@deftypefun int starpu_mpi_initialize_extended (int @var{initialize_mpi}, int *@var{rank}, int *@var{world_size})
+Initializes the StarPU MPI layer. If @var{initialize_mpi} is not zero,
+MPI is also initialized; the rank of the calling process and the size
+of @code{MPI_COMM_WORLD} are returned in @var{rank} and
+@var{world_size}.
+@end deftypefun
+
+@deftypefun int starpu_mpi_shutdown (void)
+Cleans up the StarPU MPI layer.
+@end deftypefun
+
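+These calls are typically paired with the StarPU initialization calls,
+as in the following sketch (the same pattern is used by the complete
+example in @ref{Simple Example}):
+
+@cartouche
+@smallexample
+int main(int argc, char **argv)
+@{
+    int rank, world_size;
+
+    starpu_init(NULL);
+    /* Let StarPU initialize MPI and return our rank and the size of
+     * MPI_COMM_WORLD. */
+    starpu_mpi_initialize_extended(1, &rank, &world_size);
+
+    /* ... register data, submit tasks, post communications ... */
+
+    starpu_mpi_shutdown();
+    starpu_shutdown();
+    return 0;
+@}
+@end smallexample
+@end cartouche
+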
+@subsection Communication
+
+@deftypefun int starpu_mpi_isend (starpu_data_handle @var{data_handle}, starpu_mpi_req *@var{req}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
+Performs a non-blocking send of the data associated with
+@var{data_handle} to the node @var{dest}, using the message tag
+@var{mpi_tag} within the communicator @var{comm}. @var{req} can later
+be passed to @code{starpu_mpi_wait} or @code{starpu_mpi_test}.
+@end deftypefun
+
+@deftypefun int starpu_mpi_irecv (starpu_data_handle @var{data_handle}, starpu_mpi_req *@var{req}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm})
+Performs a non-blocking receive into @var{data_handle} from the node
+@var{source}, using the message tag @var{mpi_tag} within the
+communicator @var{comm}.
+@end deftypefun
+
+@deftypefun int starpu_mpi_send (starpu_data_handle @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
+Performs a blocking send of @var{data_handle} to the node @var{dest}.
+@end deftypefun
+
+@deftypefun int starpu_mpi_recv (starpu_data_handle @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, MPI_Status *@var{status})
+Performs a blocking receive of @var{data_handle} from the node
+@var{source}.
+@end deftypefun
+
+@deftypefun int starpu_mpi_isend_detached (starpu_data_handle @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
+Posts a detached send: StarPU takes care of the completion of the
+communication itself, and calls @var{callback} with the argument
+@var{arg} when the transfer is finished (@var{callback} may be
+@code{NULL}).
+@end deftypefun
+
+@deftypefun int starpu_mpi_irecv_detached (starpu_data_handle @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
+Posts a detached receive, with the same completion semantics as
+@code{starpu_mpi_isend_detached}.
+@end deftypefun
+
+@deftypefun int starpu_mpi_wait (starpu_mpi_req *@var{req}, MPI_Status *@var{status})
+Waits for the completion of the request @var{req}.
+@end deftypefun
+
+@deftypefun int starpu_mpi_test (starpu_mpi_req *@var{req}, int *@var{flag}, MPI_Status *@var{status})
+Tests whether the request @var{req} has completed, without blocking;
+@var{flag} is set to a non-zero value if it has.
+@end deftypefun
+
+@deftypefun int starpu_mpi_barrier (MPI_Comm @var{comm})
+Blocks until all the processes of @var{comm} have reached this routine.
+@end deftypefun
+
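+These functions mirror their MPI counterparts, but operate on
+registered StarPU data handles: the transfer is performed once the
+piece of data is available, consistently with the tasks that access it.
+The following is a minimal sketch of a request-based exchange between
+two nodes; it assumes a previously registered handle @code{handle}, a
+variable @code{rank} holding the MPI rank, and an arbitrary message
+tag:
+
+@cartouche
+@smallexample
+    starpu_mpi_req req;
+    MPI_Status status;
+
+    if (rank == 0) @{
+        /* Non-blocking send of the data behind 'handle' to node 1 */
+        starpu_mpi_isend(handle, &req, 1, 42, MPI_COMM_WORLD);
+        starpu_mpi_wait(&req, &status);
+    @} else if (rank == 1) @{
+        /* Matching non-blocking receive from node 0 */
+        starpu_mpi_irecv(handle, &req, 0, 42, MPI_COMM_WORLD);
+        starpu_mpi_wait(&req, &status);
+    @}
+@end smallexample
+@end cartouche
+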
+@deftypefun int starpu_mpi_isend_detached_unlock_tag (starpu_data_handle @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
+Posts a detached send of @var{data_handle} to the node @var{dest}; when
+the transfer is completed, the StarPU tag @var{tag} is unlocked.
+@end deftypefun
+
+@deftypefun int starpu_mpi_irecv_detached_unlock_tag (starpu_data_handle @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
+Posts a detached receive of @var{data_handle} from the node
+@var{source}; when the transfer is completed, the StarPU tag @var{tag}
+is unlocked.
+@end deftypefun
+
+@deftypefun int starpu_mpi_isend_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle *@var{data_handle}, int *@var{dest}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
+Asynchronously sends an array of buffers, and unlocks the tag @var{tag}
+once all of them have been transmitted.
+@end deftypefun
+
+@deftypefun int starpu_mpi_irecv_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle *@var{data_handle}, int *@var{source}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
+Asynchronously receives an array of buffers, and unlocks the tag
+@var{tag} once all of them have been received.
+@end deftypefun
+
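+The @code{_unlock_tag} variants make it possible to plug a
+communication into a StarPU tag dependency graph: they behave like the
+detached calls above, except that the given application tag is unlocked
+when the transfer completes. The following sketch assumes a registered
+handle @code{handle} and arbitrary tag values:
+
+@cartouche
+@smallexample
+    starpu_tag_t tag_token_arrived = 0x42;
+
+    /* Post a detached receive; the StarPU tag is unlocked once the
+     * data has arrived. */
+    starpu_mpi_irecv_detached_unlock_tag(handle, /* source */ 0,
+                                         /* mpi_tag */ 7, MPI_COMM_WORLD,
+                                         tag_token_arrived);
+
+    /* Either wait for the tag explicitly, or make tasks depend on it
+     * with starpu_tag_declare_deps(). */
+    starpu_tag_wait(tag_token_arrived);
+@end smallexample
+@end cartouche
+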
+@page
+@node Simple Example
+@section Simple Example
+
+This example implements a ring: a token is received from the previous
+node, incremented by a locally submitted task, and sent on to the next
+node. @code{token} and @code{token_handle} are assumed to be declared
+globally, and @code{increment_cl} is a codelet which increments the
+token (a possible definition is sketched after the first snippet).
+
+@cartouche
+@smallexample
+void increment_token(void)
+@{
+    /* Submit an asynchronous task that increments the value behind
+     * token_handle; it runs once the data is available. */
+    struct starpu_task *task = starpu_task_create();
+
+    task->cl = &increment_cl;
+    task->buffers[0].handle = token_handle;
+    task->buffers[0].mode = STARPU_RW;
+
+    starpu_task_submit(task);
+@}
+@end smallexample
+@end cartouche
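+
+The codelet @code{increment_cl} is not shown in this excerpt. A
+possible CPU-only definition is sketched below; the exact field names
+depend on the StarPU version, and the real example may differ:
+
+@cartouche
+@smallexample
+static void increment_cpu(void *descr[], void *cl_arg)
+@{
+    /* The registered vector holds a single unsigned value: the token. */
+    unsigned *tokenptr = (unsigned *)STARPU_VECTOR_GET_PTR(descr[0]);
+    (*tokenptr)++;
+@}
+
+static starpu_codelet increment_cl = @{
+    .where = STARPU_CPU,
+    .cpu_func = increment_cpu,
+    .nbuffers = 1
+@};
+@end smallexample
+@end cartouche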
+
+@cartouche
+@smallexample
+int main(int argc, char **argv)
+@{
+    int rank, size;
+
+    starpu_init(NULL);
+    starpu_mpi_initialize_extended(1, &rank, &size);
+
+    /* Register the token (a single unsigned integer) with StarPU */
+    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));
+
+    unsigned nloops = NITER;
+    unsigned loop;
+
+    unsigned last_loop = nloops - 1;
+    unsigned last_rank = size - 1;
+@end smallexample
+@end cartouche
+
+@cartouche
+@smallexample
+    for (loop = 0; loop < nloops; loop++) @{
+        int tag = loop*size + rank;
+
+        if (!((loop == 0) && (rank == 0)))
+        @{
+            /* Receive the token from the previous node in the ring */
+            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
+                    MPI_COMM_WORLD, NULL, NULL);
+        @}
+        else @{
+            token = 0;
+            fprintf(stdout, "Start with token value %u\n", token);
+        @}
+
+        increment_token();
+
+        if (!((loop == last_loop) && (rank == last_rank)))
+        @{
+            /* Pass the token on to the next node in the ring */
+            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
+                    MPI_COMM_WORLD, NULL, NULL);
+        @}
+        else @{
+            starpu_data_acquire(token_handle, STARPU_R);
+            fprintf(stdout, "Finished: token value %u\n", token);
+            starpu_data_release(token_handle);
+        @}
+    @}
+
+    starpu_task_wait_for_all();
+@end smallexample
+@end cartouche
+
+@cartouche
+@smallexample
+    starpu_mpi_shutdown();
+    starpu_shutdown();
+
+    if (rank == last_rank)
+    @{
+        fprintf(stderr, "[%d] token = %u == %u * %d ?\n", rank, token, nloops, size);
+        STARPU_ASSERT(token == nloops*size);
+    @}
+@end smallexample
+@end cartouche
+
+@page
+@node MPI Insert Task Utility
+@section MPI Insert Task Utility
+
+@deftypefun void starpu_mpi_insert_task (MPI_Comm @var{comm}, starpu_codelet *@var{cl}, ...)
+Creates and submits a task corresponding to @var{cl} with the following
+arguments. The argument list must be zero-terminated.
+
+The arguments following the codelet are of the same types as for the
+function @code{starpu_insert_task} defined in @ref{Insert Task
+Utility}. The extra argument @code{STARPU_EXECUTE}, followed by an
+integer, allows one to specify the MPI node on which to execute the
+codelet (see the sketch below).
+
+The algorithm is as follows:
+@enumerate
+@item Find out whether we (as an MPI node) are to execute the codelet
+because we own the data to be written to. If different nodes own data
+to be written to, the argument @code{STARPU_EXECUTE} should be used to
+specify the executing node @code{ET}.
+@item Send and receive data as requested. Nodes owning data which needs
+to be read by the executing node @code{ET} send it to @code{ET}.
+@item Execute the codelet. This is done by the node selected in the
+first step of the algorithm.
+@item If different nodes own data to be written to, send the written
+data back to their owners.
+@end enumerate
+
+The algorithm also includes a cache mechanism that avoids sending the
+same data twice to the same node, unless the data has been modified.
+
+@end deftypefun
+
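+For instance, assuming two registered handles @code{handle_a} (written
+to) and @code{handle_b} (read), and a hypothetical codelet
+@code{my_cl}, the following call asks node @code{1} to execute the
+codelet, whichever node owns @code{handle_a}:
+
+@cartouche
+@smallexample
+    starpu_mpi_insert_task(MPI_COMM_WORLD, &my_cl,
+                           STARPU_RW, handle_a,
+                           STARPU_R, handle_b,
+                           STARPU_EXECUTE, 1,
+                           0);
+@end smallexample
+@end cartouche
+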
+@deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle @var{data_handle}, int @var{node})
+Transfers @var{data_handle} to the MPI node @var{node}, sending it from
+its owner if needed.
+@end deftypefun
+
+@page
+
+Here is an example showing how to use @code{starpu_mpi_insert_task}.
+One first needs to define a distribution function which specifies the
+locality of the data. Note that the distribution information needs to
+be given to StarPU by calling @code{starpu_data_set_rank}.
+
+@cartouche
+@smallexample
+/* Returns the MPI node number where data is */
+int my_distrib(int x, int y, int nb_nodes) @{
+  /* Cyclic distrib */
+  return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
+  // /* Linear distrib */
+  // return x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * X;
+@}
+@end smallexample
+@end cartouche
+
+Now the data can be registered within StarPU. Data which are not
+owned but will be needed for computations can be registered through
+the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
+StarPU will automatically allocate the memory when it is used for the
+first time.
+
+@cartouche
+@smallexample
+    unsigned matrix[X][Y];
+    starpu_data_handle data_handles[X][Y];
+
+    for(x = 0; x < X; x++) @{
+        for (y = 0; y < Y; y++) @{
+            int mpi_rank = my_distrib(x, y, size);
+            if (mpi_rank == rank)
+                /* Owning data */
+                starpu_variable_data_register(&data_handles[x][y], 0,
+                                              (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
+            else if (rank == mpi_rank+1 || rank == mpi_rank-1)
+                /* I don't own that index, but will need it for my computations */
+                starpu_variable_data_register(&data_handles[x][y], -1,
+                                              (uintptr_t)NULL, sizeof(unsigned));
+            else
+                /* I know it's useless to allocate anything for this */
+                data_handles[x][y] = NULL;
+            if (data_handles[x][y])
+                starpu_data_set_rank(data_handles[x][y], mpi_rank);
+        @}
+    @}
+@end smallexample
+@end cartouche
+
+Now @code{starpu_mpi_insert_task()} can be called for the different
+steps of the application.
+
+@cartouche
+@smallexample
+    for (loop = 0; loop < niter; loop++)
+        for (x = 1; x < X-1; x++)
+            for (y = 1; y < Y-1; y++)
+                starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
+                                       STARPU_RW, data_handles[x][y],
+                                       STARPU_R, data_handles[x-1][y],
+                                       STARPU_R, data_handles[x+1][y],
+                                       STARPU_R, data_handles[x][y-1],
+                                       STARPU_R, data_handles[x][y+1],
+                                       0);
+    starpu_task_wait_for_all();
+@end smallexample
+@end cartouche
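+
+Once all tasks have completed, each process can read back the part of
+the matrix it owns through the usual acquire/release calls, and
+unregister the handles before shutting down. The following is only a
+sketch of a possible final phase; it is not part of the original
+example:
+
+@cartouche
+@smallexample
+    for (x = 0; x < X; x++)
+        for (y = 0; y < Y; y++)
+            if (data_handles[x][y]) @{
+                if (my_distrib(x, y, size) == rank) @{
+                    /* We own this element: fetch the up-to-date value
+                     * into matrix[x][y] before using it. */
+                    starpu_data_acquire(data_handles[x][y], STARPU_R);
+                    /* ... use matrix[x][y] ... */
+                    starpu_data_release(data_handles[x][y]);
+                @}
+                starpu_data_unregister(data_handles[x][y]);
+            @}
+
+    starpu_mpi_shutdown();
+    starpu_shutdown();
+@end smallexample
+@end cartouche
+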
@c ---------------------------------------------------------------------
@c Configuration options