@c -*-texinfo-*-
@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011, 2012 Centre National de la Recherche Scientifique
@c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.
The integration of MPI transfers within task parallelism is done in a
natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi library
which provides "StarPU" equivalents of the @code{MPI_*} functions, where
@code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual @code{mpirun} command of the MPI implementation to start StarPU on
the different MPI nodes.

An MPI Insert Task function provides an even more seamless transition to a
distributed application: it automatically issues all required data transfers
according to the task graph and an application-provided data distribution.
@menu
* The API::
* Simple Example::
* MPI Insert Task Utility::
* MPI Collective Operations::
@end menu

@node The API
@section The API

@subsection Compilation
The flags required to compile or link against the MPI layer are
accessible with the following commands:

@example
% pkg-config --cflags starpumpi-1.0  # options for the compiler
% pkg-config --libs starpumpi-1.0    # options for the linker
@end example

Also pass the @code{--static} option if the application is to be linked statically.
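
For instance, an application file (here called @code{ring.c}, a placeholder
name) could be built along the following lines:

@example
% gcc ring.c $(pkg-config --cflags starpumpi-1.0) \
             $(pkg-config --libs starpumpi-1.0) -o ring
@end example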
@subsection Initialisation

@deftypefun int starpu_mpi_initialize (void)
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions. This
function does not call @code{MPI_Init}; the application must call
@code{MPI_Init} beforehand.
@end deftypefun

@deftypefun int starpu_mpi_initialize_extended (int *@var{rank}, int *@var{world_size})
Initializes the starpumpi library. This must be called between calling
@code{starpu_init} and other @code{starpu_mpi} functions.
This function calls @code{MPI_Init}, and should therefore be preferred
to the previous one for MPI implementations which are not thread-safe.
The rank and world size of the current MPI node are returned in
@var{rank} and @var{world_size}.
@end deftypefun

@deftypefun int starpu_mpi_shutdown (void)
Cleans up the starpumpi library. This must be called after the last
@code{starpu_mpi} function and before @code{starpu_shutdown}.
@code{MPI_Finalize} will be called if StarPU-MPI has been initialized
by calling @code{starpu_mpi_initialize_extended}.
@end deftypefun
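
Putting these functions together, a minimal initialization and shutdown
sequence looks as follows (a sketch; error checking is omitted):

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, world_size;

    starpu_init(NULL);
    /* Also calls MPI_Init and returns the rank and world size */
    starpu_mpi_initialize_extended(&rank, &world_size);

    /* ... register data, submit tasks, communicate ... */

    /* Calls MPI_Finalize since starpu_mpi_initialize_extended was used */
    starpu_mpi_shutdown();
    starpu_shutdown();
    return 0;
@}
@end smallexample
@end cartouche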
@subsection Communication

The standard point-to-point communications of MPI have been
implemented. Their semantics are similar to MPI's, but adapted to
the DSM provided by StarPU: an MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.
@deftypefun int starpu_mpi_send (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
Performs a standard-mode, blocking send of @var{data_handle} to the
node @var{dest} using the message tag @code{mpi_tag} within the
communicator @var{comm}.
@end deftypefun

@deftypefun int starpu_mpi_recv (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, MPI_Status *@var{status})
Performs a standard-mode, blocking receive in @var{data_handle} from the
node @var{source} using the message tag @code{mpi_tag} within the
communicator @var{comm}.
@end deftypefun

@deftypefun int starpu_mpi_isend (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm})
Posts a standard-mode, non-blocking send of @var{data_handle} to the
node @var{dest} using the message tag @code{mpi_tag} within the
communicator @var{comm}. After the call, the pointer to the request
@var{req} can be used to test or wait for the completion of the communication.
@end deftypefun

@deftypefun int starpu_mpi_irecv (starpu_data_handle_t @var{data_handle}, starpu_mpi_req *@var{req}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm})
Posts a non-blocking receive in @var{data_handle} from the
node @var{source} using the message tag @code{mpi_tag} within the
communicator @var{comm}. After the call, the pointer to the request
@var{req} can be used to test or wait for the completion of the communication.
@end deftypefun

@deftypefun int starpu_mpi_isend_detached (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
Posts a standard-mode, non-blocking send of @var{data_handle} to the
node @var{dest} using the message tag @code{mpi_tag} within the
communicator @var{comm}. On completion, the @var{callback} function is
called with the argument @var{arg}.
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, void (*@var{callback})(void *), void *@var{arg})
Posts a non-blocking receive in @var{data_handle} from the
node @var{source} using the message tag @code{mpi_tag} within the
communicator @var{comm}. On completion, the @var{callback} function is
called with the argument @var{arg}.
@end deftypefun

@deftypefun int starpu_mpi_wait (starpu_mpi_req *@var{req}, MPI_Status *@var{status})
Returns when the operation identified by request @var{req} is complete.
@end deftypefun

@deftypefun int starpu_mpi_test (starpu_mpi_req *@var{req}, int *@var{flag}, MPI_Status *@var{status})
If the operation identified by @var{req} is complete, sets @var{flag}
to 1. The @var{status} object is set to contain information on the
completed operation.
@end deftypefun

@deftypefun int starpu_mpi_barrier (MPI_Comm @var{comm})
Blocks the caller until all group members of the communicator
@var{comm} have called it.
@end deftypefun
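
For instance, a non-blocking send followed by an explicit wait can be written
as below. This is a minimal sketch: @code{token_handle} is assumed to be an
already-registered data handle, @code{dest} a valid node rank, and the tag
value an arbitrary application choice.

@cartouche
@smallexample
/* Sketch: token_handle is an already-registered data handle,
 * dest a valid node rank, 42 an application-chosen tag */
starpu_mpi_req req;
MPI_Status status;

starpu_mpi_isend(token_handle, &req, dest, 42, MPI_COMM_WORLD);
/* ... overlap the transfer with other work ... */
starpu_mpi_wait(&req, &status);
@end smallexample
@end cartouche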
@deftypefun int starpu_mpi_isend_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{dest}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
Posts a standard-mode, non-blocking send of @var{data_handle} to the
node @var{dest} using the message tag @code{mpi_tag} within the
communicator @var{comm}. On completion, @var{tag} is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_irecv_detached_unlock_tag (starpu_data_handle_t @var{data_handle}, int @var{source}, int @var{mpi_tag}, MPI_Comm @var{comm}, starpu_tag_t @var{tag})
Posts a non-blocking receive in @var{data_handle} from the
node @var{source} using the message tag @code{mpi_tag} within the
communicator @var{comm}. On completion, @var{tag} is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_isend_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{dest}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
Posts @var{array_size} standard-mode, non-blocking sends of the data
@var{data_handle[x]} to the node @var{dest[x]} using the message
tag @code{mpi_tag[x]} within the communicator @var{comm[x]}. On
completion of all the requests, @var{tag} is unlocked.
@end deftypefun

@deftypefun int starpu_mpi_irecv_array_detached_unlock_tag (unsigned @var{array_size}, starpu_data_handle_t *@var{data_handle}, int *@var{source}, int *@var{mpi_tag}, MPI_Comm *@var{comm}, starpu_tag_t @var{tag})
Posts @var{array_size} non-blocking receives in @var{data_handle[x]} from the
node @var{source[x]} using the message tag @code{mpi_tag[x]} within the
communicator @var{comm[x]}. On completion of all the requests,
@var{tag} is unlocked.
@end deftypefun
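
These variants integrate with the tag dependency mechanism of StarPU. As a
minimal sketch, assuming a registered @code{data_handle}, a valid
@code{source} rank and an otherwise unused tag value, one can for instance
block on the tag until the receive has completed:

@cartouche
@smallexample
/* Sketch: data_handle is registered, source is a valid node rank,
 * 0x42 an application-chosen, otherwise unused tag value */
starpu_tag_t tag = 0x42;

starpu_mpi_irecv_detached_unlock_tag(data_handle, source, 7,
                                     MPI_COMM_WORLD, tag);
/* Tasks may also declare a dependency on the tag; here we simply wait */
starpu_tag_wait(tag);
@end smallexample
@end cartouche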
@page
@node Simple Example
@section Simple Example
@cartouche
@smallexample
void increment_token(void)
@{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
@}
@end smallexample
@end cartouche
@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1,
                                sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;
@end smallexample
@end cartouche
@cartouche
@smallexample
    for (loop = 0; loop < nloops; loop++) @{
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        @{
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        @}
        else
        @{
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                                      MPI_COMM_WORLD, NULL, NULL);
        @}

        increment_token();

        if (loop == last_loop && rank == last_rank)
        @{
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                                      MPI_COMM_WORLD, NULL, NULL);
        @}
    @}

    starpu_task_wait_for_all();
@end smallexample
@end cartouche
@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche
@page
@node MPI Insert Task Utility
@section MPI Insert Task Utility
To save the programmer from having to make all communications explicit, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. it tells StarPU which MPI node "owns"
which data. All MPI nodes then process the whole task graph, and StarPU
automatically determines which node actually executes which task, as well as
the required MPI transfers.
@deftypefun int starpu_data_set_tag (starpu_data_handle_t @var{handle}, int @var{tag})
Tells StarPU-MPI which MPI tag to use when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_get_tag (starpu_data_handle_t @var{handle})
Returns the MPI tag to be used when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_set_rank (starpu_data_handle_t @var{handle}, int @var{rank})
Tells StarPU-MPI which MPI node "owns" a given piece of data, that is, the node
which will always keep an up-to-date value of it, and will by default execute
tasks which write to it.
@end deftypefun

@deftypefun int starpu_data_get_rank (starpu_data_handle_t @var{handle})
Returns the last value set by @code{starpu_data_set_rank}.
@end deftypefun
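
For instance, a freshly registered piece of data can be assigned an owner and
a communication tag as follows. This is a minimal sketch; the tag value is an
arbitrary application choice.

@cartouche
@smallexample
/* Sketch: register a variable owned by node 0 and assign an
 * application-chosen MPI tag to it */
unsigned val = 0;
starpu_data_handle_t handle;

starpu_variable_data_register(&handle, 0, (uintptr_t)&val, sizeof(val));
starpu_data_set_rank(handle, 0);   /* node 0 owns the data */
starpu_data_set_tag(handle, 1);    /* tag used for its transfers */
@end smallexample
@end cartouche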
@defmac STARPU_EXECUTE_ON_NODE
This macro is used when calling @code{starpu_mpi_insert_task}, and
must be followed by an integer value which specifies the node on which
to execute the codelet.
@end defmac

@defmac STARPU_EXECUTE_ON_DATA
This macro is used when calling @code{starpu_mpi_insert_task}, and
must be followed by a data handle to specify that the node owning the
given data will execute the codelet.
@end defmac
@deftypefun int starpu_mpi_insert_task (MPI_Comm @var{comm}, struct starpu_codelet *@var{codelet}, ...)
Creates and submits a task corresponding to @var{codelet} with the following
arguments. The argument list must be zero-terminated.

The arguments following the codelet are of the same types as for the
function @code{starpu_insert_task} defined in @ref{Insert Task
Utility}. The extra argument @code{STARPU_EXECUTE_ON_NODE} followed by an
integer allows one to specify the MPI node on which to execute the codelet.
It is also possible to specify that the node owning a specific data will
execute the codelet, by using @code{STARPU_EXECUTE_ON_DATA} followed by a data
handle.

The internal algorithm is as follows:
@enumerate
@item Find out whether we (as an MPI node) are to execute the codelet
because we own the data to be written to. If different nodes own data
to be written to, the argument @code{STARPU_EXECUTE_ON_NODE} or
@code{STARPU_EXECUTE_ON_DATA} has to be used to specify which MPI node will
execute the task.
@item Send and receive data as requested. Nodes owning data which needs to be
read by the task send it to the MPI node which will execute the task; the
latter receives it.
@item Execute the codelet. This is done by the MPI node selected in the
first step of the algorithm.
@item If different MPI nodes own data to be written to, send the
written data back to its owner.
@end enumerate

The algorithm also includes a cache mechanism that avoids sending the same
data twice to the same MPI node, unless the data has been modified in between.
@end deftypefun
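
For instance, forcing the execution node explicitly could look as follows.
This is a sketch: @code{my_cl} and @code{handle} are assumed to be a
previously defined codelet and a registered data handle.

@cartouche
@smallexample
/* Sketch: my_cl and handle are assumed to be defined and registered */
starpu_mpi_insert_task(MPI_COMM_WORLD, &my_cl,
                       STARPU_RW, handle,
                       STARPU_EXECUTE_ON_NODE, 2,  /* run on node 2 */
                       0);
@end smallexample
@end cartouche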
@deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle_t @var{data_handle}, int @var{node})
Transfers the data @var{data_handle} to MPI node @var{node}, sending it from its
owner if needed. At least the target node and the owner have to call the
function.
@end deftypefun
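
As a sketch, bringing a piece of data to node 0, e.g. to inspect a result,
could look like the following; since all nodes unroll the same program, every
node reaches the call, which satisfies the requirement that at least the owner
and the target call it.

@cartouche
@smallexample
/* Sketch: every node calls this; only the owner and node 0 actually
 * take part in the transfer */
starpu_mpi_get_data_on_node(MPI_COMM_WORLD, data_handle, 0);
if (rank == 0) @{
    starpu_data_acquire(data_handle, STARPU_R);
    /* ... inspect the local copy ... */
    starpu_data_release(data_handle);
@}
@end smallexample
@end cartouche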
Here is a stencil example showing how to use @code{starpu_mpi_insert_task}. One
first needs to define a distribution function which specifies the
locality of the data. Note that the distribution information needs to
be given to StarPU by calling @code{starpu_data_set_rank}.
@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
@}
@end smallexample
@end cartouche
Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the @code{else if} test): we only register
data which will be needed by the tasks that we will execute.
@cartouche
@smallexample
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++) @{
    for (y = 0; y < Y; y++) @{
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], 0,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;
        if (data_handles[x][y])
            starpu_data_set_rank(data_handles[x][y], mpi_rank);
    @}
@}
@end smallexample
@end cartouche
Now @code{starpu_mpi_insert_task()} can be called for the different
steps of the application.
@cartouche
@smallexample
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

starpu_task_wait_for_all();
@end smallexample
@end cartouche
That is, all MPI nodes process the whole task graph, but, as mentioned above,
for each task, only the MPI node which owns the data being written to (here,
@code{data_handles[x][y]}) actually runs the task. The other MPI nodes
automatically send the required data.
@node MPI Collective Operations
@section MPI Collective Operations
@deftypefun int starpu_mpi_scatter_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Scatters data among the processes of the communicator based on the ownership of
the data. For each data of the array @var{data_handles}, the
process @var{root} sends the data to the process owning this data.
Processes receiving data must have valid data handles to receive them.
@end deftypefun

@deftypefun int starpu_mpi_gather_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm})
Gathers data from the different processes of the communicator onto the
process @var{root}. Each process owning a data handle in the array
@var{data_handles} will send it to the process @var{root}. The
process @var{root} must have valid data handles to receive the data.
@end deftypefun
@page
@cartouche
@smallexample
if (rank == root)
@{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    @{
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    @}
@}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
for(x = 0; x < nblocks ; x++)
@{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) @{
        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    @}
    else if ((mpi_rank == rank) || ((rank == mpi_rank+1 || rank == mpi_rank-1))) @{
        /* I own that index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    @}
    else @{
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    @}
    if (data_handles[x]) @{
        starpu_data_set_rank(data_handles[x], mpi_rank);
    @}
@}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ; x++) @{
    if (data_handles[x]) @{
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) @{
            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
        @}
    @}
@}

/* Gather the matrix on the root node */
starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
@end smallexample
@end cartouche