@c -*-texinfo-*-
@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011  Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
@c Copyright (C) 2011  Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.

@node StarPU MPI support
@chapter StarPU MPI support

The integration of MPI transfers within task parallelism is done in a very
natural way by means of asynchronous interactions between the application and
StarPU.  This is implemented in a separate @code{libstarpumpi} library which
provides "StarPU" equivalents of @code{MPI_*} functions, where @code{void *}
buffers are replaced with @code{starpu_data_handle_t}s, and all GPU-RAM-NIC
transfers are handled efficiently by StarPU-MPI.  The user has to use the
usual @code{mpirun} command of the MPI implementation to start StarPU on the
different MPI nodes.

An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.

@menu
* The API::
* Simple Example::
* MPI Insert Task Utility::
* MPI Collective Operations::
@end menu

@node The API
@section The API

@subsection Compilation

The flags required to compile or link against the MPI layer are accessible
with the following commands:

@example
% pkg-config --cflags libstarpumpi  # options for the compiler
% pkg-config --libs libstarpumpi    # options for the linker
@end example

@subsection Initialisation

@deftypefun int starpu_mpi_initialize (void)
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions. This function does
not call @code{MPI_Init}; the application is expected to have called it
beforehand.
@end deftypefun

@deftypefun int starpu_mpi_initialize_extended (int *@var{rank}, int *@var{world_size})
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions. This function calls
@code{MPI_Init}, and should therefore be preferred to the previous one for MPI
implementations which are not thread-safe. Returns the current MPI node rank
and world size.
@end deftypefun

@deftypefun int starpu_mpi_shutdown (void)
Cleans up the starpumpi library. This must be called between calling
@code{starpu_mpi} functions and @code{starpu_shutdown}. @code{MPI_Finalize}
will be called if StarPU-MPI has been initialized by calling
@code{starpu_mpi_initialize_extended}.
@end deftypefun
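As a rough sketch of this ordering (not taken from an actual application;
error checking is mostly omitted), a program using the extended
initialization could be structured as follows:

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, world_size;

    /* StarPU itself must be initialized first. */
    if (starpu_init(NULL) != 0)
        return 1;

    /* This variant calls MPI_Init itself and returns the rank of the
     * current MPI node as well as the world size. */
    starpu_mpi_initialize_extended(&rank, &world_size);

    /* ... register data, submit tasks, perform communications ... */

    /* Shut down StarPU-MPI before StarPU itself; MPI_Finalize is called
     * here because initialization was done with the extended variant. */
    starpu_mpi_shutdown();
    starpu_shutdown();

    return 0;
@}
@end smallexample
@end cartouche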
@subsection Communication

@deftypefun int starpu_mpi_send (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_recv (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_isend (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_irecv (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_isend_detached (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
@end deftypefun

@deftypefun int starpu_mpi_wait (starpu_mpi_req *@var{req}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_test (starpu_mpi_req *@var{req}, int *@var{flag}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_barrier (MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_isend_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
When the transfer is completed, the tag is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
@end deftypefun

@deftypefun int starpu_mpi_isend_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{dest}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
Asynchronously sends an array of buffers, and unlocks the tag once all of
them have been transmitted.
@end deftypefun

@deftypefun int starpu_mpi_irecv_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{source}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
@end deftypefun
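As an illustration of the request-based calls above, the following sketch
sends a vector from MPI node 0 to MPI node 1 (assuming @code{rank} was
obtained from @code{starpu_mpi_initialize_extended}; the vector size and the
tag value 42 are arbitrary choices made for this example):

@cartouche
@smallexample
float vector[16];
starpu_data_handle_t vector_handle;
starpu_mpi_req request;
MPI_Status status;

starpu_vector_data_register(&vector_handle, 0, (uintptr_t)vector,
                            16, sizeof(vector[0]));

if (rank == 0) @{
    /* Post a non-blocking send of the handle to node 1 with tag 42... */
    starpu_mpi_isend(vector_handle, &request, 1, 42, MPI_COMM_WORLD);
    /* ...and wait for its completion. */
    starpu_mpi_wait(&request, &status);
@}
else if (rank == 1) @{
    /* The matching blocking receive. */
    starpu_mpi_recv(vector_handle, 0, 42, MPI_COMM_WORLD, &status);
@}

starpu_data_unregister(vector_handle);
@end smallexample
@end cartouche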
@page
@node Simple Example
@section Simple Example

@cartouche
@smallexample
void increment_token(void)
@{
    /* Submit a task which increments the value behind token_handle. */
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->buffers[0].handle = token_handle;
    task->buffers[0].mode = STARPU_RW;

    starpu_task_submit(task);
@}
@end smallexample
@end cartouche

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1,
                                sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;
@end smallexample
@end cartouche

@cartouche
@smallexample
    for (loop = 0; loop < nloops; loop++) @{
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        @{
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        @}
        else
        @{
            /* Receive the token from the previous node in the ring. */
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                    MPI_COMM_WORLD, NULL, NULL);
        @}

        increment_token();

        if (loop == last_loop && rank == last_rank)
        @{
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished : token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            /* Pass the token on to the next node in the ring. */
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                    MPI_COMM_WORLD, NULL, NULL);
        @}
    @}

    starpu_task_wait_for_all();
@end smallexample
@end cartouche

@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche

@page
@node MPI Insert Task Utility
@section MPI Insert Task Utility

To save the programmer from having to specify all communications explicitly,
StarPU provides an "MPI Insert Task Utility". The principle is that the
application decides on a distribution of the data over the MPI nodes by
allocating it and notifying StarPU of that decision, i.e. it tells StarPU
which MPI node "owns" which data. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task, as well as the required MPI transfers.

@deftypefun int starpu_data_set_tag (starpu_data_handle_t @var{handle}, int @var{tag})
Tell StarPU-MPI which MPI tag to use when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_get_tag (starpu_data_handle_t @var{handle})
Returns the MPI tag to be used when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_set_rank (starpu_data_handle_t @var{handle}, int @var{mpi_rank})
Tell StarPU-MPI which MPI node "owns" a given data, that is, the node which
will always keep an up-to-date value, and will by default execute tasks which
write to it.
@end deftypefun

@deftypefun int starpu_data_get_rank (starpu_data_handle_t @var{handle})
Returns the last value set by @code{starpu_data_set_rank}.
@end deftypefun
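For instance, the owner and the communication tag of a single registered
variable could be declared as follows (the rank and tag values are arbitrary
choices made for this example):

@cartouche
@smallexample
unsigned value;
starpu_data_handle_t handle;

starpu_variable_data_register(&handle, 0, (uintptr_t)&value, sizeof(value));

/* MPI node 2 owns this data and keeps its up-to-date value... */
starpu_data_set_rank(handle, 2);
/* ...and exchanges of this data will use MPI tag 7. */
starpu_data_set_tag(handle, 7);
@end smallexample
@end cartouche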
@deftypefun void starpu_mpi_insert_task (MPI_Comm @var{comm}, starpu_codelet *@var{cl}, ...)
Create and submit a task corresponding to @var{cl} with the following
arguments. The argument list must be zero-terminated.

The arguments following the codelet are of the same types as for the function
@code{starpu_insert_task} defined in @ref{Insert Task Utility}. The extra
argument @code{STARPU_EXECUTE_ON_NODE} followed by an integer allows one to
specify the MPI node on which to execute the codelet. It is also possible to
specify that the node owning a specific data will execute the codelet, by
using @code{STARPU_EXECUTE_ON_DATA} followed by a data handle.

The internal algorithm is as follows:
@enumerate
@item Find out whether we (as an MPI node) are to execute the codelet because
we own the data to be written to. If different nodes own data to be written
to, the argument @code{STARPU_EXECUTE_ON_NODE} or @code{STARPU_EXECUTE_ON_DATA}
has to be used to specify which MPI node will execute the task.
@item Send and receive data as requested. Nodes owning data which needs to be
read by the task send it to the MPI node which will execute the task, and the
latter receives it.
@item Execute the codelet. This is done by the MPI node selected in the first
step of the algorithm.
@item If different MPI nodes own data to be written to, the written data is
sent back to its owners.
@end enumerate

The algorithm also includes a cache mechanism that avoids sending the same
data twice to the same MPI node, unless the data has been modified in the
meantime.
@end deftypefun

@deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle_t @var{data_handle}, int @var{node})
todo
@end deftypefun

Here is a stencil example showing how to use @code{starpu_mpi_insert_task}.
One first needs to define a distribution function which specifies the locality
of the data. Note that the distribution information needs to be given to
StarPU by calling @code{starpu_data_set_rank}.

@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
  /* Block distrib */
  return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

  // /* Other examples useful for other kinds of computations */
  // /* / distrib */
  // return (x+y) % nb_nodes;

  // /* Block cyclic distrib */
  // unsigned side = sqrt(nb_nodes);
  // return x % side + (y % side) * side;
@}
@end smallexample
@end cartouche

Now the data can be registered within StarPU. Data which is not owned, but
which will be needed for computations, can be registered through the lazy
allocation mechanism, i.e. with a @code{home_node} set to -1. StarPU will
automatically allocate the memory when it is used for the first time.

One can note an optimization here (the @code{else if} test): we only register
data which will be needed by the tasks that we will execute.

@cartouche
@smallexample
    unsigned matrix[X][Y];
    starpu_data_handle_t data_handles[X][Y];

    for(x = 0; x < X; x++) @{
        for (y = 0; y < Y; y++) @{
            int mpi_rank = my_distrib(x, y, size);
            if (mpi_rank == my_rank)
                /* Owning data */
                starpu_variable_data_register(&data_handles[x][y], 0,
                                              (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
            else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
                  || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
                /* I don't own that index, but will need it for my computations */
                starpu_variable_data_register(&data_handles[x][y], -1,
                                              (uintptr_t)NULL, sizeof(unsigned));
            else
                /* I know it's useless to allocate anything for this */
                data_handles[x][y] = NULL;
            if (data_handles[x][y])
                starpu_data_set_rank(data_handles[x][y], mpi_rank);
        @}
    @}
@end smallexample
@end cartouche

Now @code{starpu_mpi_insert_task()} can be called for the different steps of
the application.

@cartouche
@smallexample
    for(loop=0 ; loop