@c -*-texinfo-*-
@c This file is part of the StarPU Handbook.
@c Copyright (C) 2009--2011 Universit@'e de Bordeaux 1
@c Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
@c Copyright (C) 2011 Institut National de Recherche en Informatique et Automatique
@c See the file starpu.texi for copying conditions.
The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi library
which essentially provides "StarPU" equivalents of the @code{MPI_*} functions, where
@code{void *} buffers are replaced with @code{starpu_data_handle_t}s, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual @code{mpirun} command of the MPI implementation to start StarPU on
the different MPI nodes.
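The application is then started on, say, four MPI nodes in the usual way; the
executable name @code{ring} here is just an illustration:

@example
$ mpirun -np 4 ./ring
@end example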
An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.
@menu
* Simple Example::
* Point to point communication::
* Exchanging User Defined Data Interface::
* MPI Insert Task Utility::
* MPI Collective Operations::
@end menu
@node Simple Example
@section Simple Example

The flags required to compile or link against the MPI layer are
accessible with the following commands:

@example
$ pkg-config --cflags starpumpi-1.0  # options for the compiler
$ pkg-config --libs starpumpi-1.0    # options for the linker
@end example
You also need to pass the @code{--static} option if the application is to
be linked statically.
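For instance, an application stored in a (hypothetical) file @code{ring.c}
could be compiled and linked in one step as follows:

@example
$ mpicc ring.c -o ring $(pkg-config --cflags starpumpi-1.0) \
        $(pkg-config --libs starpumpi-1.0)
@end example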
@cartouche
@smallexample
void increment_token(void)
@{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;
    starpu_task_submit(task);
@}
@end smallexample
@end cartouche
@cartouche
@smallexample
int main(int argc, char **argv)
@{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);
    starpu_vector_data_register(&token_handle, 0, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;
    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;
@end smallexample
@end cartouche
@cartouche
@smallexample
    for (loop = 0; loop < nloops; loop++) @{
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        @{
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        @}
        else
        @{
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                                      MPI_COMM_WORLD, NULL, NULL);
        @}

        increment_token();

        if (loop == last_loop && rank == last_rank)
        @{
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        @}
        else
        @{
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                                      MPI_COMM_WORLD, NULL, NULL);
        @}
    @}

    starpu_task_wait_for_all();
@end smallexample
@end cartouche
@cartouche
@smallexample
    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    @{
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    @}
@end smallexample
@end cartouche
@node Point to point communication
@section Point to point communication

The standard point to point communications of MPI have been
implemented. The semantics are similar to MPI's, but adapted to the
DSM provided by StarPU. An MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.
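For instance, the blocking variants can be used much like plain MPI calls, but
on data handles. Here is a minimal sketch, assuming a handle
@code{token_handle} registered as in the previous section (the destination
rank and MPI tag are arbitrary):

@cartouche
@smallexample
if (rank == 0)
@{
    /* Send the content of the handle to node 1, using MPI tag 42 */
    starpu_mpi_send(token_handle, 1, 42, MPI_COMM_WORLD);
@}
else if (rank == 1)
@{
    /* Receive the content of the handle from node 0 */
    MPI_Status status;
    starpu_mpi_recv(token_handle, 0, 42, MPI_COMM_WORLD, &status);
@}
@end smallexample
@end cartouche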
There are two types of asynchronous communications: the classic
asynchronous communications and the detached communications. The
classic asynchronous communications (@code{starpu_mpi_isend} and
@code{starpu_mpi_irecv}) need to be followed by a call to
@code{starpu_mpi_wait} or to @code{starpu_mpi_test} to wait for or to
test the completion of the communication. Waiting for or testing the
completion of detached communications is not possible: it is done
internally by StarPU-MPI, and the resources are automatically released
on completion. This mechanism is similar to the pthread detach state
attribute, which determines whether a thread will be created in a
joinable or a detached state.
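As an illustration, here is a minimal sketch contrasting the two kinds of
asynchronous send (the destination rank and MPI tags are arbitrary):

@cartouche
@smallexample
/* Classic asynchronous send: the returned request must be waited for
 * (or tested) explicitly */
starpu_mpi_req req;
starpu_mpi_isend(token_handle, &req, 1, 42, MPI_COMM_WORLD);
/* ... do something else ... */
MPI_Status status;
starpu_mpi_wait(&req, &status);

/* Detached send: no request is returned, StarPU-MPI releases the
 * resources itself and calls the callback (here none) on completion */
starpu_mpi_isend_detached(token_handle, 1, 43, MPI_COMM_WORLD, NULL, NULL);
@end smallexample
@end cartouche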
For any communication, calling the function results in the creation of a
StarPU-MPI request; the function @code{starpu_data_acquire_cb} is then called
to asynchronously request StarPU to fetch the data into main memory. When the
data is available there, a StarPU-MPI function is called to put the new
request in the list of ready requests if it is a send request, or in a
hashmap if it is a receive request.
Internally, all MPI communications submitted by StarPU use a unique
tag which has a default value and can be accessed with the functions
@ref{starpu_mpi_get_communication_tag} and
@ref{starpu_mpi_set_communication_tag}.

The matching of tags with their corresponding requests is done within
StarPU-MPI. To handle this, every communication is a double communication
based on an envelope + data system: before a piece of data is sent, an
envelope describing it (in particular its tag) is sent first, so that the
receiver can get the matching pending receive request from the hashmap and
submit it to receive the data correctly.
To this aim, the StarPU-MPI progression thread has a permanently-submitted
request to receive incoming envelopes from all sources.

The StarPU-MPI progression thread regularly polls the list of ready
requests. For each new ready request, the appropriate function is
called to post the corresponding MPI call. For example, calling
@code{starpu_mpi_isend} will result in posting @code{MPI_Isend}. If
the request is marked as detached, the request will then be put in the list
of detached requests.
The StarPU-MPI progression thread also polls the list of detached
requests. For each detached request, it regularly tests the completion
of the MPI request by calling @code{MPI_Test}. On completion, the data
handle is released, and if a callback was defined, it is called.

Finally, the StarPU-MPI progression thread checks whether an envelope has
arrived. If so, it checks whether the corresponding receive has already
been submitted by the application. If it has, it submits the request
just as it does for those on the list of ready requests. If it has not, it
allocates a temporary handle to store the data that will arrive just after,
so that when the corresponding receive request is eventually submitted by the
application, StarPU-MPI copies this temporary handle into the application's
handle instead of submitting a new StarPU-MPI request.

@ref{Communication} gives the list of all the point to point
communications defined in StarPU-MPI.
@node Exchanging User Defined Data Interface
@section Exchanging User Defined Data Interface

New data interfaces defined as explained in @ref{Defining a New Data
Interface} can also be used within StarPU-MPI and exchanged between
nodes. Two functions need to be defined through
the type @code{struct starpu_data_interface_ops} (@pxref{Defining
Interface}). The pack function takes a handle and returns a
contiguous memory buffer, along with its size, into which the data to be
conveyed to another node should be copied. The reverse operation is
implemented in the unpack function, which takes a contiguous memory buffer
and recreates the data handle.
@cartouche
@smallexample
static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
@{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    *count = complex_get_size(handle);
    *ptr = malloc(*count);
    memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
    memcpy((char *)*ptr + complex_interface->nx*sizeof(double), complex_interface->imaginary,
           complex_interface->nx*sizeof(double));

    return 0;
@}
@end smallexample
@end cartouche
@cartouche
@smallexample
static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
@{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, (char *)ptr + complex_interface->nx*sizeof(double),
           complex_interface->nx*sizeof(double));

    return 0;
@}
@end smallexample
@end cartouche
@cartouche
@smallexample
static struct starpu_data_interface_ops interface_complex_ops =
@{
    ...
    .pack_data = complex_pack_data,
    .unpack_data = complex_unpack_data
@};
@end smallexample
@end cartouche
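Once these two functions are set in the interface operations, a handle using
this interface can be exchanged between nodes like any built-in one. Below is
a minimal sketch; the registration helper @code{complex_data_register} and its
signature are only illustrative, the actual helper comes with the interface
definition:

@cartouche
@smallexample
double real[2]      = @{1.0, 2.0@};
double imaginary[2] = @{3.0, 4.0@};
starpu_data_handle_t handle;

/* Illustrative registration helper provided with the interface definition */
complex_data_register(&handle, 0, real, imaginary, 2);

if (rank == 0)
    starpu_mpi_send(handle, 1, 10, MPI_COMM_WORLD);
else if (rank == 1)
@{
    MPI_Status status;
    starpu_mpi_recv(handle, 0, 10, MPI_COMM_WORLD, &status);
@}
@end smallexample
@end cartouche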
@node MPI Insert Task Utility
@section MPI Insert Task Utility

To save the programmer from having to explicitly state all communications, StarPU
provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. telling StarPU which MPI node "owns"
which data. It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task and triggers the required MPI transfers.

The list of functions is described in @ref{MPI Insert Task}.

Here is a stencil example showing how to use @code{starpu_mpi_insert_task}. One
first needs to define a distribution function which specifies the
locality of the data. Note that the distribution information needs to
be given to StarPU by calling @code{starpu_data_set_rank}. An MPI tag
should also be defined for each data handle by calling
@code{starpu_data_set_tag}.
@cartouche
@smallexample
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes) @{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
@}
@end smallexample
@end cartouche
Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a @code{home_node} set to -1.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the @code{else if} test): we only register
data which will be needed by the tasks that we will execute.
@cartouche
@smallexample
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++) @{
    for (y = 0; y < Y; y++) @{
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], 0,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;
        if (data_handles[x][y]) @{
            starpu_data_set_rank(data_handles[x][y], mpi_rank);
            starpu_data_set_tag(data_handles[x][y], x*X+y);
        @}
    @}
@}
@end smallexample
@end cartouche
Now @code{starpu_mpi_insert_task()} can be called for the different
steps of the application.
@cartouche
@smallexample
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

starpu_task_wait_for_all();
@end smallexample
@end cartouche
I.e. all MPI nodes process the whole task graph, but as mentioned above, for
each task, only the MPI node which owns the data being written to (here,
@code{data_handles[x][y]}) will actually run the task. The other MPI nodes will
automatically send the required data.

Having every node process the whole task graph can become a concern with a
growing number of nodes. To avoid this, the application can prune the
task-submission loops according to the data distribution, so as to only submit
tasks on nodes which have to care about them (either to execute them or to
send the required data), as sketched below.
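Here is a minimal sketch of such pruning for the stencil above, reusing
@code{my_distrib} and the handles registered earlier. It assumes that the
registration loop is widened accordingly, so that every handle appearing in a
task submitted by a node is registered on that node (possibly with a
@code{home_node} of -1):

@cartouche
@smallexample
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
        @{
            /* Only submit the task if this node executes it (it owns the
             * data written to) or contributes one of the data read */
            if (my_rank == my_distrib(x, y, size)
                || my_rank == my_distrib(x-1, y, size) || my_rank == my_distrib(x+1, y, size)
                || my_rank == my_distrib(x, y-1, size) || my_rank == my_distrib(x, y+1, size))
                starpu_mpi_insert_task(MPI_COMM_WORLD, &stencil5_cl,
                                       STARPU_RW, data_handles[x][y],
                                       STARPU_R, data_handles[x-1][y],
                                       STARPU_R, data_handles[x+1][y],
                                       STARPU_R, data_handles[x][y-1],
                                       STARPU_R, data_handles[x][y+1],
                                       0);
        @}

starpu_task_wait_for_all();
@end smallexample
@end cartouche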
@node MPI Collective Operations
@section MPI Collective Operations

The functions are described in @ref{Collective Operations}.
@cartouche
@smallexample
if (rank == root)
@{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    @{
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    @}
@}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t));
for(x = 0; x < nblocks ; x++)
@{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) @{
        starpu_vector_data_register(&data_handles[x], 0, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    @}
    else if ((mpi_rank == rank) || (rank == mpi_rank+1) || (rank == mpi_rank-1)) @{
        /* I own that index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    @}
    else @{
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    @}
    if (data_handles[x]) @{
        starpu_data_set_rank(data_handles[x], mpi_rank);
        starpu_data_set_tag(data_handles[x], x);
    @}
@}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ; x++) @{
    if (data_handles[x]) @{
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) @{
            starpu_insert_task(&cl, STARPU_RW, data_handles[x], 0);
        @}
    @}
@}

/* Gather the matrix back on the root node */
starpu_mpi_gather_detached(data_handles, nblocks, root, MPI_COMM_WORLD);
@end smallexample
@end cartouche