@c -*-texinfo-*-

@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011  Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011, 2012, 2013  Centre National de la Recherche Scientifique
@c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.

The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU.  This is implemented in a separate libstarpumpi library
which basically provides "StarPU" equivalents of @code{MPI_*} functions, where
@code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI.
The user has touse the usual @code{mpirun} command of the MPI implementation to start StarPU onthe different MPI nodes.An MPI Insert Task function provides an even more seamless transition to adistributed application, by automatically issuing all required data transfersaccording to the task graph and an application-provided distribution.@menu* Simple Example::* Point to point communication::* Exchanging User Defined Data Interface::* MPI Insert Task Utility::* MPI Collective Operations::@end menu@node Simple Example@section Simple ExampleThe flags required to compile or link against the MPI layer areaccessible with the following commands:@example$ pkg-config --cflags starpumpi-1.0  # options for the compiler$ pkg-config --libs starpumpi-1.0    # options for the linker@end exampleYou also need pass the @code{--static} option if the application is tobe linked statically.@cartouche@smallexamplevoid increment_token(void)@{    struct starpu_task *task = starpu_task_create();    task->cl = &increment_cl;    task->handles[0] = token_handle;    starpu_task_submit(task);@}@end smallexample@end cartouche@cartouche@smallexampleint main(int argc, char **argv)@{    int rank, size;    starpu_init(NULL);    starpu_mpi_initialize_extended(&rank, &size);    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));    unsigned nloops = NITER;    unsigned loop;    unsigned last_loop = nloops - 1;    unsigned last_rank = size - 1;@end smallexample@end cartouche@cartouche@smallexample    for (loop = 0; loop < nloops; loop++) @{        int tag = loop*size + rank;        if (loop == 0 && rank == 0)        @{            token = 0;            fprintf(stdout, "Start with token value %d\n", token);        @}        else        @{            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,                    MPI_COMM_WORLD, NULL, NULL);        @}        increment_token();        if (loop == last_loop && rank == last_rank)        @{            
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                    MPI_COMM_WORLD, NULL, NULL);
        @}
    @}
    starpu_task_wait_for_all();
@end smallexample
@end cartouche

@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();
    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche

@node Point to point communication
@section Point to point communication

The standard point to point communications of MPI have been
implemented. The semantics are similar to those of MPI, but adapted to
the DSM provided by StarPU. An MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.

There are two types of asynchronous communications: the classic
asynchronous communications and the detached communications. The
classic asynchronous communications (@code{starpu_mpi_isend} and
@code{starpu_mpi_irecv}) need to be followed by a call to
@code{starpu_mpi_wait} or to @code{starpu_mpi_test} to wait for or to
test the completion of the communication. Waiting for or testing the
completion of detached communications is not possible: this is done
internally by StarPU-MPI and, on completion, the resources are
automatically released.
This mechanism is similar to the pthread
detach state attribute which determines whether a thread will be
created in a joinable or a detached state.

For any communication, the call of the function will result in the
creation of a StarPU-MPI request; the function
@code{starpu_data_acquire_cb} is then called to asynchronously request
StarPU to fetch the data in main memory. When the data is available in
main memory, a StarPU-MPI function is called to put the new request in
the list of ready requests if it is a send request, or in a
hashmap if it is a receive request.

Internally, all MPI communications submitted by StarPU use a unique
tag which has a default value, and can be accessed with the functions
@ref{starpu_mpi_get_communication_tag} and
@ref{starpu_mpi_set_communication_tag}.

The matching of tags with corresponding requests is done inside StarPU-MPI.
To handle this, every communication is a double communication based on an
envelope + data system. Every piece of data to be sent is preceded by an
envelope which describes the data (particularly its tag), so that the
receiver can get the matching pending receive request from the hashmap, and
submit it to receive the data correctly.

To this aim, the StarPU-MPI progression thread has a permanently submitted
request destined to receive incoming envelopes from all sources.

The StarPU-MPI progression thread regularly polls the list of ready
requests. For each new ready request, the appropriate function is
called to post the corresponding MPI call. For example, calling
@code{starpu_mpi_isend} will result in posting @code{MPI_Isend}. If
the request is marked as detached, the request will be put in the list
of detached requests.

The StarPU-MPI progression thread also polls the list of detached
requests. For each detached request, it regularly tests the completion
of the MPI request by calling @code{MPI_Test}.
On completion, the data
handle is released, and if a callback was defined, it is called.

Finally, the StarPU-MPI progression thread checks if an envelope has
arrived. If one has, it checks whether the corresponding receive has already
been submitted by the application. If it has, it submits the request
just as it does with those on the list of ready requests.
If it has not, it allocates a temporary handle to store the data that
will arrive just after, so that when the corresponding receive request
is submitted by the application, it copies this temporary handle
into the handle of that request instead of submitting a new StarPU-MPI request.

@ref{Communication} gives the list of all the point to point
communications defined in StarPU-MPI.

@node Exchanging User Defined Data Interface
@section Exchanging User Defined Data Interface

