/*
 * This file is part of the StarPU Handbook.
 * Copyright (C) 2009--2011  Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
 * Copyright (C) 2011, 2012 INRIA
 * See the file version.doxy for copying conditions.
 */

/*!
\page MPISupport MPI Support

The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU.  This is implemented in a separate libstarpumpi library
which basically provides "StarPU" equivalents of <c>MPI_*</c> functions, where
<c>void *</c> buffers are replaced with ::starpu_data_handle_t, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI.  The user has to
use the usual <c>mpirun</c> command of the MPI implementation to start StarPU on
the different MPI nodes.

An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.

\section SimpleExample Simple Example

The flags required to compile or link against the MPI layer are
accessible with the following commands:

\verbatim
$ pkg-config --cflags starpumpi-1.3  # options for the compiler
$ pkg-config --libs starpumpi-1.3    # options for the linker
\endverbatim

You also need to pass the option <c>--static</c> if the application is to
be linked statically.

\code{.c}
void increment_token(void)
{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
}

int main(int argc, char **argv)
{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;

    for (loop = 0; loop < nloops; loop++) {
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        {
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        }
        else
        {
            starpu_mpi_irecv_detached(token_handle,
(rank+size-1)%size, tag,
                    MPI_COMM_WORLD, NULL, NULL);
        }

        increment_token();

        if (loop == last_loop && rank == last_rank)
        {
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        }
        else
        {
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                    MPI_COMM_WORLD, NULL, NULL);
        }
    }

    starpu_task_wait_for_all();
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    {
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    }
\endcode

\section PointToPointCommunication Point To Point Communication

The standard point to point communications of MPI have been
implemented. The semantics are similar to the MPI ones, but adapted to
the DSM provided by StarPU. An MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.

There are two types of asynchronous communications: the classic
asynchronous communications and the detached communications. The
classic asynchronous communications (starpu_mpi_isend() and
starpu_mpi_irecv()) need to be followed by a call to
starpu_mpi_wait() or to starpu_mpi_test() to wait for or to
test the completion of the communication. Waiting for or testing the
completion of detached communications is not possible: this is done
internally by StarPU-MPI and, on completion, the resources are
automatically released. This mechanism is similar to the pthread
detach state attribute which determines whether a thread will be
created in a joinable or a detached state.

Internally, every communication is divided in 2 communications: a first
message is used to exchange an envelope describing the data (i.e. its
tag and its size), and the data itself is sent in a second message.
All MPI communications submitted by StarPU use a unique tag which has a
default value, and can be accessed with the functions
starpu_mpi_get_communication_tag() and
starpu_mpi_set_communication_tag(). The matching of tags with
corresponding requests is done within StarPU-MPI.

For any userland communication, the call of the corresponding function
(e.g. starpu_mpi_isend()) will result in the creation of a StarPU-MPI
request, the function starpu_data_acquire_cb() is then called to
asynchronously request StarPU to fetch the data in main memory; when
the data is ready and the corresponding buffer has already been
received by MPI, it will be copied in the memory of the data,
otherwise the request is stored in the <em>early requests list</em>. Sending
requests are stored in the <em>ready requests list</em>.

While requests need to be processed, the StarPU-MPI progression thread
does the following:

<ol>
<li> it polls the <em>ready requests list</em>. For all the ready
requests, the appropriate function is called to post the corresponding
MPI call. For example, an initial call to starpu_mpi_isend() will
result in a call to <c>MPI_Isend</c>. If the request is marked as
detached, the request will then be added in the <em>detached requests
list</em>.
</li>
<li> it posts a <c>MPI_Irecv()</c> to retrieve a data envelope.
</li>
<li> it polls the <em>detached requests list</em>. For all the detached
requests, it tests the completion of the MPI request by calling
<c>MPI_Test</c>. On completion, the data handle is released, and if a
callback was defined, it is called.
</li>
<li> finally, it checks if a data envelope has been received.
If so, if the data envelope matches a request in the <em>early requests list</em> (i.e.
the request has already been posted by the application), the
corresponding MPI call is posted (similarly to the first step above).
If the data envelope does not match any application request, a
temporary handle is created to receive the data, a StarPU-MPI request
is created and added into the <em>ready requests list</em>, and thus will be
processed in the first step of the next loop.
</li>
</ol>

\ref MPIPtpCommunication "Communication" gives the list of all the
point to point communications defined in StarPU-MPI.

\section ExchangingUserDefinedDataInterface Exchanging User Defined Data Interface

New data interfaces defined as explained in \ref
DefiningANewDataInterface can also be used within StarPU-MPI and
exchanged between nodes. Two functions need to be defined through the
type starpu_data_interface_ops. The function
starpu_data_interface_ops::pack_data takes a handle and returns a
contiguous memory buffer allocated with
starpu_malloc_flags(ptr, size, 0)
along with its size where data to be conveyed
to another node should be copied.
The reverse operation is
implemented in the function starpu_data_interface_ops::unpack_data which
takes a contiguous memory buffer and recreates the data handle.

\code{.c}
static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
{
  STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

  struct starpu_complex_interface *complex_interface =
    (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

  *count = complex_get_size(handle);
  starpu_malloc_flags(ptr, *count, 0);
  memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
  memcpy(*ptr+complex_interface->nx*sizeof(double), complex_interface->imaginary,
         complex_interface->nx*sizeof(double));

  return 0;
}

static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
  STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

  struct starpu_complex_interface *complex_interface =
    (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

  memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
  memcpy(complex_interface->imaginary, ptr+complex_interface->nx*sizeof(double),
         complex_interface->nx*sizeof(double));

  return 0;
}

static struct starpu_data_interface_ops interface_complex_ops =
{
  ...
  .pack_data = complex_pack_data,
  .unpack_data = complex_unpack_data
};
\endcode

\section MPIInsertTaskUtility MPI Insert Task Utility

To save the programmer from having to make all communications explicit, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. telling StarPU which MPI node "owns"
which data. It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle.
All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task, and triggers the required MPI transfers.

The list of functions is described in \ref MPIInsertTask "MPI Insert Task".

Here is a stencil example showing how to use starpu_mpi_task_insert(). One
first needs to define a distribution function which specifies the
locality of the data. Note that the data needs to be registered to MPI
by calling starpu_mpi_data_register(). This function allows setting
the distribution information and the MPI tag which should be used when
communicating the data. It also allows automatically clearing the MPI
communication cache when unregistering the data.

\code{.c}
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) {
  /* Block distrib */
  return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

  // /* Other examples useful for other kinds of computations */
  // /* / distrib */
  // return (x+y) % nb_nodes;

  // /* Block cyclic distrib */
  // unsigned side = sqrt(nb_nodes);
  // return x % side + (y % side) * size;
}
\endcode

Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e.
with a <c>home_node</c> set to <c>-1</c>.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the <c>else if</c> test): we only register
data which will be needed by the tasks that we will execute.

