starpu_mpi.c 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009, 2010-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <stdlib.h>
  18. #include <starpu_mpi.h>
  19. #include <starpu_mpi_datatype.h>
  20. #include <starpu_mpi_private.h>
  21. #include <starpu_profiling.h>
  22. #include <starpu_mpi_stats.h>
  23. #include <starpu_mpi_insert_task.h>
  24. #include <common/config.h>
  25. static void _starpu_mpi_submit_new_mpi_request(void *arg);
  26. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
  27. #ifdef STARPU_VERBOSE
  28. static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
  29. #endif
  30. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  31. int dest, int mpi_tag, MPI_Comm comm,
  32. unsigned detached, void (*callback)(void *), void *arg);
  33. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg);
  34. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
  35. /* The list of requests that have been newly submitted by the application */
  36. static struct _starpu_mpi_req_list *new_requests;
  37. /* The list of detached requests that have already been submitted to MPI */
  38. static struct _starpu_mpi_req_list *detached_requests;
  39. static _starpu_pthread_mutex_t detached_requests_mutex;
  40. /* Condition to wake up progression thread */
  41. static _starpu_pthread_cond_t cond_progression;
  42. /* Condition to wake up waiting for all current MPI requests to finish */
  43. static _starpu_pthread_cond_t cond_finished;
  44. static _starpu_pthread_mutex_t mutex;
  45. static pthread_t progress_thread;
  46. static int running = 0;
  47. /* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
  48. static _starpu_pthread_mutex_t mutex_posted_requests;
  49. static int posted_requests = 0, newer_requests, barrier_running = 0;
  50. #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { _STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
  51. /********************************************************/
  52. /* */
  53. /* Send/Receive functionalities */
  54. /* */
  55. /********************************************************/
  56. static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
  57. int srcdst, int mpi_tag, MPI_Comm comm,
  58. unsigned detached, void (*callback)(void *), void *arg,
  59. enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
  60. enum starpu_access_mode mode)
  61. {
  62. _STARPU_MPI_LOG_IN();
  63. struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
  64. STARPU_ASSERT(req);
  65. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  66. /* Initialize the request structure */
  67. req->submitted = 0;
  68. req->completed = 0;
  69. _STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
  70. _STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
  71. req->request_type = request_type;
  72. req->user_datatype = -1;
  73. req->count = -1;
  74. req->data_handle = data_handle;
  75. req->srcdst = srcdst;
  76. req->mpi_tag = mpi_tag;
  77. req->comm = comm;
  78. req->detached = detached;
  79. req->callback = callback;
  80. req->callback_arg = arg;
  81. req->func = func;
  82. /* Asynchronously request StarPU to fetch the data in main memory: when
  83. * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
  84. * the request is actually submitted */
  85. starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
  86. _STARPU_MPI_LOG_OUT();
  87. return req;
  88. }
  89. /********************************************************/
  90. /* */
  91. /* Send functionalities */
  92. /* */
  93. /********************************************************/
  94. static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
  95. {
  96. _STARPU_MPI_LOG_IN();
  97. STARPU_ASSERT(req->ptr);
  98. _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  99. _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
  100. TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
  101. req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
  102. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  103. TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
  104. /* somebody is perhaps waiting for the MPI request to be posted */
  105. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  106. req->submitted = 1;
  107. _STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  108. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  109. _starpu_mpi_handle_detached_request(req);
  110. _STARPU_MPI_LOG_OUT();
  111. }
  112. static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
  113. {
  114. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  115. if (req->user_datatype == 0)
  116. {
  117. req->count = 1;
  118. req->ptr = starpu_handle_get_local_ptr(req->data_handle);
  119. }
  120. else
  121. {
  122. ssize_t psize = -1;
  123. int ret;
  124. // Do not pack the data, just try to find out the size
  125. starpu_handle_pack_data(req->data_handle, NULL, &psize);
  126. if (psize != -1)
  127. {
  128. // We already know the size of the data, let's send it to overlap with the packing of the data
  129. _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
  130. req->count = psize;
  131. ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
  132. STARPU_ASSERT(ret == MPI_SUCCESS);
  133. }
  134. // Pack the data
  135. starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
  136. if (psize == -1)
  137. {
  138. // We know the size now, let's send it
  139. _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->count, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
  140. ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
  141. STARPU_ASSERT(ret == MPI_SUCCESS);
  142. }
  143. else
  144. {
  145. // We check the size returned with the 2 calls to pack is the same
  146. STARPU_ASSERT(req->count == psize);
  147. }
  148. // We can send the data now
  149. }
  150. _starpu_mpi_isend_data_func(req);
  151. }
  152. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  153. int dest, int mpi_tag, MPI_Comm comm,
  154. unsigned detached, void (*callback)(void *), void *arg)
  155. {
  156. return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R);
  157. }
  158. int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
  159. {
  160. _STARPU_MPI_LOG_IN();
  161. STARPU_ASSERT(public_req);
  162. struct _starpu_mpi_req *req;
  163. req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL);
  164. STARPU_ASSERT(req);
  165. *public_req = req;
  166. _STARPU_MPI_LOG_OUT();
  167. return 0;
  168. }
  169. int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
  170. int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  171. {
  172. _STARPU_MPI_LOG_IN();
  173. _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
  174. _STARPU_MPI_LOG_OUT();
  175. return 0;
  176. }
  177. int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm)
  178. {
  179. starpu_mpi_req req;
  180. MPI_Status status;
  181. _STARPU_MPI_LOG_IN();
  182. memset(&status, 0, sizeof(MPI_Status));
  183. starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
  184. starpu_mpi_wait(&req, &status);
  185. _STARPU_MPI_LOG_OUT();
  186. return 0;
  187. }
  188. /********************************************************/
  189. /* */
  190. /* Receive functionalities */
  191. /* */
  192. /********************************************************/
  193. static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
  194. {
  195. _STARPU_MPI_LOG_IN();
  196. STARPU_ASSERT(req->ptr);
  197. _STARPU_MPI_DEBUG(2, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  198. TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
  199. req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
  200. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  201. TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
  202. /* somebody is perhaps waiting for the MPI request to be posted */
  203. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  204. req->submitted = 1;
  205. _STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  206. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  207. _starpu_mpi_handle_detached_request(req);
  208. _STARPU_MPI_LOG_OUT();
  209. }
  210. struct _starpu_mpi_irecv_size_callback
  211. {
  212. starpu_data_handle_t handle;
  213. struct _starpu_mpi_req *req;
  214. };
  215. static void _starpu_mpi_irecv_size_callback(void *arg)
  216. {
  217. struct _starpu_mpi_irecv_size_callback *callback = (struct _starpu_mpi_irecv_size_callback *)arg;
  218. starpu_data_unregister(callback->handle);
  219. callback->req->ptr = malloc(callback->req->count);
  220. STARPU_ASSERT_MSG(callback->req->ptr, "cannot allocate message of size %ld\n", callback->req->count);
  221. _starpu_mpi_irecv_data_func(callback->req);
  222. free(callback);
  223. }
  224. static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
  225. {
  226. _STARPU_MPI_LOG_IN();
  227. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  228. if (req->user_datatype == 0)
  229. {
  230. req->count = 1;
  231. req->ptr = starpu_handle_get_local_ptr(req->data_handle);
  232. _starpu_mpi_irecv_data_func(req);
  233. }
  234. else
  235. {
  236. struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
  237. callback->req = req;
  238. starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
  239. _STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
  240. _starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, _starpu_mpi_irecv_size_callback, callback);
  241. }
  242. }
  243. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
  244. {
  245. return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W);
  246. }
  247. int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
  248. {
  249. _STARPU_MPI_LOG_IN();
  250. STARPU_ASSERT(public_req);
  251. struct _starpu_mpi_req *req;
  252. req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL);
  253. STARPU_ASSERT(req);
  254. *public_req = req;
  255. _STARPU_MPI_LOG_OUT();
  256. return 0;
  257. }
  258. int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  259. {
  260. _STARPU_MPI_LOG_IN();
  261. _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg);
  262. _STARPU_MPI_LOG_OUT();
  263. return 0;
  264. }
  265. int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
  266. {
  267. starpu_mpi_req req;
  268. _STARPU_MPI_LOG_IN();
  269. starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
  270. starpu_mpi_wait(&req, status);
  271. _STARPU_MPI_LOG_OUT();
  272. return 0;
  273. }
  274. static void _starpu_mpi_probe_func(struct _starpu_mpi_req *req)
  275. {
  276. _STARPU_MPI_LOG_IN();
  277. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  278. #ifdef STARPU_DEVEL
  279. #warning TODO: release that assert
  280. #endif
  281. assert(req->user_datatype == 0);
  282. req->count = 1;
  283. req->ptr = starpu_handle_get_local_ptr(req->data_handle);
  284. _STARPU_MPI_DEBUG(2, "MPI probe request %p type %s tag %d src %d data %p ptr %p datatype %d count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
  285. /* somebody is perhaps waiting for the MPI request to be posted */
  286. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  287. req->submitted = 1;
  288. _STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  289. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  290. _starpu_mpi_handle_detached_request(req);
  291. _STARPU_MPI_LOG_OUT();
  292. }
  293. int starpu_mpi_irecv_probe_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  294. {
  295. _STARPU_MPI_LOG_IN();
  296. _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, PROBE_REQ, _starpu_mpi_probe_func, STARPU_W);
  297. _STARPU_MPI_LOG_OUT();
  298. return 0;
  299. }
  300. /********************************************************/
  301. /* */
  302. /* Wait functionalities */
  303. /* */
  304. /********************************************************/
  305. static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
  306. {
  307. _STARPU_MPI_LOG_IN();
  308. /* Which is the mpi request we are waiting for ? */
  309. struct _starpu_mpi_req *req = waiting_req->other_request;
  310. TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
  311. req->ret = MPI_Wait(&req->request, waiting_req->status);
  312. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  313. TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
  314. _starpu_mpi_handle_request_termination(req);
  315. _STARPU_MPI_LOG_OUT();
  316. }
  317. int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
  318. {
  319. _STARPU_MPI_LOG_IN();
  320. int ret;
  321. struct _starpu_mpi_req *waiting_req = calloc(1, sizeof(struct _starpu_mpi_req));
  322. STARPU_ASSERT(waiting_req);
  323. struct _starpu_mpi_req *req = *public_req;
  324. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  325. /* We cannot try to complete a MPI request that was not actually posted
  326. * to MPI yet. */
  327. _STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
  328. while (!(req->submitted))
  329. _STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
  330. _STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
  331. /* Initialize the request structure */
  332. _STARPU_PTHREAD_MUTEX_INIT(&(waiting_req->req_mutex), NULL);
  333. _STARPU_PTHREAD_COND_INIT(&(waiting_req->req_cond), NULL);
  334. waiting_req->status = status;
  335. waiting_req->other_request = req;
  336. waiting_req->func = _starpu_mpi_wait_func;
  337. waiting_req->request_type = WAIT_REQ;
  338. _starpu_mpi_submit_new_mpi_request(waiting_req);
  339. /* We wait for the MPI request to finish */
  340. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  341. while (!req->completed)
  342. _STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
  343. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  344. ret = req->ret;
  345. /* The internal request structure was automatically allocated */
  346. *public_req = NULL;
  347. free(req);
  348. free(waiting_req);
  349. _STARPU_MPI_LOG_OUT();
  350. return ret;
  351. }
  352. /********************************************************/
  353. /* */
  354. /* Test functionalities */
  355. /* */
  356. /********************************************************/
  357. static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
  358. {
  359. _STARPU_MPI_LOG_IN();
  360. /* Which is the mpi request we are testing for ? */
  361. struct _starpu_mpi_req *req = testing_req->other_request;
  362. _STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype %d count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
  363. TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
  364. req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
  365. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  366. TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
  367. if (*testing_req->flag)
  368. {
  369. testing_req->ret = req->ret;
  370. _starpu_mpi_handle_request_termination(req);
  371. }
  372. _STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
  373. testing_req->completed = 1;
  374. _STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
  375. _STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
  376. _STARPU_MPI_LOG_OUT();
  377. }
  378. int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
  379. {
  380. _STARPU_MPI_LOG_IN();
  381. int ret = 0;
  382. STARPU_ASSERT(public_req);
  383. struct _starpu_mpi_req *req = *public_req;
  384. STARPU_ASSERT(!req->detached);
  385. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  386. unsigned submitted = req->submitted;
  387. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  388. if (submitted)
  389. {
  390. struct _starpu_mpi_req *testing_req = calloc(1, sizeof(struct _starpu_mpi_req));
  391. STARPU_ASSERT(testing_req);
  392. // memset(testing_req, 0, sizeof(struct _starpu_mpi_req));
  393. /* Initialize the request structure */
  394. _STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
  395. _STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
  396. testing_req->flag = flag;
  397. testing_req->status = status;
  398. testing_req->other_request = req;
  399. testing_req->func = _starpu_mpi_test_func;
  400. testing_req->completed = 0;
  401. testing_req->request_type = TEST_REQ;
  402. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  403. _starpu_mpi_submit_new_mpi_request(testing_req);
  404. /* We wait for the test request to finish */
  405. _STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
  406. while (!(testing_req->completed))
  407. _STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
  408. _STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
  409. ret = testing_req->ret;
  410. if (*(testing_req->flag))
  411. {
  412. /* The request was completed so we free the internal
  413. * request structure which was automatically allocated
  414. * */
  415. *public_req = NULL;
  416. free(req);
  417. }
  418. free(testing_req);
  419. }
  420. else
  421. {
  422. *flag = 0;
  423. }
  424. _STARPU_MPI_LOG_OUT();
  425. return ret;
  426. }
  427. /********************************************************/
  428. /* */
  429. /* Barrier functionalities */
  430. /* */
  431. /********************************************************/
  432. static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
  433. {
  434. _STARPU_MPI_LOG_IN();
  435. barrier_req->ret = MPI_Barrier(barrier_req->comm);
  436. STARPU_ASSERT(barrier_req->ret == MPI_SUCCESS);
  437. _starpu_mpi_handle_request_termination(barrier_req);
  438. _STARPU_MPI_LOG_OUT();
  439. }
  440. int starpu_mpi_barrier(MPI_Comm comm)
  441. {
  442. _STARPU_MPI_LOG_IN();
  443. int ret;
  444. struct _starpu_mpi_req *barrier_req = calloc(1, sizeof(struct _starpu_mpi_req));
  445. STARPU_ASSERT(barrier_req);
  446. /* First wait for *both* all tasks and MPI requests to finish, in case
  447. * some tasks generate MPI requests, MPI requests generate tasks, etc.
  448. */
  449. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  450. STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
  451. barrier_running = 1;
  452. do
  453. {
  454. while (posted_requests)
  455. /* Wait for all current MPI requests to finish */
  456. _STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
  457. /* No current request, clear flag */
  458. newer_requests = 0;
  459. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  460. /* Now wait for all tasks */
  461. starpu_task_wait_for_all();
  462. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  463. /* Check newer_requests again, in case some MPI requests
  464. * triggered by tasks completed and triggered tasks between
  465. * wait_for_all finished and we take the lock */
  466. } while (posted_requests || newer_requests);
  467. barrier_running = 0;
  468. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  469. /* Initialize the request structure */
  470. _STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
  471. _STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
  472. barrier_req->func = _starpu_mpi_barrier_func;
  473. barrier_req->request_type = BARRIER_REQ;
  474. barrier_req->comm = comm;
  475. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  476. _starpu_mpi_submit_new_mpi_request(barrier_req);
  477. /* We wait for the MPI request to finish */
  478. _STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
  479. while (!barrier_req->completed)
  480. _STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
  481. _STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
  482. ret = barrier_req->ret;
  483. free(barrier_req);
  484. _STARPU_MPI_LOG_OUT();
  485. return ret;
  486. }
  487. /********************************************************/
  488. /* */
  489. /* Progression */
  490. /* */
  491. /********************************************************/
  492. #ifdef STARPU_VERBOSE
  493. static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
  494. {
  495. switch (request_type)
  496. {
  497. case SEND_REQ: return "SEND_REQ";
  498. case RECV_REQ: return "RECV_REQ";
  499. case WAIT_REQ: return "WAIT_REQ";
  500. case TEST_REQ: return "TEST_REQ";
  501. case BARRIER_REQ: return "BARRIER_REQ";
  502. case PROBE_REQ: return "PROBE_REQ";
  503. default: return "unknown request type";
  504. }
  505. }
  506. #endif
  507. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
  508. {
  509. int ret;
  510. _STARPU_MPI_LOG_IN();
  511. _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype %d count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
  512. if (req->request_type == PROBE_REQ)
  513. {
  514. #ifdef STARPU_DEVEL
  515. #warning TODO: instead of calling MPI_Recv, we should post a starpu mpi recv request
  516. #endif
  517. MPI_Status status;
  518. memset(&status, 0, sizeof(MPI_Status));
  519. req->ret = MPI_Recv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &status);
  520. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  521. }
  522. if (req->request_type == RECV_REQ || req->request_type == SEND_REQ || req->request_type == PROBE_REQ)
  523. {
  524. if (req->user_datatype == 1)
  525. {
  526. if (req->request_type == SEND_REQ)
  527. {
  528. // We already know the request to send the size is completed, we just call MPI_Test to make sure that the request object is deallocated
  529. MPI_Status status;
  530. int flag;
  531. ret = MPI_Test(&req->size_req, &flag, &status);
  532. STARPU_ASSERT(ret == MPI_SUCCESS);
  533. STARPU_ASSERT(flag);
  534. }
  535. if (req->request_type == RECV_REQ)
  536. // req->ptr is freed by starpu_handle_unpack_data
  537. starpu_handle_unpack_data(req->data_handle, req->ptr, req->count);
  538. else
  539. free(req->ptr);
  540. }
  541. else
  542. {
  543. _starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
  544. }
  545. starpu_data_release(req->data_handle);
  546. }
  547. /* Execute the specified callback, if any */
  548. if (req->callback)
  549. req->callback(req->callback_arg);
  550. /* tell anyone potentially waiting on the request that it is
  551. * terminated now */
  552. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  553. req->completed = 1;
  554. _STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  555. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  556. _STARPU_MPI_LOG_OUT();
  557. }
  558. static void _starpu_mpi_submit_new_mpi_request(void *arg)
  559. {
  560. _STARPU_MPI_LOG_IN();
  561. struct _starpu_mpi_req *req = arg;
  562. _STARPU_MPI_INC_POSTED_REQUESTS(-1);
  563. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  564. _starpu_mpi_req_list_push_front(new_requests, req);
  565. newer_requests = 1;
  566. _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype %d count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
  567. _STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  568. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  569. _STARPU_MPI_LOG_OUT();
  570. }
  571. #ifdef STARPU_MPI_ACTIVITY
  572. static unsigned _starpu_mpi_progression_hook_func(void *arg __attribute__((unused)))
  573. {
  574. unsigned may_block = 1;
  575. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  576. if (!_starpu_mpi_req_list_empty(detached_requests))
  577. {
  578. _STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  579. may_block = 0;
  580. }
  581. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  582. return may_block;
  583. }
  584. #endif /* STARPU_MPI_ACTIVITY */
  585. static void _starpu_mpi_test_detached_requests(void)
  586. {
  587. _STARPU_MPI_LOG_IN();
  588. int flag;
  589. MPI_Status status;
  590. struct _starpu_mpi_req *req, *next_req;
  591. _STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  592. for (req = _starpu_mpi_req_list_begin(detached_requests);
  593. req != _starpu_mpi_req_list_end(detached_requests);
  594. req = next_req)
  595. {
  596. next_req = _starpu_mpi_req_list_next(req);
  597. _STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  598. //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
  599. if (req->request_type == PROBE_REQ)
  600. {
  601. req->ret = MPI_Iprobe(req->srcdst, req->mpi_tag, req->comm, &flag, &status);
  602. }
  603. else
  604. {
  605. req->ret = MPI_Test(&req->request, &flag, &status);
  606. }
  607. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  608. if (flag)
  609. {
  610. if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
  611. {
  612. TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
  613. }
  614. else if (req->request_type == SEND_REQ)
  615. {
  616. TRACE_MPI_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
  617. }
  618. _starpu_mpi_handle_request_termination(req);
  619. if (req->request_type == RECV_REQ || req->request_type == PROBE_REQ)
  620. {
  621. TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
  622. }
  623. else if (req->request_type == SEND_REQ)
  624. {
  625. TRACE_MPI_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
  626. }
  627. }
  628. _STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  629. if (flag)
  630. {
  631. _starpu_mpi_req_list_erase(detached_requests, req);
  632. free(req);
  633. }
  634. }
  635. _STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  636. _STARPU_MPI_LOG_OUT();
  637. }
  638. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
  639. {
  640. if (req->detached)
  641. {
  642. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  643. _starpu_mpi_req_list_push_front(detached_requests, req);
  644. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  645. starpu_wake_all_blocked_workers();
  646. /* put the submitted request into the list of pending requests
  647. * so that it can be handled by the progression mechanisms */
  648. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  649. _STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  650. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  651. }
  652. }
  653. static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req)
  654. {
  655. _STARPU_MPI_LOG_IN();
  656. STARPU_ASSERT(req);
  657. /* submit the request to MPI */
  658. _STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype %d count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, req->datatype, (int)req->count, req->user_datatype);
  659. req->func(req);
  660. _STARPU_MPI_LOG_OUT();
  661. }
  /*
   * Arguments handed to the MPI progression thread at creation time.
   */
  struct _starpu_mpi_argc_argv
  {
      /* Non-zero when the progression thread must itself call
       * MPI_Init_thread() at start-up and MPI_Finalize() on exit. */
      int initialize_mpi;
      /* Forwarded as-is to MPI_Init_thread(). */
      int *argc;
      /* Forwarded as-is to MPI_Init_thread(). */
      char ***argv;
  };
  668. static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
  669. {
  670. switch (thread_level)
  671. {
  672. case MPI_THREAD_SERIALIZED:
  673. {
  674. _STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
  675. break;
  676. }
  677. case MPI_THREAD_FUNNELED:
  678. {
  679. _STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
  680. break;
  681. }
  682. case MPI_THREAD_SINGLE:
  683. {
  684. _STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
  685. break;
  686. }
  687. }
  688. }
  689. static void *_starpu_mpi_progress_thread_func(void *arg)
  690. {
  691. struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
  692. if (argc_argv->initialize_mpi)
  693. {
  694. int thread_support;
  695. _STARPU_DEBUG("Calling MPI_Init_thread\n");
  696. if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
  697. {
  698. _STARPU_ERROR("MPI_Init_thread failed\n");
  699. }
  700. _starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
  701. }
  702. else
  703. {
  704. int provided;
  705. MPI_Query_thread(&provided);
  706. _starpu_mpi_print_thread_level_support(provided, " has been initialized with");
  707. }
  708. {
  709. int rank, worldsize;
  710. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  711. MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
  712. TRACE_MPI_START(rank, worldsize);
  713. #ifdef STARPU_USE_FXT
  714. starpu_set_profiling_id(rank);
  715. #endif //STARPU_USE_FXT
  716. }
  717. /* notify the main thread that the progression thread is ready */
  718. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  719. running = 1;
  720. _STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  721. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  722. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  723. while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
  724. {
  725. /* shall we block ? */
  726. unsigned block = _starpu_mpi_req_list_empty(new_requests);
  727. #ifndef STARPU_MPI_ACTIVITY
  728. block = block && _starpu_mpi_req_list_empty(detached_requests);
  729. #endif /* STARPU_MPI_ACTIVITY */
  730. if (block)
  731. {
  732. _STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
  733. TRACE_MPI_SLEEP_BEGIN();
  734. if (barrier_running)
  735. /* Tell mpi_barrier */
  736. _STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
  737. _STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  738. TRACE_MPI_SLEEP_END();
  739. }
  740. /* test whether there are some terminated "detached request" */
  741. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  742. _starpu_mpi_test_detached_requests();
  743. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  744. /* get one request */
  745. struct _starpu_mpi_req *req;
  746. while (!_starpu_mpi_req_list_empty(new_requests))
  747. {
  748. req = _starpu_mpi_req_list_pop_back(new_requests);
  749. /* handling a request is likely to block for a while
  750. * (on a sync_data_with_mem call), we want to let the
  751. * application submit requests in the meantime, so we
  752. * release the lock. */
  753. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  754. _starpu_mpi_handle_new_request(req);
  755. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  756. }
  757. }
  758. STARPU_ASSERT(_starpu_mpi_req_list_empty(detached_requests));
  759. STARPU_ASSERT(_starpu_mpi_req_list_empty(new_requests));
  760. STARPU_ASSERT(posted_requests == 0);
  761. if (argc_argv->initialize_mpi)
  762. {
  763. _STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
  764. MPI_Finalize();
  765. }
  766. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  767. free(argc_argv);
  768. return NULL;
  769. }
  770. /********************************************************/
  771. /* */
  772. /* (De)Initialization methods */
  773. /* */
  774. /********************************************************/
  #ifdef STARPU_MPI_ACTIVITY
  /* Identifier of the registered progression hook; -1 until one is registered. */
  static int hookid = -1;
  #endif /* STARPU_MPI_ACTIVITY */
  /*
   * Record a synchronization point in the FxT trace.
   *
   * Does nothing unless STARPU_USE_FXT is defined.  Otherwise all ranks of
   * MPI_COMM_WORLD meet on a barrier, rank 0 draws a random key that is
   * broadcast to everybody, and the key is written to the trace together
   * with the rank and the world size.
   */
  static void _starpu_mpi_add_sync_point_in_fxt(void)
  {
  #ifdef STARPU_USE_FXT
      int my_rank;
      int nb_ranks;
      int err;
      /* "Unique" key making it possible to check that different FxT traces
       * come from the same MPI run.  Only rank 0 sets it before the
       * broadcast; the other ranks receive it. */
      int run_key;

      MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
      MPI_Comm_size(MPI_COMM_WORLD, &nb_ranks);

      err = MPI_Barrier(MPI_COMM_WORLD);
      STARPU_ASSERT(err == MPI_SUCCESS);

      /* XXX perhaps we don't want to generate a new seed if the application
       * specified some reproducible behaviour ? */
      if (my_rank == 0)
      {
          srand(time(NULL));
          run_key = rand();
      }

      err = MPI_Bcast(&run_key, 1, MPI_INT, 0, MPI_COMM_WORLD);
      STARPU_ASSERT(err == MPI_SUCCESS);

      TRACE_MPI_BARRIER(my_rank, nb_ranks, run_key);
      _STARPU_MPI_DEBUG(3, "unique key %x\n", run_key);
  #endif
  }
  804. static
  805. int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
  806. {
  807. _STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
  808. _STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
  809. _STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
  810. new_requests = _starpu_mpi_req_list_new();
  811. _STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
  812. detached_requests = _starpu_mpi_req_list_new();
  813. _STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
  814. struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
  815. argc_argv->initialize_mpi = initialize_mpi;
  816. argc_argv->argc = argc;
  817. argc_argv->argv = argv;
  818. _STARPU_PTHREAD_CREATE("MPI progress", &progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
  819. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  820. while (!running)
  821. _STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  822. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  823. #ifdef STARPU_MPI_ACTIVITY
  824. hookid = starpu_progression_hook_register(progression_hook_func, NULL);
  825. STARPU_ASSERT(hookid >= 0);
  826. #endif /* STARPU_MPI_ACTIVITY */
  827. _starpu_mpi_add_sync_point_in_fxt();
  828. _starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
  829. _starpu_mpi_cache_init(MPI_COMM_WORLD);
  830. return 0;
  831. }
  /*
   * Initialize StarPU-MPI.
   *
   * argc, argv:     forwarded to the MPI initialization.
   * initialize_mpi: non-zero to let StarPU-MPI initialize MPI itself.
   *
   * Returns the value of _starpu_mpi_initialize().
   */
  int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
  {
      int status = _starpu_mpi_initialize(argc, argv, initialize_mpi);

      return status;
  }
  836. int starpu_mpi_initialize(void)
  837. {
  838. return _starpu_mpi_initialize(NULL, NULL, 0);
  839. }
  840. int starpu_mpi_initialize_extended(int *rank, int *world_size)
  841. {
  842. int ret;
  843. ret = _starpu_mpi_initialize(NULL, NULL, 1);
  844. if (ret == 0)
  845. {
  846. _STARPU_DEBUG("Calling MPI_Comm_rank\n");
  847. MPI_Comm_rank(MPI_COMM_WORLD, rank);
  848. MPI_Comm_size(MPI_COMM_WORLD, world_size);
  849. }
  850. return ret;
  851. }
  852. int starpu_mpi_shutdown(void)
  853. {
  854. void *value;
  855. int rank, world_size;
  856. /* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
  857. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  858. MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  859. /* kill the progression thread */
  860. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  861. running = 0;
  862. _STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  863. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  864. pthread_join(progress_thread, &value);
  865. #ifdef STARPU_MPI_ACTIVITY
  866. starpu_progression_hook_deregister(hookid);
  867. #endif /* STARPU_MPI_ACTIVITY */
  868. TRACE_MPI_STOP(rank, world_size);
  869. /* free the request queues */
  870. _starpu_mpi_req_list_delete(detached_requests);
  871. _starpu_mpi_req_list_delete(new_requests);
  872. _starpu_mpi_comm_amounts_display(rank);
  873. _starpu_mpi_comm_amounts_free();
  874. _starpu_mpi_cache_free(world_size);
  875. return 0;
  876. }