New data interfaces defined as explained in @ref{Defining a New Data
Interface} can also be used within StarPU-MPI and exchanged between
nodes. Two functions need to be defined through
the type @code{struct starpu_data_interface_ops} (@pxref{Defining
Interface}). The pack function takes a handle and returns a
contiguous memory buffer along with its size where data to be conveyed to another node
should be copied.
The reversed operation is implemented in the unpackfunction which takes a contiguous memory buffer and recreates the datahandle.@cartouche@smallexamplestatic int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)@{  STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));  struct starpu_complex_interface *complex_interface =    (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);  *count = complex_get_size(handle);  *ptr = malloc(*count);  memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));  memcpy(*ptr+complex_interface->nx*sizeof(double), complex_interface->imaginary,         complex_interface->nx*sizeof(double));  return 0;@}@end smallexample@end cartouche@cartouche@smallexamplestatic int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)@{  STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));  struct starpu_complex_interface *complex_interface =    (struct starpu_complex_interface *)	starpu_data_get_interface_on_node(handle, node);  memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));  memcpy(complex_interface->imaginary, ptr+complex_interface->nx*sizeof(double),         complex_interface->nx*sizeof(double));  return 0;@}@end smallexample@end cartouche@cartouche@smallexamplestatic struct starpu_data_interface_ops interface_complex_ops =@{  ...  .pack_data = complex_pack_data,  .unpack_data = complex_unpack_data@};@end smallexample@end cartouche@node MPI Insert Task Utility@section MPI Insert Task UtilityTo save the programmer from having to explicit all communications, StarPUprovides an "MPI Insert Task Utility". The principe is that the applicationdecides a distribution of the data over the MPI nodes by allocating it andnotifying StarPU of that decision, i.e. tell StarPU which MPI node "owns"which data. 
It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task, and triggers the required MPI transfers.

The list of functions is described in @ref{MPI Insert Task}.

Here is a stencil example showing how to use @code{starpu_mpi_insert_task}. One
first needs to define a distribution function which specifies the
locality of the data. Note that this distribution information needs to
be given to StarPU by calling @code{starpu_data_set_rank}. An MPI tag
should also be defined for each data handle by calling
@code{starpu_data_set_tag}.

@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
  /* Block distrib */
  return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
  // /* Other examples useful for other kinds of computations */
  // /* / distrib */
  // return (x+y) % nb_nodes;
  // /* Block cyclic distrib */
  // unsigned side = sqrt(nb_nodes);
  // return x % side + (y % side) * size;
@}
@end smallexample
@end cartouche

Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e.
with a @code{home_node} set to -1.StarPU will automatically allocate the memory when it is used for thefirst time.One can note an optimization here (the @code{else if} test): we only registerdata which will be needed by the tasks that we will execute.@cartouche@smallexample    unsigned matrix[X][Y];    starpu_data_handle_t data_handles[X][Y];    for(x = 0; x < X; x++) @{        for (y = 0; y < Y; y++) @{            int mpi_rank = my_distrib(x, y, size);             if (mpi_rank == my_rank)                /* Owning data */                starpu_variable_data_register(&data_handles[x][y], 0,                                              (uintptr_t)&(matrix[x][y]), sizeof(unsigned));            else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)                  || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))                /* I don't own that index, but will need it for my computations */                starpu_variable_data_register(&data_handles[x][y], -1,                                              (uintptr_t)NULL, sizeof(unsigned));            else                /* I know it's useless to allocate anything for this */                data_handles[x][y] = NULL;            if (data_handles[x][y]) @{                starpu_data_set_rank(data_handles[x][y], mpi_rank);                starpu_data_set_tag(data_handles[x][y], x*X+y);            @}        @}    @}@end smallexample@end cartoucheNow @code{starpu_mpi_insert_task()} can be called for the differentsteps of the application.@cartouche@smallexample    for(loop=0 ; loop<niter; loop++)        for (x = 1; x < X-1; x++)            for (y = 1; y < Y-1; y++)                starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,                                       STARPU_RW, data_handles[x][y],                                       STARPU_R, data_handles[x-1][y],                                       STARPU_R, data_handles[x+1][y],                                
       STARPU_R, data_handles[x][y-1],                                       STARPU_R, data_handles[x][y+1],                                       0);    starpu_task_wait_for_all();@end smallexample@end cartoucheI.e. all MPI nodes process the whole task graph, but as mentioned above, foreach task, only the MPI node which owns the data being written to (here,@code{data_handles[x][y]}) will actually run the task. The other MPI nodes willautomatically send the required data.This can be a concern with a growing number of nodes. To avoid this, theapplication can prune the task for loops according to the data distribution,so as to only submit tasks on nodes which have to care about them (either toexecute them, or to send the required data).@node MPI Collective Operations@section MPI Collective OperationsThe functions are described in @ref{Collective Operations}.@cartouche@smallexampleif (rank == root)@{    /* Allocate the vector */    vector = malloc(nblocks * sizeof(float *));    for(x=0 ; x<nblocks ; x++)    @{        starpu_malloc((void **)&vector[x], block_size*sizeof(float));    @}@}/* Allocate data handles and register data to StarPU */data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));for(x = 0; x < nblocks ;  x++)@{    int mpi_rank = my_distrib(x, nodes);    if (rank == root) @{        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],                                    blocks_size, sizeof(float));    @}    else if ((mpi_rank == rank) || ((rank == mpi_rank+1 || rank == mpi_rank-1))) @{        /* I own that index, or i will need it for my computations */        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,                                   block_size, sizeof(float));    @}    else @{        /* I know it's useless to allocate anything for this */        data_handles[x] = NULL;    @}    if (data_handles[x]) @{        starpu_data_set_rank(data_handles[x], mpi_rank);        starpu_data_set_tag(data_handles[x], 
x*nblocks+y);    @}@}/* Scatter the matrix among the nodes */starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);/* Calculation */for(x = 0; x < nblocks ;  x++) @{    if (data_handles[x]) @{        int owner = starpu_data_get_rank(data_handles[x]);        if (owner == rank) @{            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);        @}    @}@}/* Gather the matrix on main node */starpu_mpi_gather_detached(data_handles, nblocks, 0, MPI_COMM_WORLD);@end smallexample@end cartouche
 |