- @c -*-texinfo-*-
- @c This file is part of the StarPU Handbook.
- @c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
- @c Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
- @c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
- @c See the file starpu.texi for copying conditions.
- The integration of MPI transfers within task parallelism is done in a
- very natural way by means of asynchronous interactions between the
- application and StarPU. This is implemented in a separate libstarpumpi library
- which basically provides "StarPU" equivalents of @code{MPI_*} functions, where
- @code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
- GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
- use the usual @code{mpirun} command of the MPI implementation to start StarPU on
- the different MPI nodes.
- An MPI Insert Task function provides an even more seamless transition to a
- distributed application, by automatically issuing all required data transfers
- according to the task graph and an application-provided distribution.
- @menu
- * Simple Example::
- * Point to point communication::
- * Exchanging User Defined Data Interface::
- * MPI Insert Task Utility::
- * MPI Collective Operations::
- @end menu
- @node Simple Example
- @section Simple Example
- The flags required to compile or link against the MPI layer are
- accessible with the following commands:
- @example
- $ pkg-config --cflags starpumpi-1.0 # options for the compiler
- $ pkg-config --libs starpumpi-1.0 # options for the linker
- @end example
- You also need to pass the @code{--static} option if the application is
- to be linked statically.
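- For instance, with an MPI compiler wrapper such as @code{mpicc} and a
- (purely illustrative) source file @file{ring.c}, the application could
- be built as follows:
- @example
- $ mpicc ring.c -o ring $(pkg-config --cflags starpumpi-1.0) \
-                        $(pkg-config --libs starpumpi-1.0)
- @end example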
- @cartouche
- @smallexample
- void increment_token(void)
- @{
- struct starpu_task *task = starpu_task_create();
- task->cl = &increment_cl;
- task->handles[0] = token_handle;
- starpu_task_submit(task);
- @}
- @end smallexample
- @end cartouche
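- The code above references a codelet @code{increment_cl} and a handle
- @code{token_handle} which are not shown. A minimal, CPU-only sketch of
- how they could be declared (illustrative only, the actual ring example
- may differ) is:
- @cartouche
- @smallexample
- static unsigned token = 0;
- static starpu_data_handle_t token_handle;
- /* CPU implementation: increment the single element of the vector */
- static void increment_cpu(void *descr[], void *cl_arg)
- @{
-   unsigned *t = (unsigned *)STARPU_VECTOR_GET_PTR(descr[0]);
-   (*t)++;
- @}
- static struct starpu_codelet increment_cl =
- @{
-   .cpu_funcs = @{increment_cpu, NULL@},
-   .nbuffers = 1,
-   .modes = @{STARPU_RW@}
- @};
- @end smallexample
- @end cartouche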
- @cartouche
- @smallexample
- int main(int argc, char **argv)
- @{
- int rank, size;
- starpu_init(NULL);
- starpu_mpi_initialize_extended(&rank, &size);
- starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));
- unsigned nloops = NITER;
- unsigned loop;
- unsigned last_loop = nloops - 1;
- unsigned last_rank = size - 1;
- @end smallexample
- @end cartouche
- @cartouche
- @smallexample
- for (loop = 0; loop < nloops; loop++) @{
- int tag = loop*size + rank;
- if (loop == 0 && rank == 0)
- @{
- token = 0;
- fprintf(stdout, "Start with token value %d\n", token);
- @}
- else
- @{
- starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
- MPI_COMM_WORLD, NULL, NULL);
- @}
- increment_token();
- if (loop == last_loop && rank == last_rank)
- @{
- starpu_data_acquire(token_handle, STARPU_R);
- fprintf(stdout, "Finished: token value %d\n", token);
- starpu_data_release(token_handle);
- @}
- else
- @{
- starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
- MPI_COMM_WORLD, NULL, NULL);
- @}
- @}
- starpu_task_wait_for_all();
- @end smallexample
- @end cartouche
- @cartouche
- @smallexample
- starpu_mpi_shutdown();
- starpu_shutdown();
- if (rank == last_rank)
- @{
- fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
- STARPU_ASSERT(token == nloops*size);
- @}
- @end smallexample
- @end cartouche
- @node Point to point communication
- @section Point to point communication
- The standard point to point communications of MPI have been
- implemented. Their semantics are similar to the MPI ones, but adapted
- to the DSM provided by StarPU. An MPI request will only be submitted
- when the data is available in the main memory of the node submitting
- the request.
- There are two types of asynchronous communications: classic
- asynchronous communications and detached communications. Classic
- asynchronous communications (@code{starpu_mpi_isend} and
- @code{starpu_mpi_irecv}) need to be followed by a call to
- @code{starpu_mpi_wait} or @code{starpu_mpi_test} to wait for or to
- test the completion of the communication. Waiting for or testing the
- completion of detached communications is not possible: this is done
- internally by StarPU-MPI, and the resources are automatically released
- on completion. This mechanism is similar to the pthread detach state
- attribute, which determines whether a thread is created in a joinable
- or a detached state.
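- As an illustration, the following sketch contrasts the two styles. It
- assumes two handles @code{h1} and @code{h2} have already been
- registered, and uses arbitrary ranks and tags:
- @cartouche
- @smallexample
- /* Classic asynchronous send: completion must be waited for or tested */
- starpu_mpi_req req;
- MPI_Status status;
- starpu_mpi_isend(h1, &req, 1 /* dest */, 42 /* tag */, MPI_COMM_WORLD);
- /* ... do something else ... */
- starpu_mpi_wait(&req, &status);
- /* Detached receive: StarPU-MPI releases the resources itself on
-    completion and calls the callback (here none) */
- starpu_mpi_irecv_detached(h2, 0 /* source */, 43 /* tag */,
-                           MPI_COMM_WORLD, NULL, NULL);
- @end smallexample
- @end cartouche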
- For any communication, calling the function results in the creation of
- a StarPU-MPI request; the function @code{starpu_data_acquire_cb} is
- then called to asynchronously request StarPU to fetch the data into
- main memory. Once the data is available in main memory, a StarPU-MPI
- function is called to put the new request in the list of ready
- requests if it is a send request, or in a hashmap if it is a receive
- request.
- Internally, all MPI communications submitted by StarPU use a unique
- tag which has a default value, and which can be accessed with the
- functions @ref{starpu_mpi_get_communication_tag} and
- @ref{starpu_mpi_set_communication_tag}.
- The matching of tags with the corresponding requests is done within
- StarPU-MPI. To handle this, every communication is a double
- communication based on an envelope + data scheme: before any piece of
- data is sent, an envelope describing the data (in particular its tag)
- is sent first, so that the receiver can fetch the matching pending
- receive request from the hashmap and submit it to receive the data
- correctly.
- To this end, the StarPU-MPI progression thread keeps a permanently
- submitted request meant to receive incoming envelopes from all sources.
- The StarPU-MPI progression thread regularly polls this list of ready
- requests. For each new ready request, the appropriate function is
- called to post the corresponding MPI call. For example, calling
- @code{starpu_mpi_isend} will result in posting @code{MPI_Isend}. If
- the request is marked as detached, the request will be put in the list
- of detached requests.
- The StarPU-MPI progression thread also polls the list of detached
- requests. For each detached request, it regularly tests the completion
- of the MPI request by calling @code{MPI_Test}. On completion, the data
- handle is released, and if a callback was defined, it is called.
- Finally, the StarPU-MPI progression thread checks whether an envelope
- has arrived. If so, it checks whether the corresponding receive has
- already been submitted by the application. If it has, the request is
- submitted just like those on the list of ready requests. If it has
- not, a temporary handle is allocated to store the data that will
- arrive just afterwards, so that when the application eventually
- submits the corresponding receive request, the content of this
- temporary handle is copied into it instead of submitting a new
- StarPU-MPI request.
- @ref{Communication} gives the list of all the point to point
- communications defined in StarPU-MPI.
- @node Exchanging User Defined Data Interface
- @section Exchanging User Defined Data Interface
- New data interfaces defined as explained in @ref{Defining a New Data
- Interface} can also be used within StarPU-MPI and exchanged between
- nodes. Two functions need to be defined through
- the type @code{struct starpu_data_interface_ops} (@pxref{Defining
- Interface}). The pack function takes a handle and returns a contiguous
- memory buffer, along with its size, into which the data to be conveyed
- to another node should be copied. The reverse operation is implemented
- in the unpack function, which takes a contiguous memory buffer and
- recreates the data handle.
- @cartouche
- @smallexample
- static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
- @{
-   STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
-   struct starpu_complex_interface *complex_interface =
-     (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
-   /* Allocate one contiguous buffer holding the real parts followed by the imaginary parts */
-   *count = complex_get_size(handle);
-   *ptr = malloc(*count);
-   char *data = *ptr;
-   memcpy(data, complex_interface->real, complex_interface->nx*sizeof(double));
-   memcpy(data + complex_interface->nx*sizeof(double), complex_interface->imaginary,
-          complex_interface->nx*sizeof(double));
-   return 0;
- @}
- @end smallexample
- @end cartouche
- @cartouche
- @smallexample
- static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
- @{
-   STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));
-   struct starpu_complex_interface *complex_interface =
-     (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);
-   /* Copy the real parts, then the imaginary parts, back from the contiguous buffer */
-   char *data = ptr;
-   memcpy(complex_interface->real, data, complex_interface->nx*sizeof(double));
-   memcpy(complex_interface->imaginary, data + complex_interface->nx*sizeof(double),
-          complex_interface->nx*sizeof(double));
-   return 0;
- @}
- @end smallexample
- @end cartouche
- @cartouche
- @smallexample
- static struct starpu_data_interface_ops interface_complex_ops =
- @{
- ...
- .pack_data = complex_pack_data,
- .unpack_data = complex_unpack_data
- @};
- @end smallexample
- @end cartouche
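- Once these functions are registered in the interface operations,
- handles using this interface can be passed to the StarPU-MPI
- communication functions like any other handle. The following sketch
- assumes a registration function @code{starpu_complex_data_register}
- similar to the one of the StarPU complex interface example:
- @cartouche
- @smallexample
- double real[2]      = @{0.0, 0.0@};
- double imaginary[2] = @{0.0, 0.0@};
- starpu_data_handle_t complex_handle;
- MPI_Status status;
- starpu_complex_data_register(&complex_handle, 0, real, imaginary, 2);
- if (rank == 0)
-   starpu_mpi_send(complex_handle, 1 /* dest */, 10 /* tag */, MPI_COMM_WORLD);
- else if (rank == 1)
-   starpu_mpi_recv(complex_handle, 0 /* source */, 10 /* tag */,
-                   MPI_COMM_WORLD, &status);
- @end smallexample
- @end cartouche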
- @node MPI Insert Task Utility
- @section MPI Insert Task Utility
- To save the programmer from having to make all communications explicit,
- StarPU provides an "MPI Insert Task Utility". The principle is that the
- application decides a distribution of the data over the MPI nodes by
- allocating it and notifying StarPU of that decision, i.e. by telling
- StarPU which MPI node "owns" which data. It also decides, for each
- handle, an MPI tag which will be used to exchange the content of the
- handle. All MPI nodes then process the whole task graph, and StarPU
- automatically determines which node actually executes which task and
- triggers the required MPI transfers.
- The list of functions is described in @ref{MPI Insert Task}.
- Here is a stencil example showing how to use
- @code{starpu_mpi_insert_task}. One first needs to define a distribution
- function which specifies the locality of the data. Note that the
- distribution information needs to be given to StarPU by calling
- @code{starpu_data_set_rank}. An MPI tag should also be defined for each
- data handle by calling @code{starpu_data_set_tag}.
- @cartouche
- @smallexample
- /* Returns the MPI node number where data is */
- int my_distrib(int x, int y, int nb_nodes) @{
- /* Block distrib */
- return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;
- // /* Other examples useful for other kinds of computations */
- // /* / distrib */
- // return (x+y) % nb_nodes;
- // /* Block cyclic distrib */
- // unsigned side = sqrt(nb_nodes);
- // return x % side + (y % side) * side;
- @}
- @end smallexample
- @end cartouche
- Now the data can be registered within StarPU. Data which are not
- owned but will be needed for computations can be registered through
- the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
- StarPU will automatically allocate the memory when it is used for the
- first time.
- One can note an optimization here (the @code{else if} test): we only register
- data which will be needed by the tasks that we will execute.
- @cartouche
- @smallexample
- unsigned matrix[X][Y];
- starpu_data_handle_t data_handles[X][Y];
- for(x = 0; x < X; x++) @{
- for (y = 0; y < Y; y++) @{
- int mpi_rank = my_distrib(x, y, size);
- if (mpi_rank == my_rank)
- /* Owning data */
- starpu_variable_data_register(&data_handles[x][y], 0,
- (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
- else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
- || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
- /* I don't own that index, but will need it for my computations */
- starpu_variable_data_register(&data_handles[x][y], -1,
- (uintptr_t)NULL, sizeof(unsigned));
- else
- /* I know it's useless to allocate anything for this */
- data_handles[x][y] = NULL;
- if (data_handles[x][y]) @{
- starpu_data_set_rank(data_handles[x][y], mpi_rank);
- starpu_data_set_tag(data_handles[x][y], x*X+y);
- @}
- @}
- @}
- @end smallexample
- @end cartouche
- Now @code{starpu_mpi_insert_task()} can be called for the different
- steps of the application.
- @cartouche
- @smallexample
- for(loop=0 ; loop<niter; loop++)
- for (x = 1; x < X-1; x++)
- for (y = 1; y < Y-1; y++)
- starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
- STARPU_RW, data_handles[x][y],
- STARPU_R, data_handles[x-1][y],
- STARPU_R, data_handles[x+1][y],
- STARPU_R, data_handles[x][y-1],
- STARPU_R, data_handles[x][y+1],
- 0);
- starpu_task_wait_for_all();
- @end smallexample
- @end cartouche
- In other words, all MPI nodes process the whole task graph, but, as
- mentioned above, for each task only the MPI node which owns the data
- being written to (here, @code{data_handles[x][y]}) will actually run
- the task. The other MPI nodes will automatically send the required
- data.
- Having every node process the whole task graph can become a concern as
- the number of nodes grows. To avoid this, the application can prune the
- task-insertion loops according to the data distribution, so as to only
- submit tasks on nodes which have to care about them (either to execute
- them, or to send the required data), as sketched below.
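- The following is only a sketch of such pruning for the stencil example
- above: a node submits a task only when it owns the block being written
- to (and thus executes the task) or owns one of the blocks being read
- (and thus has to send data). The exact test depends on the access modes
- of the codelet.
- @cartouche
- @smallexample
- for(loop=0 ; loop<niter; loop++)
-   for (x = 1; x < X-1; x++)
-     for (y = 1; y < Y-1; y++)
-     @{
-       if (my_rank == my_distrib(x, y, size)
-           || my_rank == my_distrib(x-1, y, size) || my_rank == my_distrib(x+1, y, size)
-           || my_rank == my_distrib(x, y-1, size) || my_rank == my_distrib(x, y+1, size))
-         starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
-                                STARPU_RW, data_handles[x][y],
-                                STARPU_R, data_handles[x-1][y],
-                                STARPU_R, data_handles[x+1][y],
-                                STARPU_R, data_handles[x][y-1],
-                                STARPU_R, data_handles[x][y+1],
-                                0);
-     @}
- starpu_task_wait_for_all();
- @end smallexample
- @end cartouche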
- @node MPI Collective Operations
- @section MPI Collective Operations
- The functions are described in @ref{Collective Operations}.
- @cartouche
- @smallexample
- if (rank == root)
- @{
- /* Allocate the vector */
- vector = malloc(nblocks * sizeof(float *));
- for(x=0 ; x<nblocks ; x++)
- @{
- starpu_malloc((void **)&vector[x], block_size*sizeof(float));
- @}
- @}
- /* Allocate data handles and register data to StarPU */
- data_handles = malloc(nblocks*sizeof(starpu_data_handle_t));
- for(x = 0; x < nblocks ; x++)
- @{
- int mpi_rank = my_distrib(x, nodes);
- if (rank == root) @{
- starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
- block_size, sizeof(float));
- @}
- else if ((mpi_rank == rank) || (rank == mpi_rank+1) || (rank == mpi_rank-1)) @{
- /* I own this index, or I will need it for my computations */
- starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
- block_size, sizeof(float));
- @}
- else @{
- /* I know it's useless to allocate anything for this */
- data_handles[x] = NULL;
- @}
- if (data_handles[x]) @{
- starpu_data_set_rank(data_handles[x], mpi_rank);
- starpu_data_set_tag(data_handles[x], x);
- @}
- @}
- /* Scatter the matrix among the nodes */
- starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
- /* Calculation */
- for(x = 0; x < nblocks ; x++) @{
- if (data_handles[x]) @{
- int owner = starpu_data_get_rank(data_handles[x]);
- if (owner == rank) @{
- starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
- @}
- @}
- @}
- /* Gather the matrix on main node */
- starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
- @end smallexample
- @end cartouche
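- The example above assumes a one-dimensional variant of the
- @code{my_distrib} function shown earlier, as well as variables such as
- @code{nblocks}, @code{block_size}, @code{rank}, @code{nodes} and
- @code{root} set up beforehand by the application. A minimal sketch of
- such a distribution function could be:
- @cartouche
- @smallexample
- /* Sketch only: return the MPI node owning block x, cycling over the nodes */
- int my_distrib(int x, int nb_nodes)
- @{
-   return x % nb_nodes;
- @}
- @end smallexample
- @end cartouche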