/*
* This file is part of the StarPU Handbook.
* Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
* Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
* Copyright (C) 2011, 2012 Institut National de Recherche en Informatique et Automatique
* See the file version.doxy for copying conditions.
*/
/*! \page MPISupport MPI Support
The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in the separate library libstarpumpi,
which basically provides "StarPU" equivalents of MPI_* functions, where
void * buffers are replaced with ::starpu_data_handle_t, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual mpirun command of the MPI implementation to start StarPU on
the different MPI nodes.
An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.
\section SimpleExample Simple Example
The flags required to compile or link against the MPI layer are
accessible with the following commands:
\verbatim
$ pkg-config --cflags starpumpi-1.2 # options for the compiler
$ pkg-config --libs starpumpi-1.2 # options for the linker
\endverbatim
You also need to pass the option --static if the application is to
be linked statically.
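For instance, a single-file application could be compiled and linked as
follows (a sketch; the source file name ring.c is only an example, and your
MPI compiler wrapper may differ):
\verbatim
$ mpicc $(pkg-config --cflags starpumpi-1.2) -o ring ring.c $(pkg-config --libs starpumpi-1.2)
\endverbatim
The following example illustrates the use of StarPU-MPI: a token is
incremented while being passed around the MPI nodes in a ring.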
\code{.c}
/* The codelet increment_cl, the variables token and token_handle, and the
 * iteration count NITER are assumed to be defined elsewhere. */
void increment_token(void)
{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
}

int main(int argc, char **argv)
{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;

    for (loop = 0; loop < nloops; loop++)
    {
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        {
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        }
        else
        {
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                                      MPI_COMM_WORLD, NULL, NULL);
        }

        increment_token();

        if (loop == last_loop && rank == last_rank)
        {
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        }
        else
        {
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                                      MPI_COMM_WORLD, NULL, NULL);
        }
    }

    starpu_task_wait_for_all();

    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    {
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    }

    return 0;
}
\endcode
\section PointToPointCommunication Point To Point Communication
The standard point to point communications of MPI have been
implemented. The semantics are similar to those of MPI, but adapted to
the DSM provided by StarPU. An MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.
There are two types of asynchronous communications: classic asynchronous
communications and detached communications. Classic asynchronous
communications (starpu_mpi_isend() and starpu_mpi_irecv()) need to be
followed by a call to starpu_mpi_wait() or starpu_mpi_test() to wait for
or test the completion of the communication. Waiting for or testing the
completion of detached communications is not possible: this is done
internally by StarPU-MPI, and the resources are automatically released on
completion. This mechanism is similar to the pthread detach state
attribute, which determines whether a thread is created in a joinable or a
detached state.
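As an illustration, here is a minimal sketch of a classic (non-detached)
exchange between ranks 0 and 1; it assumes a data handle data_handle already
registered on both ranks, and an arbitrary tag value:
\code{.c}
starpu_mpi_req req;
MPI_Status status;

if (rank == 0)
{
    /* Post the send, then explicitly wait for its completion. */
    starpu_mpi_isend(data_handle, &req, 1, /* mpi_tag */ 42, MPI_COMM_WORLD);
    starpu_mpi_wait(&req, &status);
}
else if (rank == 1)
{
    /* Post the receive and wait for it. With the detached variants,
     * neither the request nor the explicit wait would be needed. */
    starpu_mpi_irecv(data_handle, &req, 0, /* mpi_tag */ 42, MPI_COMM_WORLD);
    starpu_mpi_wait(&req, &status);
}
\endcode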
Any communication call first results in the creation of a StarPU-MPI
request; the function starpu_data_acquire_cb() is then used to
asynchronously ask StarPU to fetch the data into main memory. Once the
data is available there, a StarPU-MPI function puts the new request in the
list of ready requests if it is a send request, or in a hashmap if it is a
receive request.
Internally, all MPI communications submitted by StarPU use a single tag,
which has a default value and can be read or changed with the functions
starpu_mpi_get_communication_tag() and
starpu_mpi_set_communication_tag().
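For instance, an application which already uses that tag value for its own
MPI messages can move StarPU-MPI's internal tag out of the way (a minimal
sketch; the value chosen here is arbitrary):
\code{.c}
/* Use a dedicated MPI tag for StarPU-MPI's internal envelope messages,
 * so that it does not collide with tags used directly by the application. */
starpu_mpi_set_communication_tag(0x1000);
STARPU_ASSERT(starpu_mpi_get_communication_tag() == 0x1000);
\endcode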
The matching of tags with the corresponding requests is done inside
StarPU-MPI. To handle this, every communication is actually a double
communication based on an envelope + data scheme: before a piece of data
is sent, an envelope describing it (in particular its tag) is sent first,
so that the receiver can find the matching pending receive request in the
hashmap and submit it to receive the data correctly. To this end, the
StarPU-MPI progression thread keeps a permanently submitted request to
receive incoming envelopes from all sources.
The StarPU-MPI progression thread regularly polls this list of ready
requests. For each new ready request, the appropriate function is called
to post the corresponding MPI call. For example, a request created by
starpu_mpi_isend() results in an MPI_Isend being posted. If the request is
marked as detached, it is then put in the list of detached requests.
The StarPU-MPI progression thread also polls the list of detached
requests. For each detached request, it regularly tests the completion
of the MPI request by calling MPI_Test. On completion, the data
handle is released, and if a callback was defined, it is called.
Finally, the StarPU-MPI progression thread checks whether an envelope has
arrived. If so, it checks whether the corresponding receive has already
been submitted by the application. If it has, the request is submitted
just like those on the list of ready requests. If it has not, a temporary
handle is allocated to store the data which will arrive just after, so
that when the application eventually submits the matching receive request,
the temporary handle is copied into the application's handle instead of a
new StarPU-MPI request being submitted.
\ref MPIPtpCommunication "Communication" gives the list of all the
point to point communications defined in StarPU-MPI.
\section ExchangingUserDefinedDataInterface Exchanging User Defined Data Interface
New data interfaces defined as explained in \ref
DefiningANewDataInterface can also be used within StarPU-MPI and
exchanged between nodes. Two functions need to be defined through the
type starpu_data_interface_ops. The function
starpu_data_interface_ops::pack_data takes a handle and returns a
contiguous memory buffer, along with its size, into which the data to be
conveyed to another node should be copied. The reverse operation is
implemented in the function starpu_data_interface_ops::unpack_data, which
takes a contiguous memory buffer and recreates the data handle.
\code{.c}
static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    /* Allocate a contiguous buffer and copy the real parts followed by
     * the imaginary parts into it. */
    *count = complex_get_size(handle);
    *ptr = malloc(*count);
    memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
    memcpy((char *)(*ptr) + complex_interface->nx*sizeof(double), complex_interface->imaginary,
           complex_interface->nx*sizeof(double));

    return 0;
}

static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    /* Copy the real and imaginary parts back from the contiguous buffer. */
    memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, (char *)ptr + complex_interface->nx*sizeof(double),
           complex_interface->nx*sizeof(double));

    return 0;
}

static struct starpu_data_interface_ops interface_complex_ops =
{
    ...
    .pack_data = complex_pack_data,
    .unpack_data = complex_unpack_data
};
\endcode
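Once these two operations are defined, a handle registered with such an
interface can be exchanged like any built-in one. The following sketch
assumes a handle complex_handle registered with the interface above on both
ranks, and an arbitrary tag:
\code{.c}
MPI_Status status;

if (rank == 0)
{
    /* The data is packed with pack_data before being sent over MPI. */
    starpu_mpi_send(complex_handle, 1, /* mpi_tag */ 10, MPI_COMM_WORLD);
}
else if (rank == 1)
{
    /* On reception, unpack_data rebuilds the content of the interface. */
    starpu_mpi_recv(complex_handle, 0, /* mpi_tag */ 10, MPI_COMM_WORLD, &status);
}
\endcode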
\section MPIInsertTaskUtility MPI Insert Task Utility
To save the programmer from having to make all communications explicit, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. telling StarPU which MPI node "owns"
which data. It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task and triggers the required MPI transfers.
The list of functions is described in \ref MPIInsertTask "MPI Insert Task".
Here is a stencil example showing how to use starpu_mpi_insert_task(). One
first needs to define a distribution function which specifies the
locality of the data. Note that the distribution information needs to
be given to StarPU by calling starpu_data_set_rank(). An MPI tag
should also be defined for each data handle by calling
starpu_data_set_tag().
\code{.c}
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes)
{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
}
\endcode
Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a home_node set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.
One can note an optimization here (the else if test): we only register
data which will be needed by the tasks that we will execute.
\code{.c}
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];
for(x = 0; x < X; x++) {
    for (y = 0; y < Y; y++) {
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
                 || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;

        if (data_handles[x][y]) {
            starpu_data_set_rank(data_handles[x][y], mpi_rank);
            starpu_data_set_tag(data_handles[x][y], x*X+y);
        }
    }
}
\endcode
Now starpu_mpi_insert_task() can be called for the different
steps of the application.
\code{.c}
/* The codelet stencil5_cl (the 5-point stencil kernel) is assumed to be
 * defined elsewhere. */
for(loop = 0; loop < niter; loop++) {
    for (x = 1; x < X-1; x++) {
        for (y = 1; y < Y-1; y++) {
            starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);
        }
    }
}
starpu_task_wait_for_all();
\endcode
In other words, all MPI nodes process the whole task graph, but, as mentioned
above, for each task only the MPI node which owns the data being written to
(here, data_handles[x][y]) will actually run the task. The other MPI nodes
will automatically send the required data.
Having all nodes process the whole task graph can become a concern as the
number of nodes grows. To avoid this, the application can prune the task
insertion for loops according to the data distribution, so as to only submit
tasks on the nodes which have to care about them (either to execute them, or
to send the required data), as sketched below.
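For the stencil above, a possible pruning criterion (a sketch, not taken
verbatim from the StarPU examples) is to submit a task only on the nodes
which own the piece of data being written to or one of the pieces being
read, since only those nodes either execute the task or send data for it.
This assumes that every handle referenced by a submitted task is registered,
possibly lazily, on the submitting node, which may require registering a
slightly larger neighbourhood than in the registration loop above.
\code{.c}
for(loop = 0; loop < niter; loop++) {
    for (x = 1; x < X-1; x++) {
        for (y = 1; y < Y-1; y++) {
            /* Skip tasks this node is not involved in at all. */
            if (my_rank != my_distrib(x, y, size)
                && my_rank != my_distrib(x-1, y, size) && my_rank != my_distrib(x+1, y, size)
                && my_rank != my_distrib(x, y-1, size) && my_rank != my_distrib(x, y+1, size))
                continue;

            starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);
        }
    }
}
starpu_task_wait_for_all();
\endcode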
\section MPICollective MPI Collective Operations
The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".
\code{.c}
if (rank == root)
{
    /* Allocate the vector on the root node */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    {
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    }
}

/* The blocks are then registered as data handles (as in the previous
 * section), scattered among the nodes with starpu_mpi_scatter_detached(),
 * processed by the inserted tasks, and finally brought back to the root
 * node with starpu_mpi_gather_detached(). */
\endcode
\section MPIExamples More MPI Examples

A few more MPI examples are available in the StarPU source code:

- complex is a simple example using a user-defined data interface over
MPI (complex numbers),
- stencil5 is a simple stencil example using starpu_mpi_insert_task(),
- matrix_decomposition is a Cholesky decomposition example using
starpu_mpi_insert_task(). The non-distributed version can check for
algorithm correctness on a single node, while the distributed version runs
the same computation over MPI,
- mpi_lu is an LU decomposition example, provided in both explicit and
implicit versions.

*/