starpu_mpi.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009, 2010-2012 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <stdlib.h>
  18. #include <starpu_mpi.h>
  19. #include <starpu_mpi_datatype.h>
  20. //#define STARPU_MPI_VERBOSE 1
  21. #include <starpu_mpi_private.h>
  22. #include <starpu_profiling.h>
  23. #include <starpu_mpi_stats.h>
  24. #include <starpu_mpi_insert_task.h>
  25. /* TODO find a better way to select the polling method (perhaps during the
  26. * configuration) */
  27. //#define USE_STARPU_ACTIVITY 1
  28. static void submit_mpi_req(void *arg);
  29. static void handle_request_termination(struct _starpu_mpi_req *req);
  30. /* The list of requests that have been newly submitted by the application */
  31. static struct _starpu_mpi_req_list *new_requests;
  32. /* The list of detached requests that have already been submitted to MPI */
  33. static struct _starpu_mpi_req_list *detached_requests;
  34. static pthread_mutex_t detached_requests_mutex;
  35. /* Condition to wake up progression thread */
  36. static pthread_cond_t cond_progression;
  37. /* Condition to wake up waiting for all current MPI requests to finish */
  38. static pthread_cond_t cond_finished;
  39. static pthread_mutex_t mutex;
  40. static pthread_t progress_thread;
  41. static int running = 0;
  42. /* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
  43. static pthread_mutex_t mutex_posted_requests;
  44. static int posted_requests = 0, newer_requests, barrier_running = 0;
  45. #define INC_POSTED_REQUESTS(value) { _STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
  46. /*
  47. * Isend
  48. */
  49. static void starpu_mpi_isend_func(struct _starpu_mpi_req *req)
  50. {
  51. int count;
  52. _STARPU_MPI_LOG_IN();
  53. req->needs_unpacking = starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype, &count);
  54. if (req->needs_unpacking)
  55. starpu_handle_pack_data(req->data_handle, &req->ptr);
  56. else
  57. req->ptr = starpu_handle_get_local_ptr(req->data_handle);
  58. STARPU_ASSERT(req->ptr);
  59. _STARPU_MPI_DEBUG("post MPI isend tag %d dst %d ptr %p datatype %p count %d req %p\n", req->mpi_tag, req->srcdst, req->ptr, req->datatype, count, &req->request);
  60. _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, count);
  61. req->ret = MPI_Isend(req->ptr, count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
  62. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  63. TRACE_MPI_ISEND(req->srcdst, req->mpi_tag, 0);
  64. /* somebody is perhaps waiting for the MPI request to be posted */
  65. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  66. req->submitted = 1;
  67. _STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  68. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  69. _STARPU_MPI_LOG_OUT();
  70. }
  71. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  72. int dest, int mpi_tag, MPI_Comm comm,
  73. unsigned detached, void (*callback)(void *), void *arg)
  74. {
  75. struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
  76. STARPU_ASSERT(req);
  77. _STARPU_MPI_LOG_IN();
  78. INC_POSTED_REQUESTS(1);
  79. /* Initialize the request structure */
  80. req->submitted = 0;
  81. req->completed = 0;
  82. _STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
  83. _STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
  84. req->request_type = SEND_REQ;
  85. req->data_handle = data_handle;
  86. req->srcdst = dest;
  87. req->mpi_tag = mpi_tag;
  88. req->comm = comm;
  89. req->func = starpu_mpi_isend_func;
  90. req->detached = detached;
  91. req->callback = callback;
  92. req->callback_arg = arg;
  93. /* Asynchronously request StarPU to fetch the data in main memory: when
  94. * it is available in main memory, submit_mpi_req(req) is called and
  95. * the request is actually submitted */
  96. starpu_data_acquire_cb(data_handle, STARPU_R, submit_mpi_req, (void *)req);
  97. _STARPU_MPI_LOG_OUT();
  98. return req;
  99. }
  100. int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
  101. {
  102. _STARPU_MPI_LOG_IN();
  103. STARPU_ASSERT(public_req);
  104. struct _starpu_mpi_req *req;
  105. req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, NULL, NULL);
  106. STARPU_ASSERT(req);
  107. *public_req = req;
  108. _STARPU_MPI_LOG_OUT();
  109. return 0;
  110. }
  111. /*
  112. * Isend (detached)
  113. */
  114. int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
  115. int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  116. {
  117. _STARPU_MPI_LOG_IN();
  118. _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
  119. _STARPU_MPI_LOG_OUT();
  120. return 0;
  121. }
  122. /*
  123. * Irecv
  124. */
  125. static void starpu_mpi_irecv_func(struct _starpu_mpi_req *req)
  126. {
  127. int count;
  128. _STARPU_MPI_LOG_IN();
  129. req->needs_unpacking = starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype, &count);
  130. if (req->needs_unpacking == 1)
  131. req->ptr = malloc(count);
  132. else
  133. req->ptr = starpu_handle_get_local_ptr(req->data_handle);
  134. STARPU_ASSERT(req->ptr);
  135. _STARPU_MPI_DEBUG("post MPI irecv tag %d src %d data %p ptr %p req %p datatype %p\n", req->mpi_tag, req->srcdst, req->data_handle, req->ptr, &req->request, req->datatype);
  136. req->ret = MPI_Irecv(req->ptr, count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
  137. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  138. /* somebody is perhaps waiting for the MPI request to be posted */
  139. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  140. req->submitted = 1;
  141. _STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  142. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  143. _STARPU_MPI_LOG_OUT();
  144. }
  145. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
  146. {
  147. _STARPU_MPI_LOG_IN();
  148. struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
  149. STARPU_ASSERT(req);
  150. INC_POSTED_REQUESTS(1);
  151. /* Initialize the request structure */
  152. req->submitted = 0;
  153. _STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
  154. _STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
  155. req->request_type = RECV_REQ;
  156. req->data_handle = data_handle;
  157. req->srcdst = source;
  158. req->mpi_tag = mpi_tag;
  159. req->comm = comm;
  160. req->detached = detached;
  161. req->callback = callback;
  162. req->callback_arg = arg;
  163. req->func = starpu_mpi_irecv_func;
  164. /* Asynchronously request StarPU to fetch the data in main memory: when
  165. * it is available in main memory, submit_mpi_req(req) is called and
  166. * the request is actually submitted */
  167. starpu_data_acquire_cb(data_handle, STARPU_W, submit_mpi_req, (void *)req);
  168. _STARPU_MPI_LOG_OUT();
  169. return req;
  170. }
  171. int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
  172. {
  173. _STARPU_MPI_LOG_IN();
  174. STARPU_ASSERT(public_req);
  175. struct _starpu_mpi_req *req;
  176. req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL);
  177. STARPU_ASSERT(req);
  178. *public_req = req;
  179. _STARPU_MPI_LOG_OUT();
  180. return 0;
  181. }
  182. /*
  183. * Irecv (detached)
  184. */
  185. int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  186. {
  187. _STARPU_MPI_LOG_IN();
  188. _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg);
  189. _STARPU_MPI_LOG_OUT();
  190. return 0;
  191. }
  192. /*
  193. * Recv
  194. */
  195. int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
  196. {
  197. starpu_mpi_req req;
  198. _STARPU_MPI_LOG_IN();
  199. starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
  200. starpu_mpi_wait(&req, status);
  201. _STARPU_MPI_LOG_OUT();
  202. return 0;
  203. }
  204. /*
  205. * Send
  206. */
  207. int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm)
  208. {
  209. starpu_mpi_req req;
  210. MPI_Status status;
  211. _STARPU_MPI_LOG_IN();
  212. memset(&status, 0, sizeof(MPI_Status));
  213. starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
  214. starpu_mpi_wait(&req, &status);
  215. _STARPU_MPI_LOG_OUT();
  216. return 0;
  217. }
  218. /*
  219. * Wait
  220. */
  221. static void starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
  222. {
  223. _STARPU_MPI_LOG_IN();
  224. /* Which is the mpi request we are waiting for ? */
  225. struct _starpu_mpi_req *req = waiting_req->other_request;
  226. req->ret = MPI_Wait(&req->request, waiting_req->status);
  227. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  228. handle_request_termination(req);
  229. _STARPU_MPI_LOG_OUT();
  230. }
  231. int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
  232. {
  233. _STARPU_MPI_LOG_IN();
  234. int ret;
  235. struct _starpu_mpi_req *waiting_req = calloc(1, sizeof(struct _starpu_mpi_req));
  236. STARPU_ASSERT(waiting_req);
  237. struct _starpu_mpi_req *req = *public_req;
  238. INC_POSTED_REQUESTS(1);
  239. /* We cannot try to complete a MPI request that was not actually posted
  240. * to MPI yet. */
  241. _STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
  242. while (!(req->submitted))
  243. _STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
  244. _STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
  245. /* Initialize the request structure */
  246. _STARPU_PTHREAD_MUTEX_INIT(&(waiting_req->req_mutex), NULL);
  247. _STARPU_PTHREAD_COND_INIT(&(waiting_req->req_cond), NULL);
  248. waiting_req->status = status;
  249. waiting_req->other_request = req;
  250. waiting_req->func = starpu_mpi_wait_func;
  251. waiting_req->request_type = WAIT_REQ;
  252. submit_mpi_req(waiting_req);
  253. /* We wait for the MPI request to finish */
  254. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  255. while (!req->completed)
  256. _STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
  257. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  258. ret = req->ret;
  259. /* The internal request structure was automatically allocated */
  260. *public_req = NULL;
  261. free(req);
  262. //free(waiting_req);
  263. _STARPU_MPI_LOG_OUT();
  264. return ret;
  265. }
  266. /*
  267. * Test
  268. */
  269. static void starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
  270. {
  271. _STARPU_MPI_LOG_IN();
  272. /* Which is the mpi request we are testing for ? */
  273. struct _starpu_mpi_req *req = testing_req->other_request;
  274. _STARPU_MPI_DEBUG("Test request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
  275. req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
  276. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  277. if (*testing_req->flag)
  278. {
  279. testing_req->ret = req->ret;
  280. handle_request_termination(req);
  281. }
  282. _STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
  283. testing_req->completed = 1;
  284. _STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
  285. _STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
  286. _STARPU_MPI_LOG_OUT();
  287. }
  288. int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
  289. {
  290. _STARPU_MPI_LOG_IN();
  291. int ret = 0;
  292. STARPU_ASSERT(public_req);
  293. struct _starpu_mpi_req *req = *public_req;
  294. STARPU_ASSERT(!req->detached);
  295. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  296. unsigned submitted = req->submitted;
  297. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  298. if (submitted)
  299. {
  300. struct _starpu_mpi_req *testing_req = calloc(1, sizeof(struct _starpu_mpi_req));
  301. STARPU_ASSERT(testing_req);
  302. // memset(testing_req, 0, sizeof(struct _starpu_mpi_req));
  303. /* Initialize the request structure */
  304. _STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
  305. _STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
  306. testing_req->flag = flag;
  307. testing_req->status = status;
  308. testing_req->other_request = req;
  309. testing_req->func = starpu_mpi_test_func;
  310. testing_req->completed = 0;
  311. testing_req->request_type = TEST_REQ;
  312. INC_POSTED_REQUESTS(1);
  313. submit_mpi_req(testing_req);
  314. /* We wait for the test request to finish */
  315. _STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
  316. while (!(testing_req->completed))
  317. _STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
  318. _STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
  319. ret = testing_req->ret;
  320. if (*(testing_req->flag))
  321. {
  322. /* The request was completed so we free the internal
  323. * request structure which was automatically allocated
  324. * */
  325. *public_req = NULL;
  326. free(req);
  327. }
  328. }
  329. else {
  330. *flag = 0;
  331. }
  332. _STARPU_MPI_LOG_OUT();
  333. return ret;
  334. }
  335. /*
  336. * Barrier
  337. */
  338. static void starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
  339. {
  340. _STARPU_MPI_LOG_IN();
  341. barrier_req->ret = MPI_Barrier(barrier_req->comm);
  342. STARPU_ASSERT(barrier_req->ret == MPI_SUCCESS);
  343. handle_request_termination(barrier_req);
  344. _STARPU_MPI_LOG_OUT();
  345. }
  346. int starpu_mpi_barrier(MPI_Comm comm)
  347. {
  348. _STARPU_MPI_LOG_IN();
  349. int ret;
  350. struct _starpu_mpi_req *barrier_req = calloc(1, sizeof(struct _starpu_mpi_req));
  351. STARPU_ASSERT(barrier_req);
  352. /* First wait for *both* all tasks and MPI requests to finish, in case
  353. * some tasks generate MPI requests, MPI requests generate tasks, etc.
  354. */
  355. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  356. STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
  357. barrier_running = 1;
  358. do {
  359. while (posted_requests)
  360. /* Wait for all current MPI requests to finish */
  361. _STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
  362. /* No current request, clear flag */
  363. newer_requests = 0;
  364. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  365. /* Now wait for all tasks */
  366. starpu_task_wait_for_all();
  367. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  368. /* Check newer_requests again, in case some MPI requests
  369. * triggered by tasks completed and triggered tasks between
  370. * wait_for_all finished and we take the lock */
  371. } while (posted_requests || newer_requests);
  372. barrier_running = 0;
  373. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  374. /* Initialize the request structure */
  375. _STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
  376. _STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
  377. barrier_req->func = starpu_mpi_barrier_func;
  378. barrier_req->request_type = BARRIER_REQ;
  379. barrier_req->comm = comm;
  380. INC_POSTED_REQUESTS(1);
  381. submit_mpi_req(barrier_req);
  382. /* We wait for the MPI request to finish */
  383. _STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
  384. while (!barrier_req->completed)
  385. _STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
  386. _STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
  387. ret = barrier_req->ret;
  388. //free(waiting_req);
  389. _STARPU_MPI_LOG_OUT();
  390. return ret;
  391. }
  392. /*
  393. * Requests
  394. */
  395. #ifdef STARPU_MPI_VERBOSE
  396. static char *starpu_mpi_request_type(unsigned request_type)
  397. {
  398. switch (request_type)
  399. {
  400. case SEND_REQ: return "send";
  401. case RECV_REQ: return "recv";
  402. case WAIT_REQ: return "wait";
  403. case TEST_REQ: return "test";
  404. case BARRIER_REQ: return "barrier";
  405. default: return "unknown request type";
  406. }
  407. }
  408. #endif
  409. static void handle_request_termination(struct _starpu_mpi_req *req)
  410. {
  411. _STARPU_MPI_LOG_IN();
  412. _STARPU_MPI_DEBUG("complete MPI (%s %d) data %p req %p - tag %d\n", starpu_mpi_request_type(req->request_type), req->srcdst, req->data_handle, &req->request, req->mpi_tag);
  413. if (req->request_type != BARRIER_REQ) {
  414. if (req->needs_unpacking)
  415. starpu_handle_unpack_data(req->data_handle, req->ptr);
  416. else
  417. MPI_Type_free(&req->datatype);
  418. starpu_data_release(req->data_handle);
  419. }
  420. if (req->request_type == RECV_REQ)
  421. {
  422. TRACE_MPI_IRECV_END(req->srcdst, req->mpi_tag);
  423. }
  424. /* Execute the specified callback, if any */
  425. if (req->callback)
  426. req->callback(req->callback_arg);
  427. /* tell anyone potentiallly waiting on the request that it is
  428. * terminated now */
  429. _STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  430. req->completed = 1;
  431. _STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  432. _STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  433. _STARPU_MPI_LOG_OUT();
  434. }
  435. static void submit_mpi_req(void *arg)
  436. {
  437. _STARPU_MPI_LOG_IN();
  438. struct _starpu_mpi_req *req = arg;
  439. INC_POSTED_REQUESTS(-1);
  440. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  441. _starpu_mpi_req_list_push_front(new_requests, req);
  442. newer_requests = 1;
  443. _STARPU_MPI_DEBUG("Pushing new request type %d\n", req->request_type);
  444. _STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  445. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  446. _STARPU_MPI_LOG_OUT();
  447. }
  448. /*
  449. * Scheduler hook
  450. */
  451. #ifdef USE_STARPU_ACTIVITY
  452. static unsigned progression_hook_func(void *arg __attribute__((unused)))
  453. {
  454. unsigned may_block = 1;
  455. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  456. if (!_starpu_mpi_req_list_empty(detached_requests))
  457. {
  458. _STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  459. may_block = 0;
  460. }
  461. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  462. return may_block;
  463. }
  464. #endif
  465. /*
  466. * Progression loop
  467. */
  468. static void test_detached_requests(void)
  469. {
  470. _STARPU_MPI_LOG_IN();
  471. int flag;
  472. MPI_Status status;
  473. struct _starpu_mpi_req *req, *next_req;
  474. _STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  475. for (req = _starpu_mpi_req_list_begin(detached_requests);
  476. req != _starpu_mpi_req_list_end(detached_requests);
  477. req = next_req)
  478. {
  479. next_req = _starpu_mpi_req_list_next(req);
  480. _STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  481. //_STARPU_MPI_DEBUG("Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
  482. req->ret = MPI_Test(&req->request, &flag, &status);
  483. STARPU_ASSERT(req->ret == MPI_SUCCESS);
  484. if (flag)
  485. {
  486. handle_request_termination(req);
  487. }
  488. _STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  489. if (flag)
  490. _starpu_mpi_req_list_erase(detached_requests, req);
  491. #ifdef STARPU_DEVEL
  492. #warning TODO fix memleak
  493. #endif
  494. /* Detached requests are automatically allocated by the lib */
  495. //if (req->detached)
  496. // free(req);
  497. }
  498. _STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  499. _STARPU_MPI_LOG_OUT();
  500. }
  501. static void handle_new_request(struct _starpu_mpi_req *req)
  502. {
  503. _STARPU_MPI_LOG_IN();
  504. STARPU_ASSERT(req);
  505. /* submit the request to MPI */
  506. _STARPU_MPI_DEBUG("Handling new request type %d\n", req->request_type);
  507. req->func(req);
  508. if (req->detached)
  509. {
  510. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  511. _starpu_mpi_req_list_push_front(detached_requests, req);
  512. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  513. starpu_wake_all_blocked_workers();
  514. /* put the submitted request into the list of pending requests
  515. * so that it can be handled by the progression mechanisms */
  516. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  517. _STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  518. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  519. }
  520. _STARPU_MPI_LOG_OUT();
  521. }
  522. static void *progress_thread_func(void *arg)
  523. {
  524. int initialize_mpi = *((int *) arg);
  525. _STARPU_DEBUG("Initialize mpi: %d\n", initialize_mpi);
  526. if (initialize_mpi) {
  527. #ifdef STARPU_DEVEL
  528. #warning get real argc and argv from the application
  529. #endif
  530. int argc = 0;
  531. char **argv = NULL;
  532. int thread_support;
  533. _STARPU_DEBUG("Calling MPI_Init_thread\n");
  534. if (MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS) {
  535. fprintf(stderr,"MPI_Init_thread failed\n");
  536. exit(1);
  537. }
  538. if (thread_support == MPI_THREAD_FUNNELED)
  539. fprintf(stderr,"Warning: MPI only has funneled thread support, not serialized, hoping this will work\n");
  540. if (thread_support < MPI_THREAD_FUNNELED)
  541. fprintf(stderr,"Warning: MPI does not have thread support!\n");
  542. }
  543. /* notify the main thread that the progression thread is ready */
  544. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  545. running = 1;
  546. _STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  547. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  548. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  549. while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests))) {
  550. /* shall we block ? */
  551. unsigned block = _starpu_mpi_req_list_empty(new_requests);
  552. #ifndef USE_STARPU_ACTIVITY
  553. block = block && _starpu_mpi_req_list_empty(detached_requests);
  554. #endif
  555. if (block)
  556. {
  557. _STARPU_MPI_DEBUG("NO MORE REQUESTS TO HANDLE\n");
  558. if (barrier_running)
  559. /* Tell mpi_barrier */
  560. _STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
  561. _STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  562. }
  563. /* test whether there are some terminated "detached request" */
  564. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  565. test_detached_requests();
  566. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  567. /* get one request */
  568. struct _starpu_mpi_req *req;
  569. while (!_starpu_mpi_req_list_empty(new_requests))
  570. {
  571. req = _starpu_mpi_req_list_pop_back(new_requests);
  572. /* handling a request is likely to block for a while
  573. * (on a sync_data_with_mem call), we want to let the
  574. * application submit requests in the meantime, so we
  575. * release the lock. */
  576. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  577. handle_new_request(req);
  578. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  579. }
  580. }
  581. STARPU_ASSERT(_starpu_mpi_req_list_empty(detached_requests));
  582. STARPU_ASSERT(_starpu_mpi_req_list_empty(new_requests));
  583. STARPU_ASSERT(posted_requests == 0);
  584. if (initialize_mpi) {
  585. _STARPU_MPI_DEBUG("Calling MPI_Finalize()\n");
  586. MPI_Finalize();
  587. }
  588. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  589. return NULL;
  590. }
  591. /*
  592. * (De)Initialization methods
  593. */
  594. #ifdef USE_STARPU_ACTIVITY
  595. static int hookid = - 1;
  596. #endif
  597. static void _starpu_mpi_add_sync_point_in_fxt(void)
  598. {
  599. #ifdef STARPU_USE_FXT
  600. int rank;
  601. int worldsize;
  602. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  603. MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
  604. int barrier_ret = MPI_Barrier(MPI_COMM_WORLD);
  605. STARPU_ASSERT(barrier_ret == MPI_SUCCESS);
  606. /* We generate a "unique" key so that we can make sure that different
  607. * FxT traces come from the same MPI run. */
  608. int random_number;
  609. /* XXX perhaps we don't want to generate a new seed if the application
  610. * specified some reproductible behaviour ? */
  611. if (rank == 0)
  612. {
  613. srand(time(NULL));
  614. random_number = rand();
  615. }
  616. MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
  617. TRACE_MPI_BARRIER(rank, worldsize, random_number);
  618. _STARPU_MPI_DEBUG("unique key %x\n", random_number);
  619. #endif
  620. }
  621. static
  622. int _starpu_mpi_initialize(int initialize_mpi, int *rank, int *world_size)
  623. {
  624. _STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
  625. _STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
  626. _STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
  627. new_requests = _starpu_mpi_req_list_new();
  628. _STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
  629. detached_requests = _starpu_mpi_req_list_new();
  630. _STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
  631. _STARPU_PTHREAD_CREATE(&progress_thread, NULL,
  632. progress_thread_func, (void *)&initialize_mpi);
  633. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  634. while (!running)
  635. _STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  636. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  637. if (rank && world_size) {
  638. _STARPU_DEBUG("Calling MPI_Comm_rank\n");
  639. MPI_Comm_rank(MPI_COMM_WORLD, rank);
  640. MPI_Comm_size(MPI_COMM_WORLD, world_size);
  641. }
  642. #ifdef STARPU_USE_FXT
  643. int prank;
  644. MPI_Comm_rank(MPI_COMM_WORLD, &prank);
  645. starpu_set_profiling_id(prank);
  646. #endif //STARPU_USE_FXT
  647. #ifdef USE_STARPU_ACTIVITY
  648. hookid = starpu_progression_hook_register(progression_hook_func, NULL);
  649. STARPU_ASSERT(hookid >= 0);
  650. #endif
  651. _starpu_mpi_add_sync_point_in_fxt();
  652. _starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
  653. _starpu_mpi_tables_init(MPI_COMM_WORLD);
  654. return 0;
  655. }
  656. int starpu_mpi_initialize(void)
  657. {
  658. return _starpu_mpi_initialize(0, NULL, NULL);
  659. }
  660. int starpu_mpi_initialize_extended(int *rank, int *world_size)
  661. {
  662. return _starpu_mpi_initialize(1, rank, world_size);
  663. }
  664. int starpu_mpi_shutdown(void)
  665. {
  666. void *value;
  667. int rank;
  668. /* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
  669. MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  670. /* kill the progression thread */
  671. _STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  672. running = 0;
  673. _STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  674. _STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  675. pthread_join(progress_thread, &value);
  676. #ifdef USE_STARPU_ACTIVITY
  677. starpu_progression_hook_deregister(hookid);
  678. #endif
  679. /* free the request queues */
  680. _starpu_mpi_req_list_delete(detached_requests);
  681. _starpu_mpi_req_list_delete(new_requests);
  682. _starpu_mpi_comm_amounts_display(rank);
  683. _starpu_mpi_comm_amounts_free();
  684. return 0;
  685. }