
@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
@c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.

@node StarPU MPI support
@chapter StarPU MPI support
The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi
library which basically provides "StarPU" equivalents of @code{MPI_*}
functions, where @code{void *} buffers are replaced with
@code{starpu_data_handle}s, and all GPU-RAM-NIC transfers are handled
efficiently by StarPU-MPI. The user has to use the usual @code{mpirun}
command of the MPI implementation to start StarPU on the different MPI
nodes.

An MPI Insert Task function provides an even more seamless transition to a
distributed application: it automatically issues all required data transfers
according to the task graph and an application-provided data distribution.

@menu
* The API::
* Simple Example::
* MPI Insert Task Utility::
* MPI Collective Operations::
@end menu
@node The API
@section The API

@subsection Compilation

The flags required to compile or link against the MPI layer are
accessible with the following commands:

@example
% pkg-config --cflags libstarpumpi  # options for the compiler
% pkg-config --libs libstarpumpi    # options for the linker
@end example
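
For instance, an application could be built with the MPI compiler wrapper
as follows (a sketch; @code{ring.c} is a placeholder name for the
application source file):

@example
% mpicc ring.c -o ring $(pkg-config --cflags libstarpumpi) \
        $(pkg-config --libs libstarpumpi)
@end example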

@subsection Initialisation

@deftypefun int starpu_mpi_initialize (void)
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions. This
function does not call @code{MPI_Init}; the application must call it
beforehand.
@end deftypefun

@deftypefun int starpu_mpi_initialize_extended (int *@var{rank}, int *@var{world_size})
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions.
This function calls @code{MPI_Init}, and should therefore be preferred
to the previous one for MPI implementations which are not thread-safe.
Returns the current MPI node rank and world size.
@end deftypefun

@deftypefun int starpu_mpi_shutdown (void)
Cleans up the starpumpi library. This must be called between calling
@code{starpu_mpi} functions and @code{starpu_shutdown}.
@code{MPI_Finalize} will be called if StarPU-MPI has been initialized
by calling @code{starpu_mpi_initialize_extended}.
@end deftypefun
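
A minimal initialization and shutdown sequence could therefore look as
follows (a sketch, using @code{starpu_mpi_initialize_extended} so that
@code{MPI_Init} and @code{MPI_Finalize} are handled by StarPU-MPI):

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, world_size;

    /* Initialize StarPU first, then the MPI layer */
    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &world_size);

    /* ... register data, submit tasks and communications ... */

    /* Shut down the MPI layer (MPI_Finalize is called here),
       then StarPU itself */
    starpu_mpi_shutdown();
    starpu_shutdown();
    return 0;
@}
@end smallexample
@end cartouche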

@subsection Communication

@deftypefun int starpu_mpi_send (starpu_data_handle @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_recv (starpu_data_handle @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_isend (starpu_data_handle @var{data_handle}, starpu_mpi_req *@var{req}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_irecv (starpu_data_handle @var{data_handle}, starpu_mpi_req *@var{req}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_isend_detached (starpu_data_handle @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached (starpu_data_handle @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
@end deftypefun

@deftypefun int starpu_mpi_wait (starpu_mpi_req *@var{req}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_test (starpu_mpi_req *@var{req}, int *@var{flag}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_barrier (MPI_Comm @var{comm})
@end deftypefun
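
For instance, exchanging an already registered handle between two nodes
could look as follows (a minimal sketch; @code{token_handle}, @code{rank}
and the tag value 42 are assumptions made for the illustration):

@cartouche
@smallexample
starpu_mpi_req req;
MPI_Status status;

if (rank == 0) @{
    /* Non-blocking send of the handle to node 1 with MPI tag 42 */
    starpu_mpi_isend(token_handle, &req, 1, 42, MPI_COMM_WORLD);
    /* ... possibly submit tasks here ... */
    starpu_mpi_wait(&req, &status);
@} else if (rank == 1) @{
    /* Blocking receive of the same piece of data */
    starpu_mpi_recv(token_handle, 0, 42, MPI_COMM_WORLD, &status);
@}
@end smallexample
@end cartouche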

@deftypefun int starpu_mpi_isend_detached_unlock_tag (starpu_data_handle @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag @var{tag})
When the transfer is completed, the tag is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached_unlock_tag (starpu_data_handle @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag @var{tag})
@end deftypefun

@deftypefun int starpu_mpi_isend_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle *@var{data_handle}, int *@var{dest}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag @var{tag})
Asynchronously sends an array of buffers, and unlocks the tag once all
of them are transmitted.
@end deftypefun

@deftypefun int starpu_mpi_irecv_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle *@var{data_handle}, int *@var{source}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag @var{tag})
@end deftypefun
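
These functions connect MPI transfers with StarPU tags. For instance,
sending two handles and then waiting until both transfers have completed
could look as follows (a sketch, combined with @code{starpu_tag_wait} from
the core StarPU API; the handles, destinations and tag values are
assumptions made for the illustration):

@cartouche
@smallexample
starpu_data_handle handles[2] = @{ handle_a, handle_b @};
int dests[2]      = @{ 1, 1 @};
int mpi_tags[2]   = @{ 10, 11 @};
MPI_Comm comms[2] = @{ MPI_COMM_WORLD, MPI_COMM_WORLD @};

/* Unlock StarPU tag 0x42 once both sends have completed */
starpu_mpi_isend_array_detached_unlock_tag(2, handles, dests, mpi_tags,
                                           comms, 0x42);
/* Block until the tag is unlocked, i.e. both transfers are done */
starpu_tag_wait(0x42);
@end smallexample
@end cartouche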

@page
@node Simple Example
@section Simple Example

@cartouche
@smallexample
void increment_token(void)
@{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->buffers[0].handle = token_handle;
    task->buffers[0].mode = STARPU_RW;

    starpu_task_submit(task);
@}
@end smallexample
@end cartouche

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;
@end smallexample
@end cartouche

@cartouche
@smallexample
    for (loop = 0; loop < nloops; loop++) @{
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        @{
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        @}
        else
        @{
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                    MPI_COMM_WORLD, NULL, NULL);
        @}

        increment_token();

        if (loop == last_loop && rank == last_rank)
        @{
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished : token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                    MPI_COMM_WORLD, NULL, NULL);
        @}
    @}

    starpu_task_wait_for_all();
@end smallexample
@end cartouche

@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche

@page
@node MPI Insert Task Utility
@section MPI Insert Task Utility

To save the programmer from having to make all communications explicit,
StarPU provides an "MPI Insert Task Utility". The principle is that the
application decides a distribution of the data over the MPI nodes by
allocating it and notifying StarPU of that decision, i.e. it tells StarPU
which MPI node "owns" which data. All MPI nodes then process the whole
task graph, and StarPU automatically determines which node actually
executes which task, as well as the required MPI transfers.

@deftypefun int starpu_data_set_rank (starpu_data_handle @var{handle}, int @var{mpi_rank})
Tells StarPU-MPI which MPI node "owns" a given piece of data, that is, the
node which will always keep an up-to-date value of it, and will by default
execute tasks which write to it.
@end deftypefun

@deftypefun int starpu_data_get_rank (starpu_data_handle @var{handle})
Returns the last value set by @code{starpu_data_set_rank}.
@end deftypefun

@deftypefun void starpu_mpi_insert_task (MPI_Comm @var{comm}, starpu_codelet *@var{cl}, ...)
Creates and submits a task corresponding to @var{cl} with the following
arguments. The argument list must be zero-terminated.

The arguments following the codelet are of the same types as for the
function @code{starpu_insert_task} defined in @ref{Insert Task Utility}.
The extra argument @code{STARPU_EXECUTE_ON_NODE} followed by an integer
makes it possible to specify the MPI node on which to execute the codelet.
It is also possible to specify that the node owning a specific piece of
data will execute the codelet, by using @code{STARPU_EXECUTE_ON_DATA}
followed by a data handle.

The internal algorithm is as follows:
@enumerate
@item Find out whether we (as an MPI node) are to execute the codelet
because we own the data to be written to. If different nodes own data
to be written to, the argument @code{STARPU_EXECUTE_ON_NODE} or
@code{STARPU_EXECUTE_ON_DATA} has to be used to specify which MPI node will
execute the task.
@item Send and receive data as requested. Nodes owning data which need to
be read by the task send them to the MPI node which will execute it, and
the latter receives them.
@item Execute the codelet. This is done by the MPI node selected in the
first step of the algorithm.
@item If different MPI nodes own data to be written to, send the written
data back to their owners.
@end enumerate

The algorithm also includes a cache mechanism that avoids sending the same
data twice to the same MPI node, unless the data has been modified.
@end deftypefun
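
For instance, forcing a given task to run on MPI node 0 regardless of data
ownership could look as follows (a sketch; @code{cl} and @code{handle} are
placeholders for an existing codelet and a registered data handle):

@cartouche
@smallexample
starpu_mpi_insert_task(MPI_COMM_WORLD, &cl,
                       STARPU_RW, handle,
                       STARPU_EXECUTE_ON_NODE, 0,
                       0);
@end smallexample
@end cartouche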

@deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle @var{data_handle}, int @var{node})
@end deftypefun

@page
Here is a stencil example showing how to use @code{starpu_mpi_insert_task}.
One first needs to define a distribution function which specifies the
locality of the data. Note that the distribution information needs to
be given to StarPU by calling @code{starpu_data_set_rank}.

@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
  /* Block distrib */
  return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

  // /* Other examples useful for other kinds of computations */
  // /* / distrib */
  // return (x+y) % nb_nodes;

  // /* Block cyclic distrib */
  // unsigned side = sqrt(nb_nodes);
  // return x % side + (y % side) * side;
@}
@end smallexample
@end cartouche

Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the @code{else if} test): we only
register data which will be needed by the tasks that we will execute.

@cartouche
@smallexample
unsigned matrix[X][Y];
starpu_data_handle data_handles[X][Y];

for(x = 0; x < X; x++) @{
    for (y = 0; y < Y; y++) @{
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], 0,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;
        if (data_handles[x][y])
            starpu_data_set_rank(data_handles[x][y], mpi_rank);
    @}
@}
@end smallexample
@end cartouche

Now @code{starpu_mpi_insert_task()} can be called for the different
steps of the application.

@cartouche
@smallexample
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

starpu_task_wait_for_all();
@end smallexample
@end cartouche

That is, all MPI nodes process the whole task graph, but as mentioned above,
for each task, only the MPI node which owns the data being written to (here,
@code{data_handles[x][y]}) will actually run the task. The other MPI nodes
will automatically send the required data.

@node MPI Collective Operations
@section MPI Collective Operations

@deftypefun int starpu_mpi_scatter_detached (starpu_data_handle *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Scatters data among the processes of the communicator based on the ownership
of the data. For each piece of data of the array @var{data_handles}, the
process @var{root} sends the data to the process owning this data.
Processes receiving data must have valid data handles to receive them.
@end deftypefun

@deftypefun int starpu_mpi_gather_detached (starpu_data_handle *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Gathers data from the different processes of the communicator onto the
process @var{root}. Each process owning a data handle in the array
@var{data_handles} will send it to the process @var{root}, which must have
valid data handles to receive the data.
@end deftypefun

@page
@cartouche
@smallexample
if (rank == root)
@{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    @{
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    @}
@}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle));
for(x = 0; x < nblocks ; x++)
@{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) @{
        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    @}
    else if ((mpi_rank == rank) || (rank == mpi_rank+1) || (rank == mpi_rank-1)) @{
        /* I own that index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    @}
    else @{
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    @}
    if (data_handles[x]) @{
        starpu_data_set_rank(data_handles[x], mpi_rank);
    @}
@}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ; x++) @{
    if (data_handles[x]) @{
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) @{
            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
        @}
    @}
@}

/* Gather the matrix back onto the root node */
starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
@end smallexample
@end cartouche