
@c -*-texinfo-*-
@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
@c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.

@node StarPU MPI support
@chapter StarPU MPI support

The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi library
which basically provides "StarPU" equivalents of @code{MPI_*} functions, where
@code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual @code{mpirun} command of the MPI implementation to start StarPU on
the different MPI nodes.
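
For instance, assuming the application has been built into an executable
called @code{ring} (the name is purely illustrative), it can be started on
four MPI nodes with:

@example
% mpirun -np 4 ./ring
@end example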

An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.

@menu
* The API::
* Simple Example::
* MPI Insert Task Utility::
* MPI Collective Operations::
@end menu

@node The API
@section The API

@subsection Compilation

The flags required to compile or link against the MPI layer are
accessible with the following commands:

@example
% pkg-config --cflags libstarpumpi # options for the compiler
% pkg-config --libs libstarpumpi # options for the linker
@end example
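
For instance, a StarPU-MPI program could be compiled and linked in a single
step as follows, assuming the MPI compiler wrapper @code{mpicc} is used (the
file names are only placeholders):

@example
% mpicc ring.c -o ring $(pkg-config --cflags libstarpumpi) \
        $(pkg-config --libs libstarpumpi)
@end example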

@subsection Initialisation

@deftypefun int starpu_mpi_initialize (void)
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions. This
function does not call @code{MPI_Init}; the application should call it
beforehand.
@end deftypefun

@deftypefun int starpu_mpi_initialize_extended (int *@var{rank}, int *@var{world_size})
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions.
This function calls @code{MPI_Init}, and therefore should be preferred
to the previous one for MPI implementations which are not thread-safe.
Returns the current MPI node rank and world size.
@end deftypefun

@deftypefun int starpu_mpi_shutdown (void)
Cleans up the starpumpi library. This must be called between calling
@code{starpu_mpi} functions and @code{starpu_shutdown}.
@code{MPI_Finalize} will be called if StarPU-MPI has been initialized
by calling @code{starpu_mpi_initialize_extended}.
@end deftypefun
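
A typical initialization and shutdown sequence therefore looks as follows.
This is only a minimal sketch: error checking is omitted, and the data
registration and task submission parts are left out.

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, world_size;

    /* Initialize StarPU first, then the MPI layer. */
    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &world_size);

    /* ... register data and submit tasks here ... */

    /* Shut the MPI layer down before StarPU itself;
     * MPI_Finalize() is called on our behalf since the
     * extended initialization was used. */
    starpu_mpi_shutdown();
    starpu_shutdown();
    return 0;
@}
@end smallexample
@end cartouche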

@subsection Communication

@deftypefun int starpu_mpi_send (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_recv (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_isend (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_irecv (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_isend_detached (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
@end deftypefun

@deftypefun int starpu_mpi_wait (starpu_mpi_req *@var{req}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_test (starpu_mpi_req *@var{req}, int *@var{flag}, MPI_Status *@var{status})
@end deftypefun

@deftypefun int starpu_mpi_barrier (MPI_Comm @var{comm})
@end deftypefun

@deftypefun int starpu_mpi_isend_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
When the transfer is completed, the tag is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
@end deftypefun

@deftypefun int starpu_mpi_isend_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{dest}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
Asynchronously sends an array of buffers, and unlocks the tag once all
of them have been transmitted.
@end deftypefun

@deftypefun int starpu_mpi_irecv_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{source}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
@end deftypefun
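
For instance, a detached send can be posted with a completion callback as
sketched below; the destination rank, tag value and callback body are purely
illustrative:

@cartouche
@smallexample
void send_done(void *arg)
@{
    (void) arg;
    fprintf(stderr, "the detached send has completed\n");
@}

/* Post the send and return immediately; StarPU-MPI will call
 * send_done() once the transfer has completed. */
starpu_mpi_isend_detached(data_handle, /* dest */ 1, /* mpi_tag */ 42,
                          MPI_COMM_WORLD, send_done, NULL);
@end smallexample
@end cartouche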

@page
@node Simple Example
@section Simple Example

@cartouche
@smallexample
void increment_token(void)
@{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->buffers[0].handle = token_handle;
    task->buffers[0].mode = STARPU_RW;

    starpu_task_submit(task);
@}
@end smallexample
@end cartouche

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;
@end smallexample
@end cartouche

@cartouche
@smallexample
    for (loop = 0; loop < nloops; loop++) @{
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        @{
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        @}
        else
        @{
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                    MPI_COMM_WORLD, NULL, NULL);
        @}

        increment_token();

        if (loop == last_loop && rank == last_rank)
        @{
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                    MPI_COMM_WORLD, NULL, NULL);
        @}
    @}

    starpu_task_wait_for_all();
@end smallexample
@end cartouche

@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche

@page
@node MPI Insert Task Utility
@section MPI Insert Task Utility

To save the programmer from having to specify all communications explicitly,
StarPU provides an "MPI Insert Task Utility". The principle is that the
application decides a distribution of the data over the MPI nodes by
allocating it and notifying StarPU of that decision, i.e. telling StarPU which
MPI node "owns" which data. All MPI nodes then process the whole task graph,
and StarPU automatically determines which node actually executes which task,
as well as the required MPI transfers.

@deftypefun int starpu_data_set_tag (starpu_data_handle_t @var{handle}, int @var{tag})
Tell StarPU-MPI which MPI tag to use when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_get_tag (starpu_data_handle_t @var{handle})
Returns the MPI tag to be used when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_set_rank (starpu_data_handle_t @var{handle}, int @var{mpi_rank})
Tell StarPU-MPI which MPI node "owns" a given piece of data, that is, the node
which will always keep an up-to-date value, and will by default execute tasks
which write to it.
@end deftypefun

@deftypefun int starpu_data_get_rank (starpu_data_handle_t @var{handle})
Returns the last value set by @code{starpu_data_set_rank}.
@end deftypefun
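
For instance, after registering a variable, its owner and the MPI tag to be
used for its transfers can be declared as follows; @code{handle} and
@code{value} are assumed to have been declared beforehand, and the rank and
tag values are purely illustrative:

@cartouche
@smallexample
starpu_variable_data_register(&handle, 0, (uintptr_t)&value, sizeof(value));

/* Node 2 owns this piece of data; tag 42 is used for its transfers. */
starpu_data_set_rank(handle, 2);
starpu_data_set_tag(handle, 42);
@end smallexample
@end cartouche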

@deftypefun void starpu_mpi_insert_task (MPI_Comm @var{comm}, starpu_codelet *@var{cl}, ...)
Create and submit a task corresponding to @var{cl} with the following
arguments. The argument list must be zero-terminated.

The arguments following the codelet are of the same types as for the
function @code{starpu_insert_task} defined in @ref{Insert Task
Utility}. The extra argument @code{STARPU_EXECUTE_ON_NODE} followed by an
integer allows one to specify the MPI node on which to execute the codelet. It
is also possible to specify that the node owning a specific piece of data will
execute the codelet, by using @code{STARPU_EXECUTE_ON_DATA} followed by a data
handle.

The internal algorithm is as follows:

@enumerate
@item Find out whether we (as an MPI node) are to execute the codelet
because we own the data to be written to. If different nodes own data
to be written to, the argument @code{STARPU_EXECUTE_ON_NODE} or
@code{STARPU_EXECUTE_ON_DATA} has to be used to specify which MPI node will
execute the task.
@item Send and receive data as requested. Nodes owning data which needs to be
read by the task send it to the MPI node which will execute the task, which
receives it.
@item Execute the codelet. This is done by the MPI node selected in the
first step of the algorithm.
@item If different MPI nodes own data to be written to, send the
written data back to their owners.
@end enumerate

The algorithm also includes a cache mechanism that avoids sending the same
data twice to the same MPI node, unless the data has been modified.
@end deftypefun
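
For instance, a call forcing execution on a given MPI node could look as
follows; the codelet @code{my_cl}, the data handles and the node number are
purely illustrative:

@cartouche
@smallexample
/* Every MPI node submits the same call; only the selected node actually
 * executes the task, the others merely take part in the required data
 * transfers. */
starpu_mpi_insert_task(MPI_COMM_WORLD, &my_cl,
                       STARPU_RW, data_handles[x][y],
                       STARPU_R, data_handles[x-1][y],
                       STARPU_EXECUTE_ON_NODE, 0,
                       0);
@end smallexample
@end cartouche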

@deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle_t @var{data_handle}, int @var{node})
todo
@end deftypefun

Here is a stencil example showing how to use @code{starpu_mpi_insert_task}. One
first needs to define a distribution function which specifies the
locality of the data. Note that the distribution information needs to
be given to StarPU by calling @code{starpu_data_set_rank}.

@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
@}
@end smallexample
@end cartouche

Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the @code{else if} test): we only register
data which will be needed by the tasks that we will execute.

@cartouche
@smallexample
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++) @{
    for (y = 0; y < Y; y++) @{
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], 0,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;

        if (data_handles[x][y])
            starpu_data_set_rank(data_handles[x][y], mpi_rank);
    @}
@}
@end smallexample
@end cartouche

Now @code{starpu_mpi_insert_task()} can be called for the different
steps of the application.

@cartouche
@smallexample
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

starpu_task_wait_for_all();
@end smallexample
@end cartouche

In other words, all MPI nodes process the whole task graph, but as mentioned
above, for each task, only the MPI node which owns the data being written to
(here, @code{data_handles[x][y]}) will actually run the task. The other MPI
nodes will automatically send the required data.

@node MPI Collective Operations
@section MPI Collective Operations

@deftypefun int starpu_mpi_scatter_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Scatter data among the processes of the communicator based on the ownership of
the data. For each piece of data in the array @var{data_handles}, the
process @var{root} sends the data to the process owning it.
Processes receiving data must have valid data handles to receive them.
@end deftypefun

@deftypefun int starpu_mpi_gather_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Gather data from the different processes of the communicator onto the
process @var{root}. Each process owning a data handle in the array
@var{data_handles} will send it to the process @var{root}, which
must have valid data handles to receive the data.
@end deftypefun

@page
@cartouche
@smallexample
if (rank == root)
@{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    @{
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    @}
@}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
for(x = 0; x < nblocks ; x++)
@{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) @{
        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    @}
    else if ((mpi_rank == rank) || ((rank == mpi_rank+1 || rank == mpi_rank-1))) @{
        /* I own that index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    @}
    else @{
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    @}
    if (data_handles[x]) @{
        starpu_data_set_rank(data_handles[x], mpi_rank);
    @}
@}

/* Scatter the vector among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ; x++) @{
    if (data_handles[x]) @{
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) @{
            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
        @}
    @}
@}

/* Gather the vector back onto the root node */
starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
@end smallexample
@end cartouche