/*
 * This file is part of the StarPU Handbook.
 * Copyright (C) 2009--2011 Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015 CNRS
 * Copyright (C) 2011, 2012 INRIA
 * See the file version.doxy for copying conditions.
 */
/*! \page MPISupport MPI Support

The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi library
which basically provides "StarPU" equivalents of <c>MPI_*</c> functions, where
<c>void *</c> buffers are replaced with ::starpu_data_handle_t, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual <c>mpirun</c> command of the MPI implementation to start StarPU on
the different MPI nodes.

An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided data distribution.
\section SimpleExample Simple Example

The flags required to compile or link against the MPI layer are
accessible with the following commands:

\verbatim
$ pkg-config --cflags starpumpi-1.3  # options for the compiler
$ pkg-config --libs starpumpi-1.3    # options for the linker
\endverbatim

You also need to pass the option <c>--static</c> if the application is to
be linked statically.
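For instance, assuming the token-ring example below is stored in a file named
<c>ring.c</c> (the file name is chosen here only for illustration), it could be
compiled and linked with a command along these lines:

\verbatim
$ mpicc ring.c -o ring $(pkg-config --cflags starpumpi-1.3) $(pkg-config --libs starpumpi-1.3)
\endverbatim

The example circulates a token over the MPI nodes: each node receives the token
from its predecessor, increments it with a StarPU task, and forwards it to its
successor.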
\code{.c}
/* The token value, its data handle and the increment codelet (increment_cl)
 * are assumed to be defined elsewhere as globals. */
void increment_token(void)
{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
}

int main(int argc, char **argv)
{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;

    for (loop = 0; loop < nloops; loop++)
    {
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        {
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        }
        else
        {
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                                      MPI_COMM_WORLD, NULL, NULL);
        }

        increment_token();

        if (loop == last_loop && rank == last_rank)
        {
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        }
        else
        {
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                                      MPI_COMM_WORLD, NULL, NULL);
        }
    }

    starpu_task_wait_for_all();

    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    {
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    }

    return 0;
}
\endcode
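The resulting binary is launched like any MPI program, for instance over four
MPI processes (using the hypothetical binary name from the compile line above):

\verbatim
$ mpirun -np 4 ./ring
\endverbatim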
\section PointToPointCommunication Point To Point Communication

The standard point to point communications of MPI have been
implemented. The semantic is similar to the MPI one, but adapted to
the DSM provided by StarPU. An MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.

There are two types of asynchronous communications: the classic
asynchronous communications and the detached communications. The
classic asynchronous communications (starpu_mpi_isend() and
starpu_mpi_irecv()) need to be followed by a call to
starpu_mpi_wait() or starpu_mpi_test() to wait for or test the
completion of the communication. Waiting for or testing the
completion of detached communications is not possible: this is done
internally by StarPU-MPI, and the resources are automatically released
on completion. This mechanism is similar to the pthread
detach state attribute, which determines whether a thread will be
created in a joinable or a detached state.
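As an illustration, here is a minimal sketch (not taken from the handbook
examples) of a classic asynchronous send completed by an explicit wait; the
destination rank and the tag value are arbitrary:

\code{.c}
/* Sketch of a classic (non-detached) asynchronous send: the request must be
 * completed explicitly with starpu_mpi_wait() or starpu_mpi_test(). */
starpu_mpi_req request;

starpu_mpi_isend(token_handle, &request, /* destination rank */ 1, /* tag */ 42, MPI_COMM_WORLD);

/* ... other work can be submitted here ... */

starpu_mpi_wait(&request, MPI_STATUS_IGNORE);
\endcode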
Internally, each communication is split into two messages: a first
message is used to exchange an envelope describing the data (i.e. its
tag and its size), and the data itself is sent in a second message. All
MPI communications submitted by StarPU use a unique tag, which has a
default value and can be accessed with the functions
starpu_mpi_get_communication_tag() and
starpu_mpi_set_communication_tag(). The matching of tags with
corresponding requests is done within StarPU-MPI.

For any userland communication, the call of the corresponding function
(e.g. starpu_mpi_isend()) will result in the creation of a StarPU-MPI
request; the function starpu_data_acquire_cb() is then called to
asynchronously request StarPU to fetch the data into main memory; when
the data is ready and the corresponding buffer has already been
received by MPI, it will be copied into the memory of the data,
otherwise the request is stored in the <em>early requests list</em>. Sending
requests are stored in the <em>ready requests list</em>.
While requests remain to be processed, the StarPU-MPI progression thread
does the following:

<ol>
<li> it polls the <em>ready requests list</em>. For each ready
request, the appropriate function is called to post the corresponding
MPI call. For example, an initial call to starpu_mpi_isend() will
result in a call to <c>MPI_Isend</c>. If the request is marked as
detached, the request will then be added to the <em>detached requests
list</em>.
</li>
<li> it posts an <c>MPI_Irecv()</c> to retrieve a data envelope.
</li>
<li> it polls the <em>detached requests list</em>. For each detached
request, it tests the completion of the MPI request by calling
<c>MPI_Test</c>. On completion, the data handle is released, and if a
callback was defined, it is called.
</li>
<li> finally, it checks whether a data envelope has been received. If so,
and if the data envelope matches a request in the <em>early requests list</em> (i.e.
the request has already been posted by the application), the
corresponding MPI call is posted (similarly to the first step above).
If the data envelope does not match any application request, a
temporary handle is created to receive the data, a StarPU-MPI request
is created and added to the <em>ready requests list</em>, and will thus be
processed in the first step of the next loop.
</li>
</ol>

\ref MPIPtpCommunication "Communication" gives the list of all the
point to point communications defined in StarPU-MPI.
\section ExchangingUserDefinedDataInterface Exchanging User Defined Data Interface

New data interfaces defined as explained in \ref
DefiningANewDataInterface can also be used within StarPU-MPI and
exchanged between nodes. Two functions need to be defined through the
type starpu_data_interface_ops. The function
starpu_data_interface_ops::pack_data takes a handle and returns a
contiguous memory buffer allocated with starpu_malloc_flags(ptr, size, 0),
along with its size, into which the data to be conveyed to another node
should be copied. The reverse operation is implemented in the function
starpu_data_interface_ops::unpack_data, which takes a contiguous memory
buffer and recreates the data handle.
\code{.c}
static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    *count = complex_get_size(handle);
    starpu_malloc_flags(ptr, *count, 0);

    /* Pack the real parts followed by the imaginary parts. */
    memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
    memcpy((char *) *ptr + complex_interface->nx*sizeof(double), complex_interface->imaginary,
           complex_interface->nx*sizeof(double));

    return 0;
}

static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    /* Unpack in the same order as complex_pack_data(). */
    memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, (char *) ptr + complex_interface->nx*sizeof(double),
           complex_interface->nx*sizeof(double));

    return 0;
}

static struct starpu_data_interface_ops interface_complex_ops =
{
    ...
    .pack_data = complex_pack_data,
    .unpack_data = complex_unpack_data
};
\endcode
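Once a handle has been registered with such an interface, it can be exchanged
with the usual StarPU-MPI calls, just like handles using the built-in
interfaces. A minimal sketch (the handle name <c>complex_handle</c>, the peer
ranks and the tag value are illustrative):

\code{.c}
/* Sketch: on node 0, send the complex data to node 1 with tag 42; the pack
 * operation is used on the sender side and unpack on the receiver side. */
starpu_mpi_send(complex_handle, 1, 42, MPI_COMM_WORLD);

/* Matching receive on node 1, from node 0. */
starpu_mpi_recv(complex_handle, 0, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
\endcode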
\section MPIInsertTaskUtility MPI Insert Task Utility

To save the programmer from having to make all communications explicit, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. telling StarPU which MPI node "owns"
which data. It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task, and triggers the required MPI transfers.

The list of functions is described in \ref MPIInsertTask "MPI Insert Task".

Here is a stencil example showing how to use starpu_mpi_task_insert(). One
first needs to define a distribution function which specifies the
locality of the data. Note that the data needs to be registered to MPI
by calling starpu_mpi_data_register(). This function allows setting
the distribution information and the MPI tag which should be used when
communicating the data. It also allows automatically clearing the MPI
communication cache when unregistering the data.
\code{.c}
/* Returns the MPI node number where the data indexed by (x,y) is */
int my_distrib(int x, int y, int nb_nodes)
{
    /* Block distribution */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* Diagonal distribution */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distribution */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
}
\endcode
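The tasks inserted below use a codelet <c>stencil5_cl</c>, which is not shown in
this chapter. As a minimal sketch of what such a codelet declaration could look
like (the kernel function <c>stencil5_cpu</c> is a placeholder name, and the
actual computation is application-specific):

\code{.c}
/* Sketch of a 5-point stencil codelet: one read-write buffer (the center
 * value) and four read-only neighbours. */
void stencil5_cpu(void *descr[], void *cl_arg)
{
    /* ... update the center value from the four neighbour values ... */
}

struct starpu_codelet stencil5_cl =
{
    .cpu_funcs = { stencil5_cpu },
    .nbuffers = 5,
    .modes = { STARPU_RW, STARPU_R, STARPU_R, STARPU_R, STARPU_R }
};
\endcode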
Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a <c>home_node</c> set to <c>-1</c>.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the <c>else if</c> test): we only register
data which will be needed by the tasks that we will execute.
\code{.c}
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
                 || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own this index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;

        if (data_handles[x][y])
        {
            starpu_mpi_data_register(data_handles[x][y], x*X+y, mpi_rank);
        }
    }
}
\endcode
Now starpu_mpi_task_insert() can be called for the different
steps of the application.

\code{.c}
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

starpu_task_wait_for_all();
\endcode
I.e. all MPI nodes process the whole task graph, but as mentioned above, for
each task, only the MPI node which owns the data being written to (here,
<c>data_handles[x][y]</c>) will actually run the task. The other MPI nodes will
automatically send the required data.

This can become a concern as the number of nodes grows. To avoid it, the
application can prune the task-submission <c>for</c> loops according to the data
distribution, so as to only submit tasks on the nodes which have to care about
them (either to execute them, or to send the required data).

An easy way to achieve some of this is simply to add an <c>if</c> test like this:
\code{.c}
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            if (my_distrib(x,y,size) == my_rank
                || my_distrib(x-1,y,size) == my_rank
                || my_distrib(x+1,y,size) == my_rank
                || my_distrib(x,y-1,size) == my_rank
                || my_distrib(x,y+1,size) == my_rank)
                starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                       STARPU_RW, data_handles[x][y],
                                       STARPU_R, data_handles[x-1][y],
                                       STARPU_R, data_handles[x+1][y],
                                       STARPU_R, data_handles[x][y-1],
                                       STARPU_R, data_handles[x][y+1],
                                       0);

starpu_task_wait_for_all();
\endcode
This drops the cost of passing and parsing the function-call arguments for
tasks which do not concern the node. If the <c>my_distrib</c> function can be
inlined by the compiler, the test becomes even cheaper. If <c>size</c> can be
made a compile-time constant, the compiler can improve the test considerably
further. If the distribution function is not too complex and the compiler is
very good, the latter can even optimize the <c>for</c> loops, thus dramatically
reducing the cost of task submission.
The function starpu_mpi_task_build() is also provided, with the aim of
only constructing the task structure. All MPI nodes need to call the
function; only the node which is to execute the task will get a
valid task structure back. All nodes then need to call
starpu_mpi_task_post_build() -- with the same list of arguments as
starpu_mpi_task_build() -- to post all the necessary data communications.

\code{.c}
struct starpu_task *task;

task = starpu_mpi_task_build(MPI_COMM_WORLD, &cl,
                             STARPU_RW, data_handles[0],
                             STARPU_R, data_handles[1],
                             0);
if (task) starpu_task_submit(task);

starpu_mpi_task_post_build(MPI_COMM_WORLD, &cl,
                           STARPU_RW, data_handles[0],
                           STARPU_R, data_handles[1],
                           0);
\endcode
\section MPICache MPI Cache Support

StarPU-MPI automatically optimizes duplicate data transmissions: if an MPI
node B needs a piece of data D from MPI node A for several tasks, only one
transmission of D will take place from A to B, and the value of D will be kept
on B as long as no task modifies D.

If a task modifies D, B will wait for all tasks which need the previous value of
D before invalidating that value, and as a consequence it releases the memory
occupied by D. Whenever a task running on B needs the new value of D, an
allocation will take place again to receive it.

Since tasks can be submitted dynamically, StarPU-MPI can not know whether the
current value of data D will again be used by a newly-submitted task before
being modified by another newly-submitted task, so until a task is submitted to
modify the current value, it can not decide by itself whether to flush the cache
or not. The application can however explicitly tell StarPU-MPI to flush the
cache by calling starpu_mpi_cache_flush() or starpu_mpi_cache_flush_all_data(),
for instance when the data will not be used at all any more (see for instance
the Cholesky example in mpi/examples/matrix_decomposition), or at least not in
the near future. If a newly-submitted task actually needs the value again,
another transmission of D will be initiated from A to B. A mere
starpu_mpi_cache_flush_all_data() can for instance be added at the end of the whole
algorithm, to express that no data will be reused after that (or at least that
it is not interesting to keep them in cache). It may however be interesting to
add fine-grained starpu_mpi_cache_flush() calls during the algorithm; the effect
on data deallocation will be the same, but it will additionally release some
pressure from the StarPU-MPI cache hash table during task submission.
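As an illustrative sketch of both flavours of flush calls (the handle name is
borrowed from the stencil example above):

\code{.c}
/* This node will not need data_handles[x][y] again in the near future:
 * drop the cached copy now. */
starpu_mpi_cache_flush(MPI_COMM_WORLD, data_handles[x][y]);

/* At the very end of the algorithm: nothing will be reused, flush everything. */
starpu_mpi_cache_flush_all_data(MPI_COMM_WORLD);
\endcode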
The whole caching behavior can be disabled thanks to the \ref STARPU_MPI_CACHE
environment variable. The variable \ref STARPU_MPI_CACHE_STATS can be set to 1
to make the runtime display messages when data are added to or removed
from the cache holding the received data.
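For instance, the cache can be disabled when launching the hypothetical
<c>ring</c> binary built earlier:

\verbatim
$ STARPU_MPI_CACHE=0 mpirun -np 4 ./ring
\endverbatim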
\section MPIMigration MPI Data Migration

The application can dynamically change its mind about the data distribution,
for instance to balance the load over MPI nodes. This can be done very simply by
requesting an explicit move and then changing the registered rank. For instance,
we here switch to a new distribution function <c>my_distrib2</c>: we first
register any data that wasn't registered already and will be needed, then
migrate the data, and register the new location.
\code{.c}
for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        int mpi_rank = my_distrib2(x, y, size);
        if (!data_handles[x][y] && (mpi_rank == my_rank
              || my_rank == my_distrib2(x+1, y, size) || my_rank == my_distrib2(x-1, y, size)
              || my_rank == my_distrib2(x, y+1, size) || my_rank == my_distrib2(x, y-1, size)))
            /* Register newly-needed data */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        if (data_handles[x][y])
        {
            /* Migrate the data */
            starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
            /* And register the new rank of the matrix */
            starpu_mpi_data_set_rank(data_handles[x][y], mpi_rank);
        }
    }
}
\endcode
From then on, further task submissions will use the new data distribution,
which will thus change both MPI communications and task assignments.

Very importantly, since all nodes have to agree on which node owns which data
so as to determine MPI communications and task assignments the same way, all
nodes have to perform the same data migration, and at the same point among task
submissions. This does not require a strict synchronization, just a clear
separation of task submissions before and after the data redistribution.

Before data unregistration, the data has to be migrated back to its original
home node (the value, at least), since that is where the user-provided buffer
resides. Otherwise the unregistration will complain that it does not have the
latest value on the original home node.
\code{.c}
for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        if (data_handles[x][y])
        {
            int mpi_rank = my_distrib(x, y, size);
            /* Get the data back to its original place, where the user-provided buffer is. */
            starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
            /* And unregister it */
            starpu_data_unregister(data_handles[x][y]);
        }
    }
}
\endcode
\section MPICollective MPI Collective Operations

The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".
The following (partial) example scatters a vector of data blocks from the root
node over all the nodes, runs a computation on the node owning each block, and
gathers the results back on the root node.
\code{.c}
if (rank == root)
{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    {
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    }
}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
for(x = 0; x < nblocks ; x++)
{
    /* Here my_distrib() is a 1D distribution function over the block index */
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root)
    {
        starpu_vector_data_register(&data_handles[x], STARPU_MAIN_RAM, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    }
    else if (mpi_rank == rank || rank == mpi_rank+1 || rank == mpi_rank-1)
    {
        /* I own this index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    }
    else
    {
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    }
    if (data_handles[x])
    {
        starpu_mpi_data_register(data_handles[x], x, mpi_rank);
    }
}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ; x++)
{
    if (data_handles[x])
    {
        int owner = starpu_mpi_data_get_rank(data_handles[x]);
        if (owner == rank)
        {
            starpu_task_insert(&cl, STARPU_RW, data_handles[x], 0);
        }
    }
}

/* Gather the matrix back on the root node */
starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
\endcode
\section MPIExamples More MPI Examples

MPI examples are available in the StarPU source code in mpi/examples:

<ul>
<li>
<c>comm</c> shows how to use communicators with StarPU-MPI,
</li>
<li>
<c>complex</c> is a simple example using a user-defined data interface over
MPI (complex numbers),
</li>
<li>
<c>stencil5</c> is a simple stencil example using starpu_mpi_task_insert(),
</li>
<li>
<c>matrix_decomposition</c> is a Cholesky decomposition example using
starpu_mpi_task_insert(). The non-distributed version can check for
algorithm correctness in a 1-node configuration, the distributed version uses
exactly the same source code, to be used over MPI,
</li>
<li>
<c>mpi_lu</c> is an LU decomposition example, provided in three versions:
<c>plu_example</c> uses explicit MPI data transfers, <c>plu_implicit_example</c>
uses implicit MPI data transfers, <c>plu_outofcore_example</c> uses implicit MPI
data transfers and supports data matrices which do not fit in memory (out-of-core).
</li>
</ul>

*/