- /*
- * This file is part of the StarPU Handbook.
- * Copyright (C) 2009--2011 Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
- * Copyright (C) 2011, 2012 Institut National de Recherche en Informatique et Automatique
- * See the file version.doxy for copying conditions.
- */
- /*! \page MPISupport MPI Support
- The integration of MPI transfers within task parallelism is done in a
- very natural way by means of asynchronous interactions between the
- application and StarPU. This is implemented in a separate <c>libstarpumpi</c> library,
- which basically provides "StarPU" equivalents of <c>MPI_*</c> functions, where
- <c>void *</c> buffers are replaced with ::starpu_data_handle_t, and all
- GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
- use the usual <c>mpirun</c> command of the MPI implementation to start StarPU on
- the different MPI nodes.
- An MPI Insert Task function provides an even more seamless transition to a
- distributed application, by automatically issuing all required data transfers
- according to the task graph and an application-provided distribution.
- \section SimpleExample Simple Example
- The flags required to compile or link against the MPI layer are
- accessible with the following commands:
- \verbatim
- $ pkg-config --cflags starpumpi-1.2 # options for the compiler
- $ pkg-config --libs starpumpi-1.2 # options for the linker
- \endverbatim
- You also need to pass the option <c>--static</c> if the application is to
- be linked statically.
- \code{.c}
- /* Global state assumed by this excerpt (the codelet increment_cl and the
-  * constant NITER are defined in the rest of the example): the token value
-  * and its StarPU handle. */
- static unsigned token = 0;
- static starpu_data_handle_t token_handle;
-
- void increment_token(void)
- {
-     struct starpu_task *task = starpu_task_create();
-     task->cl = &increment_cl;
-     task->handles[0] = token_handle;
-     starpu_task_submit(task);
- }
-
- int main(int argc, char **argv)
- {
-     int rank, size;
-
-     starpu_init(NULL);
-     starpu_mpi_initialize_extended(&rank, &size);
-     starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));
-
-     unsigned nloops = NITER;
-     unsigned loop;
-     unsigned last_loop = nloops - 1;
-     unsigned last_rank = size - 1;
-
-     for (loop = 0; loop < nloops; loop++)
-     {
-         int tag = loop*size + rank;
-         if (loop == 0 && rank == 0)
-         {
-             token = 0;
-             fprintf(stdout, "Start with token value %u\n", token);
-         }
-         else
-         {
-             starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
-                                       MPI_COMM_WORLD, NULL, NULL);
-         }
-
-         increment_token();
-
-         if (loop == last_loop && rank == last_rank)
-         {
-             starpu_data_acquire(token_handle, STARPU_R);
-             fprintf(stdout, "Finished: token value %u\n", token);
-             starpu_data_release(token_handle);
-         }
-         else
-         {
-             starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
-                                       MPI_COMM_WORLD, NULL, NULL);
-         }
-     }
-
-     starpu_task_wait_for_all();
-     starpu_mpi_shutdown();
-     starpu_shutdown();
-
-     if (rank == last_rank)
-     {
-         fprintf(stderr, "[%d] token = %u == %u * %d ?\n", rank, token, nloops, size);
-         STARPU_ASSERT(token == nloops*size);
-     }
-     return 0;
- }
- \endcode
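- The example above relies on a codelet <c>increment_cl</c> which is defined in the rest
- of the example. As a rough sketch (the actual ring example shipped with StarPU may
- differ), it could be a one-buffer codelet whose CPU implementation increments the token:
- \code{.c}
- /* Hypothetical CPU implementation: increment the single unsigned value of the vector */
- void increment_cpu(void *descr[], void *cl_arg)
- {
-     (void)cl_arg;
-     unsigned *tokenptr = (unsigned *)STARPU_VECTOR_GET_PTR(descr[0]);
-     (*tokenptr)++;
- }
-
- static struct starpu_codelet increment_cl =
- {
-     .cpu_funcs = {increment_cpu},
-     .nbuffers = 1,
-     .modes = {STARPU_RW}
- };
- \endcode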
- \section PointToPointCommunication Point To Point Communication
- The standard point to point communications of MPI have been
- implemented. The semantics are similar to those of MPI, but adapted to
- the DSM provided by StarPU. An MPI request will only be submitted when
- the data is available in the main memory of the node submitting the
- request.
- There are two types of asynchronous communications: the classic
- asynchronous communications and the detached communications. The
- classic asynchronous communications (starpu_mpi_isend() and
- starpu_mpi_irecv()) need to be followed by a call to
- starpu_mpi_wait() or to starpu_mpi_test() to wait for or to
- test the completion of the communication. Waiting for or testing the
- completion of detached communications is not possible: this is done
- internally by StarPU-MPI and, on completion, the resources are
- automatically released. This mechanism is similar to the pthread
- detach state attribute, which determines whether a thread will be
- created in a joinable or a detached state.
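- As an illustration, here is a minimal sketch (assuming a handle <c>data_handle</c>
- registered beforehand, a peer rank <c>peer</c> and an arbitrary tag value)
- contrasting the two flavours:
- \code{.c}
- /* Classic asynchronous send: completion must be waited for (or tested) explicitly */
- starpu_mpi_req req;
- starpu_mpi_isend(data_handle, &req, peer, 42, MPI_COMM_WORLD);
- /* ... other work ... */
- starpu_mpi_wait(&req, MPI_STATUS_IGNORE);
-
- /* Detached send: no request to keep, StarPU-MPI completes it internally and
-  * calls the (optional) callback, here NULL */
- starpu_mpi_isend_detached(data_handle, peer, 42, MPI_COMM_WORLD, NULL, NULL);
- \endcode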
- For any communication, the call of the function results in the
- creation of a StarPU-MPI request; the function
- starpu_data_acquire_cb() is then called to asynchronously request
- StarPU to fetch the data into main memory. When the data is available in
- main memory, a StarPU-MPI function is called to put the new request in
- the list of ready requests if it is a send request, or in a
- hashmap if it is a receive request.
- Internally, all MPI communications submitted by StarPU use a single
- MPI tag, which has a default value and can be accessed with the functions
- starpu_mpi_get_communication_tag() and
- starpu_mpi_set_communication_tag().
- The matching of tags with the corresponding requests is done within StarPU-MPI.
- To handle this, any communication is a double communication based on an
- envelope + data system. Before any piece of data is sent, an envelope
- which describes the data (particularly its tag) is sent first, so that
- the receiver can get the matching pending receive request from the
- hashmap and submit it to receive the data correctly.
- To this end, the StarPU-MPI progression thread keeps a permanently-submitted
- request intended to receive incoming envelopes from all sources.
- The StarPU-MPI progression thread regularly polls this list of ready
- requests. For each new ready request, the appropriate function is
- called to post the corresponding MPI call. For example, calling
- starpu_mpi_isend() will result in posting <c>MPI_Isend</c>. If
- the request is marked as detached, the request will be put in the list
- of detached requests.
- The StarPU-MPI progression thread also polls the list of detached
- requests. For each detached request, it regularly tests the completion
- of the MPI request by calling <c>MPI_Test</c>. On completion, the data
- handle is released, and if a callback was defined, it is called.
- Finally, the StarPU-MPI progression thread checks whether an envelope has
- arrived. If so, it checks whether the corresponding receive has already
- been submitted by the application. If it has, it submits the request
- just as it does with those on the list of ready requests.
- If it has not, it allocates a temporary handle to store the data that
- will arrive just after, so that when the corresponding receive request
- is eventually submitted by the application, the data is copied from this
- temporary handle into the application's handle instead of a new StarPU-MPI
- request being submitted.
- \ref MPIPtpCommunication "Communication" gives the list of all the
- point to point communications defined in StarPU-MPI.
- \section ExchangingUserDefinedDataInterface Exchanging User Defined Data Interface
- New data interfaces defined as explained in \ref
- DefiningANewDataInterface can also be used within StarPU-MPI and
- exchanged between nodes. Two functions need to be defined through the
- type starpu_data_interface_ops. The function
- starpu_data_interface_ops::pack_data takes a handle and returns a
- contiguous memory buffer, along with its size, into which the data to be conveyed
- to another node should be copied. The reverse operation is
- implemented in the function starpu_data_interface_ops::unpack_data, which
- takes a contiguous memory buffer and recreates the content of the data handle from it.
- \code{.c}
- static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
- {
-     STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
-
-     struct starpu_complex_interface *complex_interface =
-         (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
-
-     *count = complex_get_size(handle);
-     *ptr = malloc(*count);
-     memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
-     memcpy((char *)*ptr + complex_interface->nx*sizeof(double), complex_interface->imaginary,
-            complex_interface->nx*sizeof(double));
-     return 0;
- }
-
- static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
- {
-     STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
-
-     struct starpu_complex_interface *complex_interface =
-         (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
-
-     memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
-     memcpy(complex_interface->imaginary, (char *)ptr + complex_interface->nx*sizeof(double),
-            complex_interface->nx*sizeof(double));
-     return 0;
- }
-
- static struct starpu_data_interface_ops interface_complex_ops =
- {
-     ...
-     .pack_data = complex_pack_data,
-     .unpack_data = complex_unpack_data
- };
- \endcode
- \section MPIInsertTaskUtility MPI Insert Task Utility
- To save the programmer from having to make all communications explicit, StarPU
- provides an "MPI Insert Task Utility". The principle is that the application
- decides a distribution of the data over the MPI nodes by allocating it and
- notifying StarPU of that decision, i.e. telling StarPU which MPI node "owns"
- which data. It also decides, for each handle, an MPI tag which will be used to
- exchange the content of the handle. All MPI nodes then process the whole task
- graph, and StarPU automatically determines which node actually executes which
- task and triggers the required MPI transfers.
- The list of functions is described in \ref MPIInsertTask "MPI Insert Task".
- Here is a stencil example showing how to use starpu_mpi_task_insert(). One
- first needs to define a distribution function which specifies the
- locality of the data. Note that the distribution information needs to
- be given to StarPU by calling starpu_data_set_rank(). An MPI tag
- should also be defined for each data handle by calling
- starpu_data_set_tag().
- \code{.c}
- /* Returns the MPI node number where data indexed (x,y) is */
- int my_distrib(int x, int y, int nb_nodes) {
-     /* Block distrib */
-     return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
-
-     // /* Other examples useful for other kinds of computations */
-     // /* Diagonal ("/") distrib */
-     // return (x+y) % nb_nodes;
-
-     // /* Block cyclic distrib */
-     // unsigned side = sqrt(nb_nodes);
-     // return x % side + (y % side) * side;
- }
- \endcode
- Now the data can be registered within StarPU. Data which are not
- owned but will be needed for computations can be registered through
- the lazy allocation mechanism, i.e. with a <c>home_node</c> set to <c>-1</c>.
- StarPU will automatically allocate the memory when it is used for the
- first time.
- One can note an optimization here (the <c>else if</c> test): we only register
- data which will be needed by the tasks that we will execute.
- \code{.c}
- unsigned matrix[X][Y];
- starpu_data_handle_t data_handles[X][Y];
-
- for(x = 0; x < X; x++) {
-     for (y = 0; y < Y; y++) {
-         int mpi_rank = my_distrib(x, y, size);
-         if (mpi_rank == my_rank)
-             /* Owning data */
-             starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
-                                           (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
-         else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
-                  || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
-             /* I don't own that index, but will need it for my computations */
-             starpu_variable_data_register(&data_handles[x][y], -1,
-                                           (uintptr_t)NULL, sizeof(unsigned));
-         else
-             /* I know it's useless to allocate anything for this */
-             data_handles[x][y] = NULL;
-
-         if (data_handles[x][y]) {
-             starpu_data_set_rank(data_handles[x][y], mpi_rank);
-             starpu_data_set_tag(data_handles[x][y], x*X+y);
-         }
-     }
- }
- \endcode
- Now starpu_mpi_task_insert() can be called for the different
- steps of the application.
- \code{.c}
- for(loop=0 ; loop<niter; loop++)
-     for (x = 1; x < X-1; x++)
-         for (y = 1; y < Y-1; y++)
-             starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
-                                    STARPU_RW, data_handles[x][y],
-                                    STARPU_R, data_handles[x-1][y],
-                                    STARPU_R, data_handles[x+1][y],
-                                    STARPU_R, data_handles[x][y-1],
-                                    STARPU_R, data_handles[x][y+1],
-                                    0);
-
- starpu_task_wait_for_all();
- \endcode
- That is, all MPI nodes process the whole task graph, but as mentioned above, for
- each task, only the MPI node which owns the data being written to (here,
- <c>data_handles[x][y]</c>) will actually run the task. The other MPI nodes will
- automatically send the required data.
- Having every node unroll the whole task graph can become a concern as the number
- of nodes grows. To avoid this, the application can prune the task-submission for
- loops according to the data distribution, so as to only submit tasks on nodes which
- have to care about them (either to execute them, or to send the required data), as
- sketched below.
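- A possible sketch of such pruning, with the same distribution function as above,
- is to skip the submission whenever the node neither owns the handle being written
- to nor any of the handles being read:
- \code{.c}
- for(loop=0 ; loop<niter; loop++)
-     for (x = 1; x < X-1; x++)
-         for (y = 1; y < Y-1; y++)
-         {
-             if (my_rank != my_distrib(x, y, size)
-                 && my_rank != my_distrib(x-1, y, size) && my_rank != my_distrib(x+1, y, size)
-                 && my_rank != my_distrib(x, y-1, size) && my_rank != my_distrib(x, y+1, size))
-                 /* Neither executing this task nor sending data for it: skip it */
-                 continue;
-             starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
-                                    STARPU_RW, data_handles[x][y],
-                                    STARPU_R, data_handles[x-1][y],
-                                    STARPU_R, data_handles[x+1][y],
-                                    STARPU_R, data_handles[x][y-1],
-                                    STARPU_R, data_handles[x][y+1],
-                                    0);
-         }
- starpu_task_wait_for_all();
- \endcode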
- \section MPIMigration MPI Data migration
- The application can dynamically change its mind about the data distribution, for
- instance to balance the load over MPI nodes. This can be done very simply by
- requesting an explicit move and then changing the registered rank. For instance,
- we here switch to a new distribution function <c>my_distrib2</c>: we first
- register any data that wasn't registered already and will be needed, then
- migrate the data, and register the new location.
- \code{.c}
- for(x = 0; x < X; x++) {
-     for (y = 0; y < Y; y++) {
-         int mpi_rank = my_distrib2(x, y, size);
-         if (!data_handles[x][y] && (mpi_rank == my_rank
-              || my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
-              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)))
-             /* Register newly-needed data */
-             starpu_variable_data_register(&data_handles[x][y], -1,
-                                           (uintptr_t)NULL, sizeof(unsigned));
-         if (data_handles[x][y]) {
-             /* Migrate the data */
-             starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
-             /* And register the new rank of the matrix */
-             starpu_data_set_rank(data_handles[x][y], mpi_rank);
-         }
-     }
- }
- \endcode
- From then on, further task submissions will use the new data distribution,
- which will thus change both MPI communications and task assignments.
- Very importantly, since all nodes have to agree on which node owns which data
- so as to determine MPI communications and task assignments the same way, all
- nodes have to perform the same data migration, and at the same point among task
- submissions. This does not require a strict synchronization, just a clear
- separation of the task submissions issued before and after the data redistribution.
- Before unregistering the data, they have to be migrated back to their original home
- node (the value, at least), since that is where the user-provided buffer
- resides. Otherwise the unregistration will complain that it does not have the
- latest value on the original home node.
- \code{.c}
- for(x = 0; x < X; x++) {
-     for (y = 0; y < Y; y++) {
-         if (data_handles[x][y]) {
-             int mpi_rank = my_distrib(x, y, size);
-             /* Get back data to original place where the user-provided buffer is. */
-             starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
-             /* And unregister it */
-             starpu_data_unregister(data_handles[x][y]);
-         }
-     }
- }
- \endcode
- \section MPICollective MPI Collective Operations
- The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".
- The following example scatters blocks of a vector over the MPI nodes, runs a
- computation on each block owned locally, and gathers the result back on the root node.
- \code{.c}
- if (rank == root)
- {
-     /* Allocate the vector */
-     vector = malloc(nblocks * sizeof(float *));
-     for(x=0 ; x<nblocks ; x++)
-     {
-         starpu_malloc((void **)&vector[x], block_size*sizeof(float));
-     }
- }
-
- /* Allocate data handles and register data to StarPU */
- data_handles = malloc(nblocks*sizeof(starpu_data_handle_t));
- for(x = 0; x < nblocks ; x++)
- {
-     int mpi_rank = my_distrib(x, nodes);
-     if (rank == root)
-     {
-         starpu_vector_data_register(&data_handles[x], STARPU_MAIN_RAM, (uintptr_t)vector[x],
-                                     block_size, sizeof(float));
-     }
-     else if ((mpi_rank == rank) || (rank == mpi_rank+1) || (rank == mpi_rank-1))
-     {
-         /* I own that index, or I will need it for my computations */
-         starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
-                                     block_size, sizeof(float));
-     }
-     else
-     {
-         /* I know it's useless to allocate anything for this */
-         data_handles[x] = NULL;
-     }
-     if (data_handles[x])
-     {
-         starpu_data_set_rank(data_handles[x], mpi_rank);
-         starpu_data_set_tag(data_handles[x], x);
-     }
- }
-
- /* Scatter the matrix among the nodes */
- starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
-
- /* Calculation */
- for(x = 0; x < nblocks ; x++)
- {
-     if (data_handles[x])
-     {
-         int owner = starpu_data_get_rank(data_handles[x]);
-         if (owner == rank)
-         {
-             starpu_task_insert(&cl, STARPU_RW, data_handles[x], 0);
-         }
-     }
- }
-
- /* Gather the matrix back on the root node */
- starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
- \endcode
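- The distribution function <c>my_distrib(x, nodes)</c> used above is a one-dimensional
- counterpart of the earlier two-dimensional one; a simple cyclic sketch (the actual
- example shipped with StarPU may use a different distribution) could be:
- \code{.c}
- /* Hypothetical 1D distribution: returns the MPI node owning block x */
- int my_distrib(int x, int nb_nodes)
- {
-     return x % nb_nodes;
- }
- \endcode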
- \section MPIExamples More MPI examples
- MPI examples are available in the StarPU source code in the directory <c>mpi/examples/</c>:
- <ul>
- <li><c>complex</c> is a simple example using a user-defined data interface over
- MPI (complex numbers),
- <li><c>stencil5</c> is a simple stencil example using starpu_mpi_task_insert(),
- <li><c>matrix_decomposition</c> is a Cholesky decomposition example using
- starpu_mpi_task_insert(). The non-distributed version can check for
- algorithm correctness in a 1-node configuration, while the distributed version uses
- exactly the same source code, to be used over MPI,
- <li><c>mpi_lu</c> is an LU decomposition example, provided in three versions:
- <c>plu_example</c> uses explicit MPI data transfers, <c>plu_implicit_example</c>
- uses implicit MPI data transfers, <c>plu_outofcore_example</c> uses implicit MPI
- data transfers and supports data matrices which do not fit in memory (out-of-core).
- </ul>
- */