/*
* This file is part of the StarPU Handbook.
 * Copyright (C) 2009--2011 Université de Bordeaux
* Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015 CNRS
* Copyright (C) 2011, 2012 INRIA
* See the file version.doxy for copying conditions.
*/
/*! \page MPISupport MPI Support
The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi library
which basically provides "StarPU" equivalents of MPI_* functions, where
void * buffers are replaced with ::starpu_data_handle_t, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual mpirun command of the MPI implementation to start StarPU on
the different MPI nodes.
An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.
\section SimpleExample Simple Example
The flags required to compile or link against the MPI layer are
accessible with the following commands:
\verbatim
$ pkg-config --cflags starpumpi-1.3 # options for the compiler
$ pkg-config --libs starpumpi-1.3 # options for the linker
\endverbatim
You also need to pass the option --static if the application is to
be linked statically.
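For instance, assuming the token ring example below is stored in a file
ring.c and that mpicc is the MPI compiler wrapper of the MPI implementation
(both names are given here for illustration only), it could be built with:

\verbatim
$ mpicc -o ring ring.c $(pkg-config --cflags starpumpi-1.3) $(pkg-config --libs starpumpi-1.3)
\endverbatim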
\code{.c}
static starpu_data_handle_t token_handle;
static unsigned token;

/* The codelet increment_cl (defined elsewhere) increments the value of the token */
void increment_token(void)
{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
}

int main(int argc, char **argv)
{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;

    for (loop = 0; loop < nloops; loop++)
    {
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        {
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        }
        else
        {
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                                      MPI_COMM_WORLD, NULL, NULL);
        }

        increment_token();

        if (loop == last_loop && rank == last_rank)
        {
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        }
        else
        {
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                                      MPI_COMM_WORLD, NULL, NULL);
        }
    }

    starpu_task_wait_for_all();

    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    {
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    }

    return 0;
}
\endcode
\section PointToPointCommunication Point To Point Communication
The standard point to point communications of MPI have been
implemented. The semantic is similar to the MPI one, but adapted to
the DSM provided by StarPU. A MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.
There is two types of asynchronous communications: the classic
asynchronous communications and the detached communications. The
classic asynchronous communications (starpu_mpi_isend() and
starpu_mpi_irecv()) need to be followed by a call to
starpu_mpi_wait() or to starpu_mpi_test() to wait for or to
test the completion of the communication. Waiting for or testing the
completion of detached communications is not possible, this is done
internally by StarPU-MPI, on completion, the resources are
automatically released. This mechanism is similar to the pthread
detach state attribute which determines whether a thread will be
created in a joinable or a detached state.
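For illustration, here is a small sketch contrasting the two flavours. It
assumes a registered handle token_handle, a destination rank dest and a tag
tag similar to the example above; these names are placeholders, not part of
the StarPU API.

\code{.c}
starpu_mpi_req req;

/* Classic asynchronous send: the application keeps the request and must
 * itself wait for (or test) its completion. */
starpu_mpi_isend(token_handle, &req, dest, tag, MPI_COMM_WORLD);
/* ... submit tasks or other communications here ... */
starpu_mpi_wait(&req, MPI_STATUS_IGNORE);

/* Detached send: no request is returned, StarPU-MPI tracks the communication
 * internally, releases its resources on completion and calls the given
 * callback (none here). */
starpu_mpi_isend_detached(token_handle, dest, tag, MPI_COMM_WORLD, NULL, NULL);
\endcode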
Internally, each communication is split into two messages: a first
message is used to exchange an envelope describing the data (i.e. its
tag and its size), and the data itself is sent in a second message. All
MPI communications submitted by StarPU use a unique tag which has a
default value, and which can be accessed with the functions
starpu_mpi_get_communication_tag() and
starpu_mpi_set_communication_tag(). The matching of tags with the
corresponding requests is done within StarPU-MPI.

For any userland communication, the call to the corresponding function
(e.g. starpu_mpi_isend()) results in the creation of a StarPU-MPI
request, and the function starpu_data_acquire_cb() is then called to
asynchronously request StarPU to fetch the data into main memory; when
the data is ready and the corresponding buffer has already been
received by MPI, it is copied into the memory of the data,
otherwise the request is stored in the early requests list. Sending
requests are stored in the ready requests list.
While there are requests to process, the StarPU-MPI progression thread
does the following:

- it polls the ready requests list. For all the ready
requests, the appropriate function is called to post the corresponding
MPI call. For example, an initial call to starpu_mpi_isend() will
result in a call to MPI_Isend(). If the request is marked as
detached, the request will then be added to the detached requests
list.
- it posts an MPI_Irecv() to retrieve a data envelope.
- it polls the detached requests list. For each detached
request, it tests the completion of the MPI request by calling
MPI_Test(). On completion, the data handle is released, and if a
callback was defined, it is called.
- finally, it checks whether a data envelope has been received. If so,
and if the envelope matches a request in the early requests list (i.e.
the request has already been posted by the application), the
corresponding MPI call is posted (similarly to the first step above).
If the data envelope does not match any application request, a
temporary handle is created to receive the data, a StarPU-MPI request
is created and added to the ready requests list, and will thus be
processed in the first step of the next loop.
\ref MPIPtpCommunication "Communication" gives the list of all the
point to point communications defined in StarPU-MPI.
\section ExchangingUserDefinedDataInterface Exchanging User Defined Data Interface
New data interfaces defined as explained in \ref
DefiningANewDataInterface can also be used within StarPU-MPI and
exchanged between nodes. Two functions need to be defined through the
type starpu_data_interface_ops. The function
starpu_data_interface_ops::pack_data takes a handle and returns a
contiguous memory buffer, allocated with starpu_malloc_flags(ptr, size, 0),
along with its size, in which the data to be conveyed to another node
should be copied. The reverse operation is implemented in the function
starpu_data_interface_ops::unpack_data, which takes a contiguous memory
buffer and recreates the data handle.
\code{.c}
static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    /* Allocate a contiguous buffer and copy the real then imaginary parts into it */
    *count = complex_get_size(handle);
    starpu_malloc_flags(ptr, *count, 0);
    memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
    memcpy((char *)*ptr + complex_interface->nx*sizeof(double), complex_interface->imaginary,
           complex_interface->nx*sizeof(double));

    return 0;
}

static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    /* Copy the real then imaginary parts back from the contiguous buffer */
    memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, (char *)ptr + complex_interface->nx*sizeof(double),
           complex_interface->nx*sizeof(double));

    return 0;
}

static struct starpu_data_interface_ops interface_complex_ops =
{
    ...
    .pack_data = complex_pack_data,
    .unpack_data = complex_unpack_data
};
\endcode
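Once these two functions are provided, a handle using the new interface can be
exchanged between nodes like any built-in one: StarPU-MPI packs it on the
sending side and unpacks it on the receiving side. A minimal sketch, where the
handle complex_handle, the peer ranks and the tag value are placeholders:

\code{.c}
if (rank == 0)
    starpu_mpi_send(complex_handle, 1, 0x42, MPI_COMM_WORLD);
else if (rank == 1)
    starpu_mpi_recv(complex_handle, 0, 0x42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
\endcode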
\section MPIInsertTaskUtility MPI Insert Task Utility
To save the programmer from having to make all communications explicit, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. it tells StarPU which MPI node "owns"
which data. It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task, and triggers the required MPI transfers.

The list of functions is described in \ref MPIInsertTask "MPI Insert Task".

Here is a stencil example showing how to use starpu_mpi_task_insert(). One
first needs to define a distribution function which specifies the
locality of the data. Note that the data needs to be registered to MPI
by calling starpu_mpi_data_register(). This function allows setting
the distribution information and the MPI tag which should be used when
communicating the data. It also allows automatically clearing the MPI
communication cache when unregistering the data.
\code{.c}
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes)
{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
}
\endcode
Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a home_node set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.
One can note an optimization here (the else if test): we only register
data which will be needed by the tasks that we will execute.
\code{.c}
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;

        if (data_handles[x][y])
        {
            starpu_mpi_data_register(data_handles[x][y], x*X+y, mpi_rank);
        }
    }
}
\endcode
Now starpu_mpi_task_insert() can be called for the different
steps of the application.
\code{.c}
for(loop=0 ; loop<niter; loop++)
{
    for (x = 1; x < X-1; x++)
    {
        for (y = 1; y < Y-1; y++)
        {
            /* The codelet stencil5_cl updates data_handles[x][y] from its four neighbours */
            starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);
        }
    }
}
starpu_task_wait_for_all();
\endcode

I.e. all MPI nodes process the whole task graph, but as mentioned above, for
each task, only the MPI node which owns the data being written to (here,
data_handles[x][y]) will actually run the task. The other MPI nodes will
automatically send the required data.
This can be a concern with a growing number of nodes. To avoid this, the
application can prune the task submission loops according to the data
distribution, so as to only submit tasks on the nodes which have to care about
them (either to execute them, or to send the required data). A simple way to
do some of this is to just add a test like the following:
\code{.c}
for(loop=0 ; loop<niter; loop++)
{
    for (x = 1; x < X-1; x++)
    {
        for (y = 1; y < Y-1; y++)
        {
            /* Only submit the task if this node owns the written data or one
             * of the read neighbours, i.e. if it has something to execute or
             * to send */
            if (my_distrib(x,y,size) == my_rank
             || my_distrib(x-1,y,size) == my_rank || my_distrib(x+1,y,size) == my_rank
             || my_distrib(x,y-1,size) == my_rank || my_distrib(x,y+1,size) == my_rank)
                starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                       STARPU_RW, data_handles[x][y],
                                       STARPU_R, data_handles[x-1][y],
                                       STARPU_R, data_handles[x+1][y],
                                       STARPU_R, data_handles[x][y-1],
                                       STARPU_R, data_handles[x][y+1],
                                       0);
        }
    }
}
starpu_task_wait_for_all();
\endcode

If the my_distrib function can be inlined by the compiler, the latter can
improve the test.
If the size can be made a compile-time constant, the compiler can
considerably improve the test further.
If the distribution function is not too complex and the compiler is very good,
the latter can even optimize the for loops, thus dramatically reducing
the cost of task submission.
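As an illustrative sketch of these remarks (the names and the fixed grid size
below are not part of the example above), the distribution function can be
made inlinable and the node count a compile-time constant:

\code{.c}
#define NB_NODES 4   /* compile-time node count */
#define SIDE     2   /* sqrt(NB_NODES), also known at compile time */

static inline int my_distrib_static(int x, int y)
{
    /* Same block distribution as my_distrib, but fully resolvable at compile time */
    return (x / SIDE + (y / SIDE) * SIDE) % NB_NODES;
}
\endcode

With such a definition, the test guarding starpu_mpi_task_insert() boils down
to a few integer operations which the compiler can simplify or hoist out of
the inner loops.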
The function starpu_mpi_task_build() is also provided, with the aim to
only construct the task structure. All MPI nodes need to call the
function; it returns a valid task structure only on the node which is
to execute the task. Following the execution of the task, all nodes
need to call the function starpu_mpi_task_post_build() -- with the same
list of arguments as starpu_mpi_task_build() -- to post all the
necessary data communications.
\code{.c}
struct starpu_task *task;
task = starpu_mpi_task_build(MPI_COMM_WORLD, &cl,
STARPU_RW, data_handles[0],
STARPU_R, data_handles[1],
0);
if (task) starpu_task_submit(task);
starpu_mpi_task_post_build(MPI_COMM_WORLD, &cl,
STARPU_RW, data_handles[0],
STARPU_R, data_handles[1],
0);
\endcode
\section MPICache MPI Cache Support
StarPU-MPI automatically optimizes duplicate data transmissions: if an MPI
node B needs a piece of data D from MPI node A for several tasks, only one
transmission of D will take place from A to B, and the value of D will be kept
on B as long as no task modifies D.
If a task modifies D, B will wait for all tasks which need the previous value of
D, before invalidating the value of D. As a consequence, it releases the memory
occupied by D. Whenever a task running on B needs the new value of D, allocation
will take place again to receive it.
Since tasks can be submitted dynamically, StarPU-MPI cannot know whether the
current value of data D will again be used by a newly-submitted task before
being modified by another newly-submitted task, so until a task is submitted to
modify the current value, it cannot decide by itself whether to flush the cache
or not. The application can however explicitly tell StarPU-MPI to flush the
cache by calling starpu_mpi_cache_flush() or starpu_mpi_cache_flush_all_data(),
for instance in case the data will not be used at all any more (see for instance
the Cholesky example in mpi/examples/matrix_decomposition), or at least not in
the close future. If a newly-submitted task actually needs the value again,
another transmission of D will be initiated from A to B. A mere
starpu_mpi_cache_flush_all_data() can for instance be added at the end of the whole
algorithm, to express that no data will be reused after that (or at least that
it is not interesting to keep them in cache). It may however be interesting to
add fine-grained starpu_mpi_cache_flush() calls during the algorithm; the effect
for the data deallocation will be the same, but it will additionally release some
pressure from the StarPU-MPI cache hash table during task submission.
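For instance (a sketch reusing the handles of the stencil example above), a
single handle can be flushed as soon as it is known that no further task will
need it, and the whole cache can be flushed at the end of the algorithm:

\code{.c}
/* data_handles[x][y] will not be needed by any further task:
 * drop the copies cached on the other nodes */
starpu_mpi_cache_flush(MPI_COMM_WORLD, data_handles[x][y]);

/* End of the algorithm: nothing will be reused from the cache */
starpu_mpi_cache_flush_all_data(MPI_COMM_WORLD);
\endcode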
The whole caching behavior can be disabled thanks to the \ref STARPU_MPI_CACHE
environment variable. The variable \ref STARPU_MPI_CACHE_STATS can be set to 1
to enable the runtime to display messages when data are added or removed
from the cache holding the received data.
\section MPIMigration MPI Data Migration
The application can dynamically change its mind about the data distribution,
for instance to balance the load over MPI nodes. This can be done very simply
by requesting an explicit move and then changing the registered rank. For
instance, we here switch to a new distribution function my_distrib2: we first
register any data which was not registered already and will be needed, then
migrate the data, and register the new location.
\code{.c}
for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        int mpi_rank = my_distrib2(x, y, size);
        if (!data_handles[x][y] && (mpi_rank == my_rank
             || my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
             || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)))
            /* Register newly-needed data */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        if (data_handles[x][y])
        {
            /* Migrate the data */
            starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
            /* And register the new rank of the matrix */
            starpu_mpi_data_set_rank(data_handles[x][y], mpi_rank);
        }
    }
}
\endcode
From then on, further tasks submissions will use the new data distribution,
which will thus change both MPI communications and task assignments.
Very importantly, since all nodes have to agree on which node owns which data
so as to determine MPI communications and task assignments the same way, all
nodes have to perform the same data migration, and at the same point among task
submissions. It thus does not require a strict synchronization, just a clear
separation of task submissions before and after the data redistribution.
Before data unregistration, it has to be migrated back to its original home
node (the value, at least), since that is where the user-provided buffer
resides. Otherwise the unregistration will complain that it does not have the
latest value on the original home node.
\code{.c}
for(x = 0; x < X; x++)
{
    for (y = 0; y < Y; y++)
    {
        if (data_handles[x][y])
        {
            int mpi_rank = my_distrib(x, y, size);
            /* Get the data back to its original place, where the user-provided buffer is */
            starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
            /* And unregister it */
            starpu_data_unregister(data_handles[x][y]);
        }
    }
}
\endcode
\section MPICollective MPI Collective Operations
The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".
\code{.c}
if (rank == root)
{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    {
        /* Allocate each block; block_size is the number of elements per block */
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    }
}
\endcode
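The blocks can then be scattered from the root node, processed by their
respective owners, and gathered back, along the following lines. This is only
a sketch: cl, nblocks, root, rank and the registered data_handles are assumed
to be set up as in the previous sections, and no completion callbacks are
passed to the collective calls.

\code{.c}
int x;

/* Distribute the blocks from the root node to their owners */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD,
                            NULL, NULL,   /* no send callback */
                            NULL, NULL);  /* no receive callback */

/* Each node processes the blocks it owns */
for (x = 0; x < nblocks; x++)
{
    if (data_handles[x] && starpu_mpi_data_get_rank(data_handles[x]) == rank)
        starpu_task_insert(&cl, STARPU_RW, data_handles[x], 0);
}

/* Bring all the blocks back to the root node */
starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD,
                           NULL, NULL, NULL, NULL);
\endcode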
\section MPIExamples More MPI Examples

More MPI examples are available in the StarPU source code in mpi/examples:

- comm shows how to use communicators with StarPU-MPI,
- complex is a simple example using a user-defined data interface over
MPI (complex numbers),
- stencil5 is a simple stencil example using starpu_mpi_task_insert(),
- matrix_decomposition is a Cholesky decomposition example using
starpu_mpi_task_insert(). The non-distributed version can check for
algorithm correctness in a 1-node configuration, the distributed version
uses exactly the same source code, to be used over MPI,
- mpi_lu is an LU decomposition example, provided in three versions:
plu_example uses explicit MPI data transfers, plu_implicit_example
uses implicit MPI data transfers, plu_outofcore_example uses implicit MPI
data transfers and supports data matrices which do not fit in memory (out-of-core).

*/