@c -*-texinfo-*-
@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
@c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.

The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi library
which provides "StarPU" equivalents of @code{MPI_*} functions, where
@code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual @code{mpirun} command of the MPI implementation to start StarPU on
the different MPI nodes, as shown below.
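
For instance, with a typical MPI implementation, a StarPU-MPI application
(here called @code{ring} purely for the sake of the example) can be started on
four nodes with:

@example
$ mpirun -np 4 ./ring
@end example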

An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.

@menu
* Simple Example::
* Exchanging User Defined Data Interface::
* MPI Insert Task Utility::
* MPI Collective Operations::
@end menu

@node Simple Example
@section Simple Example

The flags required to compile or link against the MPI layer are
accessible with the following commands:

@example
$ pkg-config --cflags starpumpi-1.0  # options for the compiler
$ pkg-config --libs starpumpi-1.0    # options for the linker
@end example

Also pass the @code{--static} option if the application is to be linked statically.
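
For instance, a hypothetical @code{ring.c} program could be compiled and
linked in one step as follows (a minimal sketch, using @code{gcc}):

@example
$ gcc ring.c -o ring $(pkg-config --cflags starpumpi-1.0) \
             $(pkg-config --libs starpumpi-1.0)
@end example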

The following example, spread over the next few listings, implements a ring
in which a token is passed from one MPI node to the next and incremented at
each step. The function below submits a task which increments the token:

@cartouche
@smallexample
void increment_token(void)
@{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
@}
@end smallexample
@end cartouche
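
The codelet @code{increment_cl} used above is not detailed in this example; a
possible CPU-only implementation could look as follows (a sketch, assuming the
token is registered as a one-element vector of @code{unsigned}):

@cartouche
@smallexample
void increment_cpu(void *descr[], void *cl_arg)
@{
    /* Retrieve the local pointer to the registered vector and increment it */
    unsigned *tokenptr = (unsigned *)STARPU_VECTOR_GET_PTR(descr[0]);
    (*tokenptr)++;
@}

static struct starpu_codelet increment_cl =
@{
    .cpu_funcs = @{increment_cpu, NULL@},
    .nbuffers = 1,
    .modes = @{STARPU_RW@}
@};
@end smallexample
@end cartouche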

@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;

    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;
@end smallexample
@end cartouche

@cartouche
@smallexample
    for (loop = 0; loop < nloops; loop++) @{
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        @{
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        @}
        else
        @{
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                    MPI_COMM_WORLD, NULL, NULL);
        @}

        increment_token();

        if (loop == last_loop && rank == last_rank)
        @{
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                    MPI_COMM_WORLD, NULL, NULL);
        @}
    @}

    starpu_task_wait_for_all();
@end smallexample
@end cartouche

@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche

@page
@node Exchanging User Defined Data Interface
@section Exchanging User Defined Data Interface

New data interfaces defined as explained in @ref{An example
of data interface} can also be used within StarPU-MPI and exchanged
between nodes. Two functions need to be defined through
the type @code{struct starpu_data_interface_ops} (@pxref{Data
Interface API}). The pack function takes a handle and returns a
contiguous memory buffer, along with its size, into which the data to be
conveyed to another node should be copied. The reverse operation is
implemented in the unpack function, which takes a contiguous memory
buffer and recreates the data handle.

@cartouche
@smallexample
static int complex_pack_data(starpu_data_handle_t handle, uint32_t node, void **ptr, size_t *count)
@{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    *count = complex_get_size(handle);
    *ptr = malloc(*count);
    memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
    memcpy(*ptr+complex_interface->nx*sizeof(double), complex_interface->imaginary,
           complex_interface->nx*sizeof(double));

    return 0;
@}
@end smallexample
@end cartouche

@cartouche
@smallexample
static int complex_unpack_data(starpu_data_handle_t handle, uint32_t node, void *ptr, size_t count)
@{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, ptr+complex_interface->nx*sizeof(double),
           complex_interface->nx*sizeof(double));

    return 0;
@}
@end smallexample
@end cartouche

@cartouche
@smallexample
static struct starpu_data_interface_ops interface_complex_ops =
@{
    ...
    .pack_data = complex_pack_data,
    .unpack_data = complex_unpack_data
@};
@end smallexample
@end cartouche

@page
@node MPI Insert Task Utility
@section MPI Insert Task Utility

To save the programmer from having to make all communications explicit, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. it tells StarPU which MPI node "owns"
which data. It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task, and triggers the required MPI transfers.
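
For instance, a handle is typically registered and then assigned an owner and
a tag as follows (a minimal sketch; @code{value}, @code{owner_rank} and
@code{data_tag} are assumed to be provided by the application):

@cartouche
@smallexample
starpu_data_handle_t handle;
starpu_variable_data_register(&handle, 0, (uintptr_t)&value, sizeof(value));

/* Declare which MPI node keeps the up-to-date value of the data... */
starpu_data_set_rank(handle, owner_rank);
/* ... and which MPI tag is used when its content is transferred. */
starpu_data_set_tag(handle, data_tag);
@end smallexample
@end cartouche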

@deftypefun int starpu_data_set_tag (starpu_data_handle_t @var{handle}, int @var{tag})
Tell StarPU-MPI which MPI tag to use when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_get_tag (starpu_data_handle_t @var{handle})
Returns the MPI tag to be used when exchanging the data.
@end deftypefun

@deftypefun int starpu_data_set_rank (starpu_data_handle_t @var{handle}, int @var{rank})
Tell StarPU-MPI which MPI node "owns" a given data, that is, the node which will
always keep an up-to-date value, and will by default execute tasks which write
to it.
@end deftypefun

@deftypefun int starpu_data_get_rank (starpu_data_handle_t @var{handle})
Returns the last value set by @code{starpu_data_set_rank}.
@end deftypefun

@defmac STARPU_EXECUTE_ON_NODE
This macro is used when calling @code{starpu_mpi_insert_task}, and
must be followed by an integer value which specifies the node on which
to execute the codelet.
@end defmac

@defmac STARPU_EXECUTE_ON_DATA
This macro is used when calling @code{starpu_mpi_insert_task}, and
must be followed by a data handle to specify that the node owning the
given data will execute the codelet.
@end defmac

@deftypefun int starpu_mpi_insert_task (MPI_Comm @var{comm}, struct starpu_codelet *@var{codelet}, ...)
Create and submit a task corresponding to @var{codelet} with the following
arguments. The argument list must be zero-terminated.

The arguments following the codelet are of the same types as for the
function @code{starpu_insert_task} defined in @ref{Insert Task
Utility}. The extra argument @code{STARPU_EXECUTE_ON_NODE} followed by an
integer can be used to specify the MPI node on which to execute the codelet.
It is also possible to specify that the node owning a specific data will
execute the codelet, by using @code{STARPU_EXECUTE_ON_DATA} followed by a data
handle.

The internal algorithm is as follows (an example using
@code{STARPU_EXECUTE_ON_NODE} is shown after the list):

@enumerate
@item Find out which MPI node is going to execute the codelet.
@enumerate
@item If there is only one node owning data in W mode, it will be selected;
@item If several nodes own data in W mode, the one selected will be the
one having the least data in R mode, so as to minimize the amount of data
to be transferred;
@item The argument @code{STARPU_EXECUTE_ON_NODE} followed by an
integer can be used to specify the node;
@item The argument @code{STARPU_EXECUTE_ON_DATA} followed by a
data handle can be used to specify that the node owning the given
data will execute the codelet.
@end enumerate
@item Send and receive data as requested. Nodes owning data which need to be
read by the task send them to the MPI node which will execute it. The
latter receives them.
@item Execute the codelet. This is done by the MPI node selected in the
first step of the algorithm.
@item If several MPI nodes own data to be written to, send the written
data back to their owners.
@end enumerate
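
For instance, the following call forces the task to run on MPI node 2,
regardless of which node owns the data (a sketch; @code{cl} and @code{handle}
are assumed to be defined by the application):

@cartouche
@smallexample
starpu_mpi_insert_task(MPI_COMM_WORLD, &cl,
                       STARPU_RW, handle,
                       STARPU_EXECUTE_ON_NODE, 2,
                       0);
@end smallexample
@end cartouche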

The algorithm also includes a communication cache mechanism which avoids
sending the same data twice to the same MPI node, unless the data
has been modified in the meantime. The cache can be disabled
(@pxref{STARPU_MPI_CACHE}).
@end deftypefun

@deftypefun void starpu_mpi_get_data_on_node (MPI_Comm @var{comm}, starpu_data_handle_t @var{data_handle}, int @var{node})
Transfer data @var{data_handle} to MPI node @var{node}, sending it from its
owner if needed. At least the target node and the owner have to call the
function.
@end deftypefun
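
This can typically be used to bring a result back to one node for checking,
for instance (a minimal sketch; @code{data_handle} and @code{rank} are assumed
to come from the application):

@cartouche
@smallexample
/* Both node 0 and the owner of the data must make this call */
starpu_mpi_get_data_on_node(MPI_COMM_WORLD, data_handle, 0);

if (rank == 0)
@{
    starpu_data_acquire(data_handle, STARPU_R);
    /* ... inspect the data ... */
    starpu_data_release(data_handle);
@}
@end smallexample
@end cartouche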

Here is a stencil example showing how to use @code{starpu_mpi_insert_task}. One
first needs to define a distribution function which specifies the
locality of the data. Note that the distribution information needs to
be given to StarPU by calling @code{starpu_data_set_rank}.

@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
@}
@end smallexample
@end cartouche

Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the @code{else if} test): we only register
data which will be needed by the tasks that we will execute.

@cartouche
@smallexample
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++) @{
    for (y = 0; y < Y; y++) @{
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], 0,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;
        if (data_handles[x][y])
            starpu_data_set_rank(data_handles[x][y], mpi_rank);
    @}
@}
@end smallexample
@end cartouche

Now @code{starpu_mpi_insert_task()} can be called for the different
steps of the application.

@cartouche
@smallexample
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

starpu_task_wait_for_all();
@end smallexample
@end cartouche

That is, all MPI nodes process the whole task graph, but, as mentioned above, for
each task, only the MPI node which owns the data being written to (here,
@code{data_handles[x][y]}) will actually run the task. The other MPI nodes will
automatically send the required data.

This can become a concern as the number of nodes grows. To avoid it, the
application can prune the task loops according to the data distribution,
so as to only submit tasks on the nodes which need to care about them (either to
execute them, or to send the required data), as illustrated below.
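
A pruned version of the previous loop could, for instance, test whether the
local node either executes a given task or owns one of its neighbouring data
(a sketch, reusing the same @code{my_distrib} function and variables as above):

@cartouche
@smallexample
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            /* Only submit the task if this node executes it or owns one of
               the data it reads, and thus has to take part in a transfer. */
            if (my_rank == my_distrib(x, y, size)
             || my_rank == my_distrib(x-1, y, size) || my_rank == my_distrib(x+1, y, size)
             || my_rank == my_distrib(x, y-1, size) || my_rank == my_distrib(x, y+1, size))
                starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                       STARPU_RW, data_handles[x][y],
                                       STARPU_R, data_handles[x-1][y],
                                       STARPU_R, data_handles[x+1][y],
                                       STARPU_R, data_handles[x][y-1],
                                       STARPU_R, data_handles[x][y+1],
                                       0);

starpu_task_wait_for_all();
@end smallexample
@end cartouche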

@node MPI Collective Operations
@section MPI Collective Operations

@deftypefun int starpu_mpi_scatter_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm}, {void (*}@var{scallback})(void *), {void *}@var{sarg}, {void (*}@var{rcallback})(void *), {void *}@var{rarg})
Scatter data among processes of the communicator based on the ownership of
the data. For each data handle in the array @var{data_handles}, the
process @var{root} sends the data to the process owning it.
Processes receiving data must have valid data handles to receive them.
On completion of the collective communication, the @var{scallback} function is
called with the argument @var{sarg} on the process @var{root}, and the @var{rcallback} function is
called with the argument @var{rarg} on any other process.
@end deftypefun

@deftypefun int starpu_mpi_gather_detached (starpu_data_handle_t *@var{data_handles}, int @var{count}, int @var{root}, MPI_Comm @var{comm}, {void (*}@var{scallback})(void *), {void *}@var{sarg}, {void (*}@var{rcallback})(void *), {void *}@var{rarg})
Gather data from the different processes of the communicator onto the
process @var{root}. Each process owning a data handle in the array
@var{data_handles} will send it to the process @var{root}. The
process @var{root} must have valid data handles to receive the data.
On completion of the collective communication, the @var{rcallback} function is
called with the argument @var{rarg} on the process @var{root}, and the @var{scallback} function is
called with the argument @var{sarg} on any other process.
@end deftypefun

@page
The following example distributes a vector over the nodes with
@code{starpu_mpi_scatter_detached}, runs a computation on each block, and
collects the results back onto the root node with
@code{starpu_mpi_gather_detached}.

@cartouche
@smallexample
if (rank == root)
@{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    @{
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    @}
@}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
for(x = 0; x < nblocks ; x++)
@{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) @{
        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    @}
    else if ((mpi_rank == rank) || (rank == mpi_rank+1 || rank == mpi_rank-1)) @{
        /* I own that index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    @}
    else @{
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    @}
    if (data_handles[x]) @{
        starpu_data_set_rank(data_handles[x], mpi_rank);
    @}
@}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD,
                            NULL, NULL, NULL, NULL);

/* Calculation */
for(x = 0; x < nblocks ; x++) @{
    if (data_handles[x]) @{
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) @{
            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
        @}
    @}
@}

/* Gather the matrix on main node */
starpu_mpi_gather_detached(data_handles, nblocks, 0, MPI_COMM_WORLD,
                           NULL, NULL, NULL, NULL);
@end smallexample
@end cartouche