\code{.c}
    unsigned matrix[X][Y];
    starpu_data_handle_t data_handles[X][Y];

    for(x = 0; x < X; x++) {
        for (y = 0; y < Y; y++) {
            int mpi_rank = my_distrib(x, y, size);
            if (mpi_rank == my_rank)
                /* Owning data */
                starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
                                              (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
            else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
                  || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
                /* I don't own that index, but will need it for my computations */
                starpu_variable_data_register(&data_handles[x][y], -1,
                                              (uintptr_t)NULL, sizeof(unsigned));
            else
                /* I know it's useless to allocate anything for this */
                data_handles[x][y] = NULL;
            if (data_handles[x][y]) {
                starpu_mpi_data_register(data_handles[x][y], x*X+y, mpi_rank);
            }
        }
    }
\endcode

Now starpu_mpi_task_insert() can be called for the different
steps of the application.

\code{.c}
    for(loop=0 ; loop<niter; loop++)
        for (x = 1; x < X-1; x++)
            for (y = 1; y < Y-1; y++)
                starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                       STARPU_RW, data_handles[x][y],
                                       STARPU_R, data_handles[x-1][y],
                                       STARPU_R, data_handles[x+1][y],
                                       STARPU_R, data_handles[x][y-1],
                                       STARPU_R,
data_handles[x][y+1],
                                       0);
    starpu_task_wait_for_all();
