/*
 * This file is part of the StarPU Handbook.
 * Copyright (C) 2009--2011 Université de Bordeaux 1
 * Copyright (C) 2010, 2011, 2012, 2013, 2014 Centre National de la Recherche Scientifique
 * Copyright (C) 2011, 2012 Institut National de Recherche en Informatique et Automatique
 * See the file version.doxy for copying conditions.
 */
/*! \page MPISupport MPI Support

The integration of MPI transfers within task parallelism is done in a
very natural way by means of asynchronous interactions between the
application and StarPU. This is implemented in a separate libstarpumpi library
which basically provides "StarPU" equivalents of <c>MPI_*</c> functions, where
<c>void *</c> buffers are replaced with ::starpu_data_handle_t, and all
GPU-RAM-NIC transfers are handled efficiently by StarPU-MPI. The user has to
use the usual <c>mpirun</c> command of the MPI implementation to start StarPU on
the different MPI nodes.
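For instance, assuming the example below has been built into a binary called
<c>ring</c> (an illustrative name, not part of StarPU), it can be started on
four MPI nodes with:

\verbatim
$ mpirun -np 4 ./ring
\endverbatim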

An MPI Insert Task function provides an even more seamless transition to a
distributed application, by automatically issuing all required data transfers
according to the task graph and an application-provided distribution.

\section SimpleExample Simple Example

The flags required to compile or link against the MPI layer are
accessible with the following commands:

\verbatim
$ pkg-config --cflags starpumpi-1.2  # options for the compiler
$ pkg-config --libs starpumpi-1.2    # options for the linker
\endverbatim

You also need to pass the option <c>--static</c> if the application is to
be linked statically.
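As a concrete illustration (assuming the MPI compiler wrapper <c>mpicc</c> and
a source file <c>ring.c</c> containing the example below, both names being
purely illustrative), compiling and linking boils down to:

\verbatim
$ mpicc ring.c -o ring $(pkg-config --cflags starpumpi-1.2) $(pkg-config --libs starpumpi-1.2)
\endverbatim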

The following token-ring example makes a token travel around the MPI nodes:
each node receives the token from its predecessor, increments it with a StarPU
task, and sends it to its successor. The variables <c>token</c> and
<c>token_handle</c>, the codelet <c>increment_cl</c> and the iteration count
<c>NITER</c> are assumed to be defined elsewhere in the application.

\code{.c}
void increment_token(void)
{
    struct starpu_task *task = starpu_task_create();

    task->cl = &increment_cl;
    task->handles[0] = token_handle;

    starpu_task_submit(task);
}

int main(int argc, char **argv)
{
    int rank, size;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &size);

    starpu_vector_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, 1, sizeof(unsigned));

    unsigned nloops = NITER;
    unsigned loop;
    unsigned last_loop = nloops - 1;
    unsigned last_rank = size - 1;

    for (loop = 0; loop < nloops; loop++)
    {
        int tag = loop*size + rank;

        if (loop == 0 && rank == 0)
        {
            token = 0;
            fprintf(stdout, "Start with token value %d\n", token);
        }
        else
        {
            starpu_mpi_irecv_detached(token_handle, (rank+size-1)%size, tag,
                                      MPI_COMM_WORLD, NULL, NULL);
        }

        increment_token();

        if (loop == last_loop && rank == last_rank)
        {
            starpu_data_acquire(token_handle, STARPU_R);
            fprintf(stdout, "Finished: token value %d\n", token);
            starpu_data_release(token_handle);
        }
        else
        {
            starpu_mpi_isend_detached(token_handle, (rank+1)%size, tag+1,
                                      MPI_COMM_WORLD, NULL, NULL);
        }
    }

    starpu_task_wait_for_all();

    starpu_mpi_shutdown();
    starpu_shutdown();

    if (rank == last_rank)
    {
        fprintf(stderr, "[%d] token = %d == %d * %d ?\n", rank, token, nloops, size);
        STARPU_ASSERT(token == nloops*size);
    }

    return 0;
}
\endcode

\section PointToPointCommunication Point To Point Communication

The standard point to point communications of MPI have been
implemented. The semantics are similar to those of MPI, but adapted to
the DSM provided by StarPU. An MPI request will only be submitted when
the data is available in the main memory of the node submitting the
request.

There are two types of asynchronous communications: the classic
asynchronous communications and the detached communications. The
classic asynchronous communications (starpu_mpi_isend() and
starpu_mpi_irecv()) need to be followed by a call to
starpu_mpi_wait() or starpu_mpi_test() to wait for or test the
completion of the communication. Waiting for or testing the
completion of detached communications is not possible: this is done
internally by StarPU-MPI, and on completion the resources are
automatically released. This mechanism is similar to the pthread
detach state attribute, which determines whether a thread will be
created in a joinable or a detached state.
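To make the difference concrete, here is a minimal sketch contrasting the two
forms for a send (the handle <c>data_handle</c>, the destination rank
<c>dest</c> and the tag <c>tag</c> are assumed to be defined by the
application):

\code{.c}
/* Classic asynchronous send: the application keeps the request and must
 * explicitly wait for (or test) its completion. */
starpu_mpi_req req;
MPI_Status status;
starpu_mpi_isend(data_handle, &req, dest, tag, MPI_COMM_WORLD);
/* ... submit tasks, do other work ... */
starpu_mpi_wait(&req, &status);

/* Detached send: no request is returned; StarPU-MPI releases the resources
 * itself on completion and calls the (optional) callback. */
starpu_mpi_isend_detached(data_handle, dest, tag, MPI_COMM_WORLD, NULL, NULL);
\endcode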

For any communication, the call to the function results in the
creation of a StarPU-MPI request; the function
starpu_data_acquire_cb() is then called to asynchronously request
StarPU to fetch the data into main memory. When the data is available in
main memory, a StarPU-MPI function is called to put the new request in
the list of ready requests if it is a send request, or in a
hash map if it is a receive request.

Internally, all MPI communications submitted by StarPU use a single
tag, which has a default value and can be accessed with the functions
starpu_mpi_get_communication_tag() and
starpu_mpi_set_communication_tag().

The matching of application tags with their corresponding requests is done
within StarPU-MPI. To handle this, every communication is a double
communication based on an envelope + data system: before a piece of data is
sent, an envelope describing the data (in particular its tag) is sent first,
so that the receiver can get the matching pending receive request
from the hash map and submit it to receive the data correctly.

To this aim, the StarPU-MPI progression thread has a permanently-submitted
request dedicated to receiving incoming envelopes from all sources.

The StarPU-MPI progression thread regularly polls the list of ready
requests. For each new ready request, the appropriate function is
called to post the corresponding MPI call. For example, calling
starpu_mpi_isend() will result in posting <c>MPI_Isend</c>. If
the request is marked as detached, it will then be put in the list
of detached requests.

The StarPU-MPI progression thread also polls the list of detached
requests. For each detached request, it regularly tests the completion
of the MPI request by calling <c>MPI_Test</c>. On completion, the data
handle is released, and if a callback was defined, it is called.

Finally, the StarPU-MPI progression thread checks whether an envelope has
arrived. If so, it checks whether the corresponding receive has already
been submitted by the application. If it has, it submits the request
just as it does for those on the list of ready requests.
If it has not, it allocates a temporary handle to store the data that
will arrive just after, so that when the corresponding receive request
is eventually submitted by the application, the temporary handle is copied
into the application's handle instead of submitting a new StarPU-MPI request.

\ref MPIPtpCommunication "Communication" gives the list of all the
point to point communications defined in StarPU-MPI.

\section ExchangingUserDefinedDataInterface Exchanging User Defined Data Interface

New data interfaces defined as explained in \ref
DefiningANewDataInterface can also be used within StarPU-MPI and
exchanged between nodes. Two functions need to be defined through the
type starpu_data_interface_ops. The function
starpu_data_interface_ops::pack_data takes a handle and returns a
contiguous memory buffer, along with its size, into which the data to be
conveyed to another node should be copied. The reverse operation is
implemented in the function starpu_data_interface_ops::unpack_data, which
takes a contiguous memory buffer and recreates the data handle.

\code{.c}
static int complex_pack_data(starpu_data_handle_t handle, unsigned node, void **ptr, ssize_t *count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    *count = complex_get_size(handle);
    *ptr = malloc(*count);
    memcpy(*ptr, complex_interface->real, complex_interface->nx*sizeof(double));
    memcpy((char *)*ptr + complex_interface->nx*sizeof(double), complex_interface->imaginary,
           complex_interface->nx*sizeof(double));

    return 0;
}

static int complex_unpack_data(starpu_data_handle_t handle, unsigned node, void *ptr, size_t count)
{
    STARPU_ASSERT(starpu_data_test_if_allocated_on_node(handle, node));

    struct starpu_complex_interface *complex_interface =
        (struct starpu_complex_interface *) starpu_data_get_interface_on_node(handle, node);

    memcpy(complex_interface->real, ptr, complex_interface->nx*sizeof(double));
    memcpy(complex_interface->imaginary, (char *)ptr + complex_interface->nx*sizeof(double),
           complex_interface->nx*sizeof(double));

    return 0;
}

static struct starpu_data_interface_ops interface_complex_ops =
{
    ...
    .pack_data = complex_pack_data,
    .unpack_data = complex_unpack_data
};
\endcode

\section MPIInsertTaskUtility MPI Insert Task Utility

To save the programmer from having to specify all communications explicitly,
StarPU provides an "MPI Insert Task Utility". The principle is that the application
decides a distribution of the data over the MPI nodes by allocating it and
notifying StarPU of that decision, i.e. telling StarPU which MPI node "owns"
which data. It also decides, for each handle, an MPI tag which will be used to
exchange the content of the handle. All MPI nodes then process the whole task
graph, and StarPU automatically determines which node actually executes which
task, and triggers the required MPI transfers.

The list of functions is described in \ref MPIInsertTask "MPI Insert Task".

Here is a stencil example showing how to use starpu_mpi_task_insert(). One
first needs to define a distribution function which specifies the
locality of the data. Note that the data needs to be registered to MPI
by calling starpu_mpi_data_register(). This function allows setting
the distribution information and the MPI tag which should be used when
communicating the data. The function starpu_mpi_data_register() should
be preferred to starpu_data_set_rank() and starpu_data_set_tag(), as
it also automatically clears the MPI communication cache
when unregistering the data.

\code{.c}
/* Returns the MPI node number where data is */
int my_distrib(int x, int y, int nb_nodes)
{
    /* Block distrib */
    return ((int)(x / sqrt(nb_nodes) + (y / sqrt(nb_nodes)) * sqrt(nb_nodes))) % nb_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % nb_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(nb_nodes);
    // return x % side + (y % side) * side;
}
\endcode

Now the data can be registered within StarPU. Data which are not
owned but will be needed for computations can be registered through
the lazy allocation mechanism, i.e. with a <c>home_node</c> set to <c>-1</c>.
StarPU will automatically allocate the memory when it is used for the
first time.

One can note an optimization here (the <c>else if</c> test): we only register
data which will be needed by the tasks that we will execute.

\code{.c}
unsigned matrix[X][Y];
starpu_data_handle_t data_handles[X][Y];

for(x = 0; x < X; x++) {
    for (y = 0; y < Y; y++) {
        int mpi_rank = my_distrib(x, y, size);
        if (mpi_rank == my_rank)
            /* Owning data */
            starpu_variable_data_register(&data_handles[x][y], STARPU_MAIN_RAM,
                                          (uintptr_t)&(matrix[x][y]), sizeof(unsigned));
        else if (my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
              || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size))
            /* I don't own that index, but will need it for my computations */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        else
            /* I know it's useless to allocate anything for this */
            data_handles[x][y] = NULL;

        if (data_handles[x][y]) {
            starpu_mpi_data_register(data_handles[x][y], x*X+y, mpi_rank);
        }
    }
}
\endcode

Now starpu_mpi_task_insert() can be called for the different
steps of the application.

\code{.c}
for(loop=0 ; loop<niter; loop++)
    for (x = 1; x < X-1; x++)
        for (y = 1; y < Y-1; y++)
            starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                                   STARPU_RW, data_handles[x][y],
                                   STARPU_R, data_handles[x-1][y],
                                   STARPU_R, data_handles[x+1][y],
                                   STARPU_R, data_handles[x][y-1],
                                   STARPU_R, data_handles[x][y+1],
                                   0);

starpu_task_wait_for_all();
\endcode

In other words, all MPI nodes process the whole task graph, but, as mentioned above, for
each task only the MPI node which owns the data being written to (here,
<c>data_handles[x][y]</c>) will actually run the task. The other MPI nodes will
automatically send the required data.

Processing the whole task graph on every node can become a concern as the number
of nodes grows. To avoid this, the application can prune the task-submission
loops according to the data distribution, so as to only submit tasks on nodes
which have to care about them (either to execute them, or to send the required
data), as sketched below.
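
A minimal sketch of such pruning for the stencil loop above (the ownership test
reuses the <c>my_distrib</c> function; the exact condition to use depends on the
access modes of the task):

\code{.c}
for (x = 1; x < X-1; x++)
    for (y = 1; y < Y-1; y++)
    {
        /* Rank which will execute the task: the owner of the data written to. */
        int owner = my_distrib(x, y, size);

        /* Skip the task entirely if this node neither executes it nor owns
         * one of the neighbouring pieces of data it reads. */
        if (my_rank != owner
            && my_rank != my_distrib(x-1, y, size) && my_rank != my_distrib(x+1, y, size)
            && my_rank != my_distrib(x, y-1, size) && my_rank != my_distrib(x, y+1, size))
            continue;

        starpu_mpi_task_insert(MPI_COMM_WORLD, &stencil5_cl,
                               STARPU_RW, data_handles[x][y],
                               STARPU_R, data_handles[x-1][y],
                               STARPU_R, data_handles[x+1][y],
                               STARPU_R, data_handles[x][y-1],
                               STARPU_R, data_handles[x][y+1],
                               0);
    }
\endcode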

The function starpu_mpi_task_build() is also provided, with the aim of
only constructing the task structure. All MPI nodes need to call the
function; only the node which is to execute the task will get a
valid task structure back. After the execution of the task, all nodes
need to call the function starpu_mpi_task_post_build() -- with the same
list of arguments as starpu_mpi_task_build() -- to post all the
necessary data communications.

\code{.c}
struct starpu_task *task;
task = starpu_mpi_task_build(MPI_COMM_WORLD, &cl,
                             STARPU_RW, data_handles[0],
                             STARPU_R, data_handles[1],
                             0);
if (task) starpu_task_submit(task);
starpu_mpi_task_post_build(MPI_COMM_WORLD, &cl,
                           STARPU_RW, data_handles[0],
                           STARPU_R, data_handles[1],
                           0);
\endcode

\section MPICache MPI cache support

StarPU-MPI automatically optimizes duplicate data transmissions: if an MPI
node B needs a piece of data D from MPI node A for several tasks, only one
transmission of D will take place from A to B, and the value of D will be kept
on B as long as no task modifies D.

If a task modifies D, B will wait for all tasks which need the previous value of
D before invalidating the value of D and thus releasing the memory it occupies.
Whenever a task running on B needs the new value of D, an allocation will take
place again to receive it.

Since tasks can be submitted dynamically, StarPU-MPI cannot know whether the
current value of data D will again be used by a newly-submitted task before
being modified by another newly-submitted task, so until a task is submitted to
modify the current value, it cannot decide by itself whether to flush the cache
or not. The application can however explicitly tell StarPU-MPI to flush the
cache by calling starpu_mpi_cache_flush() or starpu_mpi_cache_flush_all_data(),
for instance in case the data will not be used at all any more (see for instance
the Cholesky example in mpi/examples/matrix_decomposition), or at least not in
the near future. If a newly-submitted task actually needs the value again,
another transmission of D will be initiated from A to B.
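
For instance, a minimal sketch of flushing cached copies for a given handle of
the stencil example above, once a computation phase is over:

\code{.c}
/* The cached remote copies of this handle are no longer needed on the
 * non-owner nodes: drop them to free the corresponding memory. */
starpu_mpi_cache_flush(MPI_COMM_WORLD, data_handles[x][y]);

/* Or flush all cached data at once. */
starpu_mpi_cache_flush_all_data(MPI_COMM_WORLD);
\endcode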

The whole caching behavior can be disabled thanks to the ::STARPU_MPI_CACHE
environment variable. The variable ::STARPU_MPI_CACHE_STATS can be set to 1
to enable the runtime to display messages when data are added or removed
from the cache holding the received data.
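
For instance, to disable the cache when launching the application (the binary
name is again only illustrative):

\verbatim
$ STARPU_MPI_CACHE=0 mpirun -np 4 ./stencil5
\endverbatim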

\section MPIMigration MPI Data migration

The application can dynamically change its mind about the data distribution, to
balance the load over MPI nodes for instance. This can be done very simply by
requesting an explicit move and then changing the registered rank. For instance,
here we switch to a new distribution function <c>my_distrib2</c>: we first
register any data that wasn't registered already and will be needed, then
migrate the data, and register the new location.

\code{.c}
for(x = 0; x < X; x++) {
    for (y = 0; y < Y; y++) {
        int mpi_rank = my_distrib2(x, y, size);
        if (!data_handles[x][y] && (mpi_rank == my_rank
             || my_rank == my_distrib(x+1, y, size) || my_rank == my_distrib(x-1, y, size)
             || my_rank == my_distrib(x, y+1, size) || my_rank == my_distrib(x, y-1, size)))
            /* Register newly-needed data */
            starpu_variable_data_register(&data_handles[x][y], -1,
                                          (uintptr_t)NULL, sizeof(unsigned));
        if (data_handles[x][y]) {
            /* Migrate the data */
            starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
            /* And register the new rank of the matrix */
            starpu_data_set_rank(data_handles[x][y], mpi_rank);
        }
    }
}
\endcode

From then on, further task submissions will use the new data distribution,
which will thus change both MPI communications and task assignments.

Very importantly, since all nodes have to agree on which node owns which data
so as to determine MPI communications and task assignments the same way, all
nodes have to perform the same data migration, and at the same point among task
submissions. This does not require a strict synchronization, just a clear
separation of task submissions before and after the data redistribution.

Before data is unregistered, it has to be migrated back to its original home
node (the value, at least), since that is where the user-provided buffer
resides. Otherwise the unregistration will complain that it does not have the
latest value on the original home node.

\code{.c}
for(x = 0; x < X; x++) {
    for (y = 0; y < Y; y++) {
        if (data_handles[x][y]) {
            int mpi_rank = my_distrib(x, y, size);
            /* Get back data to original place where the user-provided buffer is. */
            starpu_mpi_get_data_on_node_detached(MPI_COMM_WORLD, data_handles[x][y], mpi_rank, NULL, NULL);
            /* And unregister it */
            starpu_data_unregister(data_handles[x][y]);
        }
    }
}
\endcode

\section MPICollective MPI Collective Operations

The functions are described in \ref MPICollectiveOperations "MPI Collective Operations".
The example below scatters a vector of blocks from the root node over the MPI
nodes according to the data distribution, runs a task on each block on its owner
node, and gathers the results back on the main node.

\code{.c}
if (rank == root)
{
    /* Allocate the vector */
    vector = malloc(nblocks * sizeof(float *));
    for(x=0 ; x<nblocks ; x++)
    {
        starpu_malloc((void **)&vector[x], block_size*sizeof(float));
    }
}

/* Allocate data handles and register data to StarPU */
data_handles = malloc(nblocks*sizeof(starpu_data_handle_t));
for(x = 0; x < nblocks ; x++)
{
    int mpi_rank = my_distrib(x, nodes);
    if (rank == root) {
        starpu_vector_data_register(&data_handles[x], STARPU_MAIN_RAM, (uintptr_t)vector[x],
                                    block_size, sizeof(float));
    }
    else if ((mpi_rank == rank) || (rank == mpi_rank+1 || rank == mpi_rank-1)) {
        /* I own that index, or I will need it for my computations */
        starpu_vector_data_register(&data_handles[x], -1, (uintptr_t)NULL,
                                    block_size, sizeof(float));
    }
    else {
        /* I know it's useless to allocate anything for this */
        data_handles[x] = NULL;
    }
    if (data_handles[x]) {
        starpu_mpi_data_register(data_handles[x], x, mpi_rank);
    }
}

/* Scatter the matrix among the nodes */
starpu_mpi_scatter_detached(data_handles, nblocks, root, MPI_COMM_WORLD);

/* Calculation */
for(x = 0; x < nblocks ; x++) {
    if (data_handles[x]) {
        int owner = starpu_data_get_rank(data_handles[x]);
        if (owner == rank) {
            starpu_task_insert(&cl, STARPU_RW, data_handles[x], 0);
        }
    }
}

/* Gather the matrix on main node */
starpu_mpi_gather_detached(data_handles, nblocks, 0, MPI_COMM_WORLD);
\endcode

\section MPIExamples More MPI examples

MPI examples are available in the StarPU source code in mpi/examples:

<ul>
<li><c>complex</c> is a simple example using a user-defined data interface over
MPI (complex numbers),
<li><c>stencil5</c> is a simple stencil example using starpu_mpi_task_insert(),
<li><c>matrix_decomposition</c> is a Cholesky decomposition example using
starpu_mpi_task_insert(). The non-distributed version can check for
algorithm correctness in a 1-node configuration, the distributed version uses
exactly the same source code, to be used over MPI,
<li><c>mpi_lu</c> is an LU decomposition example, provided in three versions:
<c>plu_example</c> uses explicit MPI data transfers, <c>plu_implicit_example</c>
uses implicit MPI data transfers, <c>plu_outofcore_example</c> uses implicit MPI
data transfers and supports data matrices which do not fit in memory (out-of-core).
</ul>

*/