@c -*-texinfo-*- @c This file is part of the StarPU Handbook. @c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1 @c Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique @c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique @c See the file starpu.texi for copying conditions. The integration of MPI transfers within task parallelism is done in a very natural way by the means of asynchronous interactions between the application and StarPU. This is implemented in a separate libstarpumpi library which basically provides "StarPU" equivalents of @code{MPI_*} functions, where @code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to use the usual @code{mpirun} command of the MPI implementation to start StarPU on the different MPI nodes. An MPI Insert Task function provides an even more seamless transition to a distributed application, by automatically issuing all required data transfers according to the task graph and an application-provided distribution. @menu * Simple Example:: * Point to point communication:: * Exchanging User Defined Data Interface:: * MPI Insert Task Utility:: * MPI Collective Operations:: @end menu @node Simple Example @section Simple Example The flags required to compile or link against the MPI layer are then accessible with the following commands: @example $ pkg-config --cflags starpumpi-1.0 # options for the compiler $ pkg-config --libs starpumpi-1.0 # options for the linker @end example Also pass the @code{--static} option if the application is to be linked statically. @cartouche @smallexample void increment_token(void) @{ struct starpu_task *task = starpu_task_create(); task->cl = &increment_cl; task->handles[0] = token_handle; starpu_task_submit(task); @} @end smallexample @end cartouche @cartouche @smallexample int main(int argc, char **argv) @{ int rank, size; starpu_init(NULL); starpu_mpi_initialize_extended(&rank, &size); starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned)); unsigned nloops = NITER; unsigned loop; unsigned last_loop = nloops - 1; unsigned last_rank = size - 1; @end smallexample @end cartouche @cartouche @smallexample for (loop = 0; loop < nloops; loop++) @{ int tag = loop*size + rank; if (loop == 0 && rank == 0) @{ token = 0; fprintf(stdout, "Start with token value %d\n", token); @} else @{ starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag, MPI_COMM_WORLD, NULL, NULL); @} increment_token(); if (loop == last_loop && rank == last_rank) @{ starpu_data_acquire(token_handle, STARPU_R); fprintf(stdout, "Finished: token value %d\n", token); starpu_data_release(token_handle); @} else @{ starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1, MPI_COMM_WORLD, NULL, NULL); @} @} starpu_task_wait_for_all(); @end smallexample @end cartouche @cartouche @smallexample starpu_mpi_shutdown(); starpu_shutdown(); if (rank == last_rank) @{ fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size); STARPU_ASSERT(token == nloops*size); @} @end smallexample @end cartouche @node Point to point communication @section Point to point communication The standard point to point communications of MPI have been implemented. The semantic is similar to the MPI one, but adapted to the DSM provided by StarPU. A MPI request will only be submitted when the data is available in the main memory of the node submitting the request. There is two types of asynchronous communications: the classic asynchronous communications and the detached communications. The classic asynchronous communications (@code{starpu_mpi_isend} and @code{starpu_mpi_irecv}) need to be followed by a call to @code{starpu_mpi_wait} or to @code{starpu_mpi_test} to wait for or to test the completion of the communication. Waiting for or testing the completion of detached communications is not possible, this is done internally by StarPU-MPI, on completion, the resources are automatically released. This mechanism is similar to the pthread detach state attribute which determines whether a thread will be created in a joinable or a detached state. For any communication, the call of the function will result in the creation of a StarPU-MPI request, the function @code{starpu_data_acquire_cb} is then called to asynchronously request StarPU to fetch the data in main memory; when the data is available in main memory, a StarPU-MPI function is called to put the new request in the list of the ready requests. The StarPU-MPI progression thread regularly polls this list of ready requests. For each new ready request, the appropriate function is called to post the corresponding MPI call. For example, calling @code{starpu_mpi_isend} will result in posting @code{MPI_Isend}. If the request is marked as detached, the request will be put in the list of detached requests. The StarPU-MPI progression thread also polls the list of detached requests. For each detached request, it regularly tests the completion of the MPI request by calling @code{MPI_Test}. On completion, the data handle is released, and if a callback was defined, it is called. @ref{Communication} gives the list of all the point to point communications defined in StarPU-MPI. @node Exchanging User Defined Data Interface @section Exchanging User Defined Data Interface New data interfaces defined as explained in @ref{Defining a New Data Interface} can also be used within StarPU-MPI and exchanged between nodes. Two functions needs to be defined through the type @code{struct starpu_data_interface_ops} (@pxref{Defining Interface}). The pack function takes a handle and returns a contiguous memory buffer along with its size where data to be conveyed to another node should be copied. The reversed operation is implemented in the unpack function which takes a contiguous memory buffer and recreates the data handle. @cartouche @smallexample static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count) @{ STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node)); struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node); *count = complex_get_size(handle); *ptr = malloc(*count); memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double)); memcpy(*ptr+complex_interface->nx*sizeof(double), complex_interface->imaginary, complex_interface->nx*sizeof(double)); return 0; @} @end smallexample @end cartouche @cartouche @smallexample static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count) @{ STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node)); struct starpu_complex_interface *complex_interface = (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node); memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double)); memcpy(complex_interface->imaginary, ptr+complex_interface->nx*sizeof(double), complex_interface->nx*sizeof(double)); return 0; @} @end smallexample @end cartouche @cartouche @smallexample static struct starpu_data_interface_ops interface_complex_ops = @{ ... .pack_data = complex_pack_data, .unpack_data = complex_unpack_data @}; @end smallexample @end cartouche @node MPI Insert Task Utility @section MPI Insert Task Utility To save the programmer from having to explicit all communications, StarPU provides an "MPI Insert Task Utility". The principe is that the application decides a distribution of the data over the MPI nodes by allocating it and notifying StarPU of that decision, i.e. tell StarPU which MPI node "owns" which data. It also decides, for each handle, an MPI tag which will be used to exchange the content of the handle. All MPI nodes then process the whole task graph, and StarPU automatically determines which node actually execute which task, and trigger the required MPI transfers. The list of functions are described in @ref{MPI Insert Task}. Here an stencil example showing how to use @code{starpu_mpi_insert_task}. One first needs to define a distribution function which specifies the locality of the data. Note that that distribution information needs to be given to StarPU by calling @code{starpu_data_set_rank}. A MPI tag should also be defined for each data handle by calling @code{starpu_data_set_tag}. @cartouche @smallexample /* Returns the MPI node number where data is */ int my_distrib(int x, int y, int nb_nodes) @{ /* Block distrib */ return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes; // /* Other examples useful for other kinds of computations */ // /* / distrib */ // return (x+y) % nb_nodes; // /* Block cyclic distrib */ // unsigned side = sqrt(nb_nodes); // return x % side + (y % side) * size; @} @end smallexample @end cartouche Now the data can be registered within StarPU. Data which are not owned but will be needed for computations can be registered through the lazy allocation mechanism, i.e. with a @code{home_node} set to -1. StarPU will automatically allocate the memory when it is used for the first time. One can note an optimization here (the @code{else if} test): we only register data which will be needed by the tasks that we will execute. @cartouche @smallexample unsigned matrix[X][Y]; starpu_data_handle_t data_handles[X][Y]; for(x = 0; x < X; x++) @{ for (y = 0; y < Y; y++) @{ int mpi_rank = my_distrib(x, y, size); if (mpi_rank == my_rank) /* Owning data */ starpu_variable_data_register(&data_handles[x][y], 0, (uintptr_t)&(matrix[x][y]), sizeof(unsigned)); else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size) || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)) /* I don't own that index, but will need it for my computations */ starpu_variable_data_register(&data_handles[x][y], -1, (uintptr_t)NULL, sizeof(unsigned)); else /* I know it's useless to allocate anything for this */ data_handles[x][y] = NULL; if (data_handles[x][y]) @{ starpu_data_set_rank(data_handles[x][y], mpi_rank); starpu_data_set_tag(data_handles[x][y], x*X+y); @} @} @} @end smallexample @end cartouche Now @code{starpu_mpi_insert_task()} can be called for the different steps of the application. @cartouche @smallexample for(loop=0 ; loop