- @c -*-texinfo-*-
- @c This file is part of the StarPU Handbook.
- @c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
- @c Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
- @c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
- @c See the file starpu.texi for copying conditions.
- @node StarPU MPI support
- @chapter StarPU MPI support
- The integration of MPI transfers within task parallelism is done in a
- very natural way by means of asynchronous interactions between the
- application and StarPU. This is implemented in a separate libstarpumpi library
- which provides "StarPU" equivalents of @code{MPI_*} functions, where
- @code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
- GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
- use the usual @code{mpirun} command of the MPI implementation to start StarPU on
- the different MPI nodes.
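- For instance, assuming the application binary is called @code{my_app} (the
- name is purely illustrative), it can be started on four MPI nodes with:
- @example
- % mpirun -np 4 ./my_app
- @end example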
- An MPI Insert Task function provides an even more seamless transition to a
- distributed application, by automatically issuing all required data transfers
- according to the task graph and an application-provided distribution.
- @menu
- * The API::
- * Simple Example::
- * MPI Insert Task Utility::
- * MPI Collective Operations::
- @end menu
- @node The API
- @section The API
- @subsection Compilation
- The flags required to compile or link against the MPI layer are
- accessible with the following commands:
- @example
- % pkg-config --cflags libstarpumpi # options for the compiler
- % pkg-config --libs libstarpumpi # options for the linker
- @end example
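- For instance, a single-source application (called @code{my_app.c} here purely
- for the sake of the example) can be built with:
- @example
- % gcc my_app.c $(pkg-config --cflags libstarpumpi) \
-       $(pkg-config --libs libstarpumpi) -o my_app
- @end example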
- @subsection Initialisation
- @deftypefun int starpu_mpi_initialize (void)
- Initializes the starpumpi library. This must be called after
- @code{starpu_init} and before any other @code{starpu_mpi} function. This
- function does not call @code{MPI_Init}; the application must call it beforehand.
- @end deftypefun
- @deftypefun int starpu_mpi_initialize_extended (int *@var{rank}, int *@var{world_size})
- Initializes the starpumpi library. This must be called after
- @code{starpu_init} and before any other @code{starpu_mpi} function.
- This function calls @code{MPI_Init}, and should therefore be preferred
- to the previous one for MPI implementations which are not thread-safe.
- The rank of the current MPI node and the world size are returned in
- @var{rank} and @var{world_size}.
- @end deftypefun
- @deftypefun int starpu_mpi_shutdown (void)
- Cleans up the starpumpi library. This must be called after the last
- @code{starpu_mpi} call and before @code{starpu_shutdown}.
- @code{MPI_Finalize} will be called if StarPU-MPI has been initialized
- by calling @code{starpu_mpi_initialize_extended}.
- @end deftypefun
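- As a minimal sketch of the ordering constraints above (error checking
- omitted), an application typically starts and terminates as follows:
- @cartouche
- @smallexample
- int main(int argc, char **argv)
- @{
-   int rank, world_size;
-   starpu_init(NULL);
-   /* Calls MPI_Init and returns our rank and the world size */
-   starpu_mpi_initialize_extended(&rank, &world_size);
-   /* ... register data, submit tasks, communicate ... */
-   starpu_mpi_shutdown();  /* also calls MPI_Finalize in this case */
-   starpu_shutdown();
-   return 0;
- @}
- @end smallexample
- @end cartouche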
- @subsection Communication
- @deftypefun int starpu_mpi_send (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
- Performs a blocking send of @var{data_handle} to the node @var{dest} of the
- communicator @var{comm}, using the MPI tag @var{mpi_tag}. This is the
- StarPU-MPI equivalent of @code{MPI_Send}.
- @end deftypefun
- @deftypefun int starpu_mpi_recv (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, MPI_Status *@var{status})
- Performs a blocking receive of @var{data_handle} from the node @var{source}
- of the communicator @var{comm}, using the MPI tag @var{mpi_tag}. This is the
- StarPU-MPI equivalent of @code{MPI_Recv}.
- @end deftypefun
- @deftypefun int starpu_mpi_isend (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
- Posts a non-blocking send of @var{data_handle} to the node @var{dest}, and
- returns the corresponding request in @var{req}. This is the StarPU-MPI
- equivalent of @code{MPI_Isend}.
- @end deftypefun
- @deftypefun int starpu_mpi_irecv (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm})
- Posts a non-blocking receive of @var{data_handle} from the node @var{source},
- and returns the corresponding request in @var{req}. This is the StarPU-MPI
- equivalent of @code{MPI_Irecv}.
- @end deftypefun
- @deftypefun int starpu_mpi_isend_detached (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
- Posts a detached send of @var{data_handle} to the node @var{dest}: no request
- is returned, and @var{callback} is called with the argument @var{arg} when the
- transfer is completed.
- @end deftypefun
- @deftypefun int starpu_mpi_irecv_detached (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
- Posts a detached receive of @var{data_handle} from the node @var{source}: no
- request is returned, and @var{callback} is called with the argument @var{arg}
- when the transfer is completed.
- @end deftypefun
- @deftypefun int starpu_mpi_wait (starpu_mpi_req *@var{req}, MPI_Status *@var{status})
- Blocks until the communication associated with the request @var{req} is
- completed. This is the StarPU-MPI equivalent of @code{MPI_Wait}.
- @end deftypefun
- @deftypefun int starpu_mpi_test (starpu_mpi_req *@var{req}, int *@var{flag}, MPI_Status *@var{status})
- Checks without blocking whether the communication associated with the request
- @var{req} is completed, and sets @var{flag} accordingly. This is the
- StarPU-MPI equivalent of @code{MPI_Test}.
- @end deftypefun
- @deftypefun int starpu_mpi_barrier (MPI_Comm @var{comm})
- Blocks until all the nodes of the communicator @var{comm} have reached the
- barrier. This is the StarPU-MPI equivalent of @code{MPI_Barrier}.
- @end deftypefun
- @deftypefun int starpu_mpi_isend_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
- Posts a detached send of @var{data_handle} to the node @var{dest}. When the
- transfer is completed, the StarPU tag @var{tag} is unlocked.
- @end deftypefun
- @deftypefun int starpu_mpi_irecv_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
- Posts a detached receive of @var{data_handle} from the node @var{source}. When
- the transfer is completed, the StarPU tag @var{tag} is unlocked.
- @end deftypefun
- @deftypefun int starpu_mpi_isend_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{dest}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
- Asynchronously sends an array of buffers, and unlocks the StarPU tag @var{tag}
- once all of them have been transmitted.
- @end deftypefun
- @deftypefun int starpu_mpi_irecv_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{source}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
- Asynchronously receives an array of buffers, and unlocks the StarPU tag
- @var{tag} once all of them have been received.
- @end deftypefun
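- As a sketch of how the request-based variants are used (assuming a handle
- @code{data_handle} has already been registered and @code{peer} holds the rank
- of a hypothetical partner node), a non-blocking send can be posted and later
- waited for:
- @cartouche
- @smallexample
- starpu_mpi_req req;
- MPI_Status status;
- /* Post a non-blocking send of the handle to the peer node, with MPI tag 42 */
- starpu_mpi_isend(data_handle, &req, peer, 42, MPI_COMM_WORLD);
- /* ... submit tasks or other communications in the meantime ... */
- /* Block until the communication is completed */
- starpu_mpi_wait(&req, &status);
- @end smallexample
- @end cartouche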
- @page
- @node Simple Example
- @section Simple Example
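- The following example passes a token around the MPI nodes in a ring: at each
- loop iteration, each node receives the token from its predecessor, increments
- it with a StarPU task, and sends it to its successor; the first node
- initializes the token, and the last node reads the final value.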
- @cartouche
- @smallexample
- void increment_token(void)
- @{
- struct starpu_task *task = starpu_task_create();
- task->cl = &increment_cl;
- task->buffers[0].handle = token_handle;
- task->buffers[0].mode = STARPU_RW;
- starpu_task_submit(task);
- @}
- @end smallexample
- @end cartouche
- @cartouche
- @smallexample
- int main(int argc, char **argv)
- @{
- int rank, size;
- starpu_init(NULL);
- starpu_mpi_initialize_extended(&rank, &size);
- starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));
- unsigned nloops = NITER;
- unsigned loop;
- unsigned last_loop = nloops - 1;
- unsigned last_rank = size - 1;
- @end smallexample
- @end cartouche
- @cartouche
- @smallexample
- for (loop = 0; loop < nloops; loop++) @{
- int tag = loop*size + rank;
- if (loop == 0 && rank == 0)
- @{
- token = 0;
- fprintf(stdout, "Start with token value %d\n", token);
- @}
- else
- @{
- starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
- MPI_COMM_WORLD, NULL, NULL);
- @}
- increment_token();
- if (loop == last_loop && rank == last_rank)
- @{
- starpu_data_acquire(token_handle, STARPU_R);
- fprintf(stdout, "Finished : token value %d\n", token);
- starpu_data_release(token_handle);
- @}
- else
- @{
- starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
- MPI_COMM_WORLD, NULL, NULL);
- @}
- @}
- starpu_task_wait_for_all();
- @end smallexample
- @end cartouche
- @cartouche
- @smallexample
- starpu_mpi_shutdown();
- starpu_shutdown();
- if (rank == last_rank)
- @{
- fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
- STARPU_ASSERT(token == nloops*size);
- @}
- @end smallexample
- @end cartouche
- @page
- @node MPI Insert Task Utility
- @section MPI Insert Task Utility
- To save the programmer from having to make all communications explicit, StarPU
- provides an "MPI Insert Task Utility". The principle is that the application
- decides a distribution of the data over the MPI nodes by allocating it and
- notifying StarPU of that decision, i.e. it tells StarPU which MPI node "owns"
- which data. All MPI nodes then process the whole task graph, and StarPU
- automatically determines which node actually executes which task, as well as
- the required MPI transfers.
- @deftypefun int starpu_data_set_tag (starpu_data_handle_t @var{handle}, int @var{tag})
- Tells StarPU-MPI which MPI tag to use when exchanging the data.
- @end deftypefun
- @deftypefun int starpu_data_get_tag (starpu_data_handle_t @var{handle})
- Returns the MPI tag to be used when exchanging the data.
- @end deftypefun
- @deftypefun int starpu_data_set_rank (starpu_data_handle_t @var{handle}, int @var{mpi_rank})
- Tells StarPU-MPI which MPI node "owns" a given piece of data, that is, the node
- which will always keep an up-to-date value of it, and which will by default
- execute the tasks that write to it.
- @end deftypefun
- @deftypefun int starpu_data_get_rank (starpu_data_handle_t @var{handle})
- Returns the last value set by @code{starpu_data_set_rank}.
- @end deftypefun
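- For instance, after registering a piece of data, one typically declares both
- its owner and the MPI tag to be used for its transfers (the handle name, the
- owner rank and the tag value below are purely illustrative):
- @cartouche
- @smallexample
- /* handle previously registered, e.g. with starpu_variable_data_register() */
- starpu_data_set_rank(handle, owner_rank); /* MPI node owning the data */
- starpu_data_set_tag(handle, 42);          /* MPI tag used to transfer it */
- @end smallexample
- @end cartouche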
- @deftypefun void starpu_mpi_insert_task (MPI_Comm @var{comm}, starpu_codelet *@var{cl}, ...)
- Creates and submits a task corresponding to @var{cl} with the following
- arguments. The argument list must be zero-terminated.
- The arguments following the codelet are of the same types as for the
- function @code{starpu_insert_task} defined in @ref{Insert Task
- Utility}. The extra argument @code{STARPU_EXECUTE_ON_NODE} followed by an
- integer makes it possible to specify the MPI node which should execute the
- codelet. It is also possible to specify that the node owning a specific piece
- of data will execute the codelet, by using @code{STARPU_EXECUTE_ON_DATA}
- followed by a data handle.
- The internal algorithm is as follows:
- @enumerate
- @item Find out whether we (as an MPI node) are to execute the codelet
- because we own the data to be written to. If different nodes own data
- to be written to, the argument @code{STARPU_EXECUTE_ON_NODE} or
- @code{STARPU_EXECUTE_ON_DATA} has to be used to specify which MPI node will
- execute the task.
- @item Send and receive data as requested. Nodes owning data which needs to be
- read by the task send it to the MPI node which will execute the task, and the
- latter receives it.
- @item Execute the codelet. This is done by the MPI node selected in the
- first step of the algorithm.
- @item In the case when different MPI nodes own data to be written to, send
- written data back to their owners.
- @end enumerate
- The algorithm also includes a cache mechanism that avoids sending the same
- data twice to the same MPI node, unless the data has been modified in between.
- @end deftypefun
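- For example, to force a task to run on a given MPI node regardless of the
- distribution of the data it accesses, one may write something along the
- following lines (the codelet @code{cl} and the handle @code{handle} are
- assumed to be defined elsewhere):
- @cartouche
- @smallexample
- /* Run this task on MPI node 2, whatever the owner of the data is */
- starpu_mpi_insert_task(MPI_COMM_WORLD, &cl,
-                        STARPU_RW, handle,
-                        STARPU_EXECUTE_ON_NODE, 2,
-                        0);
- @end smallexample
- @end cartouche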
- @deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle_t @var{data_handle}, int @var{node})
- Transfers the data @var{data_handle} to the MPI node @var{node}, sending it
- from its owner if required.
- @end deftypefun
- Here is a stencil example showing how to use @code{starpu_mpi_insert_task}. One
- first needs to define a distribution function which specifies the
- locality of the data. Note that the distribution information needs to
- be given to StarPU by calling @code{starpu_data_set_rank}.
- @cartouche
- @smallexample
- /* Returns the MPI node number where data is */
- int my_distrib(int x, int y, int nb_nodes) @{
- /* Block distrib */
- return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
- // /* Other examples useful for other kinds of computations */
- // /* / distrib */
- // return (x+y) % nb_nodes;
- // /* Block cyclic distrib */
- // unsigned side = sqrt(nb_nodes);
- // return x % side + (y % side) * side;
- @}
- @end smallexample
- @end cartouche
- Now the data can be registered within StarPU. Data which are not
- owned but will be needed for computations can be registered through
- the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
- StarPU will automatically allocate the memory when it is used for the
- first time.
- One can note an optimization here (the @code{else if} test): we only register
- data which will be needed by the tasks that we will execute.
- @cartouche
- @smallexample
- unsigned matrix[X][Y];
- starpu_data_handle_t data_handles[X][Y];
- for(x = 0; x < X; x++) @{
- for (y = 0; y < Y; y++) @{
- int mpi_rank = my_distrib(x, y, size);
- if (mpi_rank == my_rank)
- /* Owning data */
- starpu_variable_data_register(&data_handles[x][y], 0,
- (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
- else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
- || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
- /* I don't own that index, but will need it for my computations */
- starpu_variable_data_register(&data_handles[x][y], -1,
- (uintptr_t)NULL, sizeof(unsigned));
- else
- /* I know it's useless to allocate anything for this */
- data_handles[x][y] = NULL;
- if (data_handles[x][y])
- starpu_data_set_rank(data_handles[x][y], mpi_rank);
- @}
- @}
- @end smallexample
- @end cartouche
- Now @code{starpu_mpi_insert_task()} can be called for the different
- steps of the application.
- @cartouche
- @smallexample
- for(loop=0 ; loop<niter; loop++)
- for (x = 1; x < X-1; x++)
- for (y = 1; y < Y-1; y++)
- starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
- STARPU_RW, data_handles[x][y],
- STARPU_R, data_handles[x-1][y],
- STARPU_R, data_handles[x+1][y],
- STARPU_R, data_handles[x][y-1],
- STARPU_R, data_handles[x][y+1],
- 0);
- starpu_task_wait_for_all();
- @end smallexample
- @end cartouche
- That is, all MPI nodes process the whole task graph, but as mentioned above, for
- each task, only the MPI node which owns the data being written to (here,
- @code{data_handles[x][y]}) will actually run the task. The other MPI nodes will
- automatically send the required data.
- @node MPI Collective Operations
- @section MPI Collective Operations
- @deftypefun int starpu_mpi_scatter_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
- Scatters data among the processes of the communicator based on the ownership
- of the data. For each data handle in the array @var{data_handles}, the
- process @var{root} sends the data to the process owning it.
- Processes receiving data must have valid data handles to receive them.
- @end deftypefun
- @deftypefun int starpu_mpi_gather_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
- Gathers data from the different processes of the communicator onto the
- process @var{root}. Each process owning a data handle in the array
- @var{data_handles} sends it to the process @var{root}. The
- process @var{root} must have valid data handles to receive the data.
- @end deftypefun
- @page
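- The following example allocates a vector on the root node, distributes its
- blocks to their owners with @code{starpu_mpi_scatter_detached}, lets each
- owner process its own blocks, and finally gathers the blocks back onto the
- root node with @code{starpu_mpi_gather_detached}.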
- @cartouche
- @smallexample
- if (rank == root)
- @{
- /* Allocate the vector */
- vector = malloc(nblocks * sizeof(float *));
- for(x=0 ; x<nblocks ; x++)
- @{
- starpu_malloc((void **)&vector[x], block_size*sizeof(float));
- @}
- @}
- /* Allocate data handles and register data to StarPU */
- data_handles = malloc(nblocks*sizeof(starpu_data_handle_t));
- for(x = 0; x < nblocks ; x++)
- @{
- int mpi_rank = my_distrib(x, nodes);
- if (rank == root) @{
- starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
- block_size, sizeof(float));
- @}
- else if ((mpi_rank == rank) || (rank == mpi_rank+1) || (rank == mpi_rank-1)) @{
- /* I own that index, or I will need it for my computations */
- starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
- block_size, sizeof(float));
- @}
- else @{
- /* I know it's useless to allocate anything for this */
- data_handles[x] = NULL;
- @}
- if (data_handles[x]) @{
- starpu_data_set_rank(data_handles[x], mpi_rank);
- @}
- @}
- /* Scatter the vector among the nodes */
- starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
- /* Calculation */
- for(x = 0; x < nblocks ; x++) @{
- if (data_handles[x]) @{
- int owner = starpu_data_get_rank(data_handles[x]);
- if (owner == rank) @{
- starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
- @}
- @}
- @}
- /* Gather the vector back onto the root node */
- starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
- @end smallexample
- @end cartouche