@c -*-texinfo-*-
@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011  Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
@c Copyright (C) 2011  Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.

The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU.  This is implemented in a separate libstarpumpi library
which basically provides "StarPU" equivalents of @code{MPI_*} functions, where
@code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI.  The user has to
use the usual @code{mpirun} command of the MPI implementation to start StarPU on
the different MPI nodes.

An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.

@menu
* The API::
* Simple Example::
* MPI Insert Task Utility::
* MPI Collective Operations::
@end menu

@node The API
@section The API

@subsection Compilation

The flags required to compile or link against the MPI layer are
accessible with the following commands:

@example
% pkg-config --cflags starpumpi-1.0  # options for the compiler
% pkg-config --libs starpumpi-1.0    # options for the linker
@end example

Also pass the @code{--static} option if the application is to be linked statically.

@subsection Initialisation

@deftypefun int starpu_mpi_initialize (void)
Initializes the starpumpi library. This must be called after
@code{starpu_init} and before any other @code{starpu_mpi} function. This
function does not call @code{MPI_Init}; it must have been called beforehand.
@end deftypefun

@deftypefun int starpu_mpi_initialize_extended (int *@var{rank}, int *@var{world_size})
Initializes the starpumpi library. This must be called after
@code{starpu_init} and before any other @code{starpu_mpi} function.
This function calls @code{MPI_Init}, and should therefore be preferred
to the previous one for MPI implementations which are not thread-safe.
Returns the current MPI node rank and world size in @var{rank} and @var{world_size}.
@end deftypefun

@deftypefun int starpu_mpi_shutdown (void)
Cleans up the starpumpi library. This must be called after the last
@code{starpu_mpi} function and before @code{starpu_shutdown}.
@code{MPI_Finalize} will be called if StarPU-MPI has been initialized
by calling @code{starpu_mpi_initialize_extended}.
@end deftypefun
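A typical initialization and shutdown sequence therefore looks like the
following minimal sketch (data registrations and task submissions are elided):

@cartouche
@smallexample
#include <starpu.h>
#include <starpu_mpi.h>

int main(int argc, char **argv)
@{
    int rank, world_size;

    /* Initialize StarPU first, then the MPI layer, which calls MPI_Init */
    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &world_size);

    /* ... register data, submit tasks and communications here ... */

    /* Shut down the MPI layer (which calls MPI_Finalize here), then StarPU */
    starpu_mpi_shutdown();
    starpu_shutdown();
    return 0;
@}
@end smallexample
@end cartouche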
@subsection Communication

The standard point to point communications of MPI have been
implemented. The semantic is similar to the MPI one, but adapted to
the DSM provided by StarPU. An MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.

@deftypefun int starpu_mpi_send (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
Performs a standard-mode, blocking send of @var{data_handle} to the
node @var{dest} using the message tag @var{mpi_tag} within the
communicator @var{comm}.
@end deftypefun

@deftypefun int starpu_mpi_recv (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, MPI_Status *@var{status})
Performs a standard-mode, blocking receive in @var{data_handle} from the
node @var{source} using the message tag @var{mpi_tag} within the
communicator @var{comm}.
@end deftypefun

@deftypefun int starpu_mpi_isend (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
Posts a standard-mode, non-blocking send of @var{data_handle} to the
node @var{dest} using the message tag @var{mpi_tag} within the
communicator @var{comm}. After the call, the pointer to the request
@var{req} can be used to test or wait for the completion of the communication.
@end deftypefun

@deftypefun int starpu_mpi_irecv (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm})
Posts a non-blocking receive in @var{data_handle} from the
node @var{source} using the message tag @var{mpi_tag} within the
communicator @var{comm}. After the call, the pointer to the request
@var{req} can be used to test or wait for the completion of the communication.
@end deftypefun

@deftypefun int starpu_mpi_isend_detached (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
Posts a standard-mode, non-blocking send of @var{data_handle} to the
node @var{dest} using the message tag @var{mpi_tag} within the
communicator @var{comm}. On completion, the @var{callback} function is
called with the argument @var{arg}.
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
Posts a non-blocking receive in @var{data_handle} from the
node @var{source} using the message tag @var{mpi_tag} within the
communicator @var{comm}. On completion, the @var{callback} function is
called with the argument @var{arg}.
@end deftypefun
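A detached send can for instance signal its completion through a
user-provided callback. The sketch below is only an illustration: it assumes a
previously registered handle @code{data_handle} and uses a POSIX semaphore to
block until the communication has completed.

@cartouche
@smallexample
#include <semaphore.h>

static sem_t send_done;

/* Called by StarPU-MPI once the detached send has completed */
static void send_completed(void *arg)
@{
    sem_post((sem_t *)arg);
@}

/* ... */
sem_init(&send_done, 0, 0);
starpu_mpi_isend_detached(data_handle, /* dest */ 1, /* mpi_tag */ 42,
                          MPI_COMM_WORLD, send_completed, &send_done);
/* Do other work here, then block until the send has completed */
sem_wait(&send_done);
@end smallexample
@end cartouche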
@deftypefun int starpu_mpi_wait (starpu_mpi_req *@var{req}, MPI_Status *@var{status})
Returns when the operation identified by request @var{req} is complete.
@end deftypefun

@deftypefun int starpu_mpi_test (starpu_mpi_req *@var{req}, int *@var{flag}, MPI_Status *@var{status})
If the operation identified by @var{req} is complete, sets @var{flag}
to 1. The @var{status} object is set to contain information on the
completed operation.
@end deftypefun

@deftypefun int starpu_mpi_barrier (MPI_Comm @var{comm})
Blocks the caller until all group members of the communicator
@var{comm} have called it.
@end deftypefun

@deftypefun int starpu_mpi_isend_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
Posts a standard-mode, non-blocking send of @var{data_handle} to the
node @var{dest} using the message tag @var{mpi_tag} within the
communicator @var{comm}. On completion, @var{tag} is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
Posts a non-blocking receive in @var{data_handle} from the
node @var{source} using the message tag @var{mpi_tag} within the
communicator @var{comm}. On completion, @var{tag} is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_isend_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{dest}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
Posts @var{array_size} standard-mode, non-blocking sends of the data
@var{data_handle[x]} to the node @var{dest[x]} using the message
tag @var{mpi_tag[x]} within the communicator @var{comm[x]}. On
completion of all the requests, @var{tag} is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_irecv_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{source}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
Posts @var{array_size} non-blocking receives in @var{data_handle[x]} from the
node @var{source[x]} using the message tag @var{mpi_tag[x]} within the
communicator @var{comm[x]}. On completion of all the requests,
@var{tag} is unlocked.
@end deftypefun
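These variants connect communications to StarPU's tag mechanism. As a minimal
illustration (a sketch; @code{recv_handle}, the source node and the tag values
are placeholders), a receive can unlock a tag on which the application then
waits with @code{starpu_tag_wait}:

@cartouche
@smallexample
starpu_tag_t recv_tag = 0x100;

/* Post the receive; recv_tag will be unlocked once the data has arrived */
starpu_mpi_irecv_detached_unlock_tag(recv_handle, /* source */ 0,
                                     /* mpi_tag */ 7, MPI_COMM_WORLD, recv_tag);

/* Tasks may declare a dependency on recv_tag, or the application
   can simply wait for it */
starpu_tag_wait(recv_tag);
@end smallexample
@end cartouche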
@page
@node Simple Example
@section Simple Example

@cartouche
@smallexample
void increment_token(void)
@{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
@}
@end smallexample
@end cartouche

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;
@end smallexample
@end cartouche

@cartouche
@smallexample
    for (loop = 0; loop < nloops; loop++) @{
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        @{
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        @}
        else
        @{
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                    MPI_COMM_WORLD, NULL, NULL);
        @}

        increment_token();

        if (loop == last_loop && rank == last_rank)
        @{
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                    MPI_COMM_WORLD, NULL, NULL);
        @}
    @}

    starpu_task_wait_for_all();
@end smallexample
@end cartouche

@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche
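The snippets above rely on a few global declarations that are not shown. A
possible sketch is given below; the increment codelet body and its field
values are assumptions, kept consistent with the calls above.

@cartouche
@smallexample
/* Global token and its StarPU handle, as used by the snippets above */
static unsigned token = 0;
static starpu_data_handle_t token_handle;

/* CPU implementation of the increment codelet: adds one to the token */
static void increment_cpu(void *descr[], void *cl_arg)
@{
    unsigned *tokenptr = (unsigned *)STARPU_VECTOR_GET_PTR(descr[0]);
    (*tokenptr)++;
@}

static struct starpu_codelet increment_cl =
@{
    .where = STARPU_CPU,
    .cpu_funcs = @{increment_cpu, NULL@},
    .nbuffers = 1,
    .modes = @{STARPU_RW@}
@};
@end smallexample
@end cartouche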
@page
@node MPI Insert Task Utility
@section MPI Insert Task Utility

To save the programmer from having to make all communications explicit, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. tells StarPU which MPI node "owns"
which data. All MPI nodes then process the whole task graph, and StarPU
automatically determines which node actually executes which task, as well as
the required MPI transfers.

@deftypefun int starpu_data_set_tag (starpu_data_handle_t @var{handle}, int @var{tag})
Tells StarPU-MPI which MPI tag to use when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_get_tag (starpu_data_handle_t @var{handle})
Returns the MPI tag to be used when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_set_rank (starpu_data_handle_t @var{handle}, int @var{rank})
Tells StarPU-MPI which MPI node "owns" a given data, that is, the node which will
always keep an up-to-date value, and will by default execute tasks which write
to it.
@end deftypefun

@deftypefun int starpu_data_get_rank (starpu_data_handle_t @var{handle})
Returns the last value set by @code{starpu_data_set_rank}.
@end deftypefun

@defmac STARPU_EXECUTE_ON_NODE
This macro is used when calling @code{starpu_mpi_insert_task}, and
must be followed by an integer value which specifies the node on which
to execute the codelet.
@end defmac

@defmac STARPU_EXECUTE_ON_DATA
This macro is used when calling @code{starpu_mpi_insert_task}, and
must be followed by a data handle to specify that the node owning the
given data will execute the codelet.
@end defmac

@deftypefun int starpu_mpi_insert_task (MPI_Comm @var{comm}, struct starpu_codelet *@var{codelet}, ...)
Creates and submits a task corresponding to @var{codelet} with the following
arguments. The argument list must be zero-terminated.

The arguments following the codelet are of the same types as for the
function @code{starpu_insert_task} defined in @ref{Insert Task
Utility}. The extra argument @code{STARPU_EXECUTE_ON_NODE} followed by an
integer makes it possible to specify the MPI node to execute the codelet. It is also
possible to specify that the node owning a specific data will execute
the codelet, by using @code{STARPU_EXECUTE_ON_DATA} followed by a data
handle.

The internal algorithm is as follows:
@enumerate
@item Find out whether we (as an MPI node) are to execute the codelet
because we own the data to be written to. If different nodes own data
to be written to, the argument @code{STARPU_EXECUTE_ON_NODE} or
@code{STARPU_EXECUTE_ON_DATA} has to be used to specify which MPI node will
execute the task.
@item Send and receive data as requested. Nodes owning data which need to be
read by the task send them to the MPI node which will execute it. The
latter receives them.
@item Execute the codelet. This is done by the MPI node selected in the
first step of the algorithm.
@item In the case when different MPI nodes own data to be written to, send
the written data back to their owners.
@end enumerate

The algorithm also includes a cache mechanism that avoids sending
data twice to the same MPI node, unless the data has been modified.
@end deftypefun
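For instance, the execution node of a task can be chosen explicitly as in the
sketch below (@code{my_cl}, @code{handle_a} and @code{handle_b} are
placeholders; the call shape follows the argument conventions described above):

@cartouche
@smallexample
/* Let the owner of handle_a execute the task (the default behaviour
   when a single data is written to), but state it explicitly */
starpu_mpi_insert_task(MPI_COMM_WORLD, &my_cl,
                       STARPU_RW, handle_a,
                       STARPU_R, handle_b,
                       STARPU_EXECUTE_ON_DATA, handle_a,
                       0);

/* Or force execution on MPI node 2 */
starpu_mpi_insert_task(MPI_COMM_WORLD, &my_cl,
                       STARPU_RW, handle_a,
                       STARPU_R, handle_b,
                       STARPU_EXECUTE_ON_NODE, 2,
                       0);
@end smallexample
@end cartouche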
@deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle_t @var{data_handle}, int @var{node})
Transfers data @var{data_handle} to MPI node @var{node}, sending it from its
owner if needed. At least the target node and the owner have to call the
function.
@end deftypefun

Here is a stencil example showing how to use @code{starpu_mpi_insert_task}. One
first needs to define a distribution function which specifies the
locality of the data. Note that the distribution information needs to
be given to StarPU by calling @code{starpu_data_set_rank}.

@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
  /* Block distrib */
  return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

  // /* Other examples useful for other kinds of computations */
  // /* / distrib */
  // return (x+y) % nb_nodes;

  // /* Block cyclic distrib */
  // unsigned side = sqrt(nb_nodes);
  // return x % side + (y % side) * side;
@}
@end smallexample
@end cartouche

Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the @code{else if} test): we only register
data which will be needed by the tasks that we will execute.

@cartouche
@smallexample
    unsigned matrix[X][Y];
    starpu_data_handle_t data_handles[X][Y];

    for(x = 0; x < X; x++) @{
        for (y = 0; y < Y; y++) @{
            int mpi_rank = my_distrib(x, y, size);
            if (mpi_rank == my_rank)
                /* Owning data */
                starpu_variable_data_register(&data_handles[x][y], 0,
                                              (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
            else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
                  || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
                /* I don't own that index, but will need it for my computations */
                starpu_variable_data_register(&data_handles[x][y], -1,
                                              (uintptr_t)NULL, sizeof(unsigned));
            else
                /* I know it's useless to allocate anything for this */
                data_handles[x][y] = NULL;
            if (data_handles[x][y])
                starpu_data_set_rank(data_handles[x][y], mpi_rank);
        @}
    @}
@end smallexample
@end cartouche

Now @code{starpu_mpi_insert_task()} can be called for the different
steps of the application.

@cartouche
@smallexample
    for(loop=0 ; loop<niter; loop++)
        for (x = 1; x < X-1; x++)
            for (y = 1; y < Y-1; y++)
                starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                       STARPU_RW, data_handles[x][y],
                                       STARPU_R, data_handles[x-1][y],
                                       STARPU_R, data_handles[x+1][y],
                                       STARPU_R, data_handles[x][y-1],
                                       STARPU_R, data_handles[x][y+1],
                                       0);

    starpu_task_wait_for_all();
@end smallexample
@end cartouche

I.e. all MPI nodes process the whole task graph, but as mentioned above, for
each task, only the MPI node which owns the data being written to (here,
@code{data_handles[x][y]}) will actually run the task. The other MPI nodes will
automatically send the required data.
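To inspect a value owned by another node once the tasks have completed,
@code{starpu_mpi_get_data_on_node} can be used. A minimal sketch, assuming
(unlike the registration optimization above) that both node 0 and the owner
have registered a handle for the block of interest:

@cartouche
@smallexample
    /* Both the owner of data_handles[x][y] and node 0 execute this call */
    starpu_mpi_get_data_on_node(MPI_COMM_WORLD, data_handles[x][y], 0);

    if (my_rank == 0) @{
        /* The local copy is now up to date */
        starpu_data_acquire(data_handles[x][y], STARPU_R);
        /* ... read the value ... */
        starpu_data_release(data_handles[x][y]);
    @}
@end smallexample
@end cartouche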
@node MPI Collective Operations
@section MPI Collective Operations

@deftypefun int starpu_mpi_scatter_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Scatters data among the processes of the communicator based on the ownership of
the data. For each data of the array @var{data_handles}, the
process @var{root} sends the data to the process owning this data.
Processes receiving data must have valid data handles to receive them.
@end deftypefun

@deftypefun int starpu_mpi_gather_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Gathers data from the different processes of the communicator onto the
process @var{root}. Each process owning a data handle in the array
@var{data_handles} will send it to the process @var{root}. The
process @var{root} must have valid data handles to receive the data.
@end deftypefun

@page
@cartouche
@smallexample
if (rank == root)
@{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    @{
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    @}
@}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t));
for(x = 0; x < nblocks ;  x++)
@{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) @{
        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    @}
    else if ((mpi_rank == rank) || ((rank == mpi_rank+1 || rank == mpi_rank-1))) @{
        /* I own that index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    @}
    else @{
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    @}
    if (data_handles[x]) @{
        starpu_data_set_rank(data_handles[x], mpi_rank);
    @}
@}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ;  x++) @{
    if (data_handles[x]) @{
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) @{
            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
        @}
    @}
@}

/* Gather the matrix back onto the root node */
starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
@end smallexample
@end cartouche
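This example relies on a one-dimensional distribution function and on a few
declarations that are not shown. A possible sketch is given below; the exact
distribution is an assumption, mirroring the two-dimensional one from the
previous section.

@cartouche
@smallexample
/* Returns the MPI node number owning block x (an assumed round-robin
   distribution over the nodes of the communicator) */
int my_distrib(int x, int nb_nodes)
@{
    return x % nb_nodes;
@}

/* Declarations assumed by the snippet above */
float **vector;
starpu_data_handle_t *data_handles;
int rank, nodes, root = 0;
int nblocks, block_size;
@end smallexample
@end cartouche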