\endcode

I.e. all MPI nodes process the whole task graph, but as mentioned above, for
each task, only the MPI node which owns the data being written to (here,
<c>data_handles[x][y]</c>) will actually run the task. The other MPI nodes will
automatically send the required data.

Having every node process the whole task graph can be a concern with a growing
number of nodes. To avoid this, the
application can prune the task <c>for</c> loops according to the data distribution,
so as to only submit tasks on nodes which have to care about them (either to
execute them, or to send the required data).

A way to do some of this quite easily can be to just add an <c>if</c> like this:

\code{.c}
    for(loop=0 ; loop<niter; loop++)
        for (x = 1; x < X-1; x++)
            for (y = 1; y < Y-1; y++)
                if (my_distrib(x,y,size) == my_rank
                 || my_distrib(x-1,y,size) == my_rank
                 || my_distrib(x+1,y,size) == my_rank
                 || my_distrib(x,y-1,size) == my_rank
                 || my_distrib(x,y+1,size) == my_rank)
                    starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                           STARPU_RW, data_handles[x][y],
                                           STARPU_R, data_handles[x-1][y],
                                           STARPU_R, data_handles[x+1][y],
                                           STARPU_R, data_handles[x][y-1],
                                           STARPU_R, data_handles[x][y+1],
                                           0);

    starpu_task_wait_for_all();
\endcode

This makes it possible to drop the cost of function call argument passing and parsing.

If the <c>my_distrib</c> function can be inlined by the compiler, the latter can
improve the test.

If the <c>size</c> can be made a compile-time constant, the compiler can
considerably improve the test further.

If the distribution function is not too complex and the compiler is very good,
the latter can even
optimize the <c>for</c> loops, thus dramatically reducing
the cost of task submission.

A function starpu_mpi_task_build() is also provided with the aim to
only construct the task structure. All MPI nodes need to call the
function, but only the node which is to execute the task will return a
valid task structure. Following the execution of the task, all nodes
need to call the function starpu_mpi_task_post_build() -- with the same
list of arguments as starpu_mpi_task_build() -- to post all the
necessary data communications.

\code{.c}
struct starpu_task *task;
task = starpu_mpi_task_build(MPI_COMM_WORLD, &cl,
                             STARPU_RW, data_handles[0],
                             STARPU_R, data_handles[1],
                             0);
if (task) starpu_task_submit(task);
starpu_mpi_task_post_build(MPI_COMM_WORLD, &cl,
                           STARPU_RW, data_handles[0],
                           STARPU_R, data_handles[1],
                           0);
\endcode

\section MPICache MPI cache support

StarPU-MPI automatically optimizes duplicate data transmissions: if an MPI
node B needs a piece of data D from MPI node A for several tasks, only one
transmission of D will take place from A to B, and the value of D will be kept
on B as long as no task modifies D.

If a task modifies D, B will wait for all tasks which need the previous value of
D, before invalidating the value of D. As a consequence, it releases the memory
occupied by D. Whenever a task running on B needs the new value of D, allocation
will take place again to receive it.

Since tasks can be submitted dynamically, StarPU-MPI cannot know whether the
current value of data D will again be used by a newly-submitted task before
being modified by another newly-submitted task, so until a task is submitted to
modify the current value, it cannot decide by itself whether to flush the cache
or not.
The application can however explicitly tell StarPU-MPI to flush the
cache by calling starpu_mpi_cache_flush() or starpu_mpi_cache_flush_all_data(),
for instance in case the data will not be used at all any more (see for instance
the cholesky example in mpi/examples/matrix_decomposition), or at least not in
the near future. If a newly-submitted task actually needs the value again,
another transmission of D will be initiated from A to B.  A mere
starpu_mpi_cache_flush_all_data() can for instance be added at the end of the whole
algorithm, to express that no data will be reused after that (or at least that
it is not interesting to keep them in cache).  It may however be interesting to
add fine-grained starpu_mpi_cache_flush() calls during the algorithm; the effect
for the data deallocation will be the same, but it will additionally release some
pressure from the StarPU-MPI cache hash table during task submission.

The whole caching behavior can be disabled thanks to the \ref STARPU_MPI_CACHE
environment variable. The variable \ref STARPU_MPI_CACHE_STATS can be set to 1
to enable the runtime to display messages when data are added or removed
from the cache holding the received data.

\section MPIMigration MPI Data migration

The application can dynamically change its mind about the data distribution, to
balance the load over MPI nodes for instance. This can be done very simply by
requesting an explicit move and then changing the registered rank.
For instance,
we here switch to a new distribution function <c>my_distrib2</c>: we first
register any data that wasn't registered already and will be needed, then
migrate the data, and register the new location.

\code{.c}
    for(x = 0; x < X; x++) {
        for (y = 0; y < Y; y++) {
            int mpi_rank = my_distrib2(x, y, size);
            if (!data_handles[x][y] && (mpi_rank == my_rank
                  || my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
                  || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)))
                /* Register newly-needed data */
                starpu_variable_data_register(&data_handles[x][y], -1,
                                              (uintptr_t)NULL, sizeof(unsigned));
            if (data_handles[x][y]) {
                /* Migrate the data */
                starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
                /* And register the new rank of the matrix */
                starpu_mpi_data_set_rank(data_handles[x][y], mpi_rank);
            }
        }
    }
\endcode

From then on, further task submissions will use the new data distribution,
which will thus change both MPI communications and task assignments.

Very importantly, since all nodes have to agree on which node owns which data
so as to determine MPI communications and task assignments the same way, all
nodes have to perform the same data migration, and at the same point among task
submissions. It thus does not require a strict synchronization, just a clear
separation of task submissions before and after the data redistribution.

Before a piece of data is unregistered, it has to be migrated back to its original home
node (the value, at least), since that is where the user-provided buffer
resides.
Otherwise the unregistration will complain that it does not have the
latest value on the original home node.

\code{.c}
    for(x = 0; x < X; x++) {
        for (y = 0; y < Y; y++) {
            if (data_handles[x][y]) {
                int mpi_rank = my_distrib(x, y, size);
                /* Get back data to original place where the user-provided buffer is.  */
                starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
                /* And unregister it */
                starpu_data_unregister(data_handles[x][y]);
            }
        }
    }
\endcode

\section MPICollective MPI Collective Operations

The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".

\code{.c}
if (rank == root)
{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    {
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    }
}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
for(x = 0; x < nblocks ;  x++)
{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) {
        starpu_vector_data_register(&data_handles[x], STARPU_MAIN_RAM, (uintptr_t)vector[x],
                                    blocks_size, sizeof(float));
    }
    else if ((mpi_rank == rank) || ((rank == mpi_rank+1 || rank == mpi_rank-1))) {
        /* I own that index, or i will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                   block_size, sizeof(float));
    }
    else {
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    }
    if (data_handles[x]) {
        starpu_mpi_data_register(data_handles[x], x*nblocks+y, mpi_rank);
    }
}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ;  x++) {
    if
(data_handles[x]) {
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) {
            starpu_task_insert(&cl, STARPU_RW, data_handles[x], 0);
        }
    }
}

/* Gather the matrix on main node */
starpu_mpi_gather_detached(data_handles, nblocks, 0, MPI_COMM_WORLD);
\endcode

\section MPIExamples More MPI examples

MPI examples are available in the StarPU source code in mpi/examples:

<ul>
<li><c>comm</c> shows how to use communicators with StarPU-MPI
</li>
<li><c>complex</c> is a simple example using a user-defined data interface over
MPI (complex numbers),
</li>
<li><c>stencil5</c> is a simple stencil example using starpu_mpi_task_insert(),
</li>
<li><c>matrix_decomposition</c> is a cholesky decomposition example using
starpu_mpi_task_insert(). The non-distributed version can check for
algorithm correctness in 1-node configuration, the distributed version uses
exactly the same source code, to be used over MPI,
</li>
<li><c>mpi_lu</c> is an LU decomposition example, provided in three versions:
<c>plu_example</c> uses explicit MPI data transfers, <c>plu_implicit_example</c>
uses implicit MPI data transfers, <c>plu_outofcore_example</c> uses implicit MPI
data transfers and supports data matrices which do not fit in memory (out-of-core).
</li>
</ul>

*/