starpu_mpi.c 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  #include <errno.h>
  #include <starpu_mpi.h>
  #include <starpu_mpi_datatype.h>
  18. static void submit_mpi_req(void *arg);
  19. static void handle_request_termination(struct starpu_mpi_req_s *req);
  20. /* The list of requests that have been newly submitted by the application */
  21. static starpu_mpi_req_list_t new_requests;
  22. static pthread_cond_t cond;
  23. static pthread_mutex_t mutex;
  24. static pthread_t progress_thread;
  25. static int running = 0;
  26. /*
  27. * Isend
  28. */
  29. static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
  30. {
  31. void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
  32. starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
  33. MPI_Isend(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
  34. /* somebody is perhaps waiting for the MPI request to be posted */
  35. pthread_mutex_lock(&req->req_mutex);
  36. req->submitted = 1;
  37. pthread_cond_broadcast(&req->req_cond);
  38. pthread_mutex_unlock(&req->req_mutex);
  39. }
  40. int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int dest, int mpi_tag, MPI_Comm comm)
  41. {
  42. STARPU_ASSERT(req);
  43. /* Initialize the request structure */
  44. req->submitted = 0;
  45. req->completed = 0;
  46. pthread_mutex_init(&req->req_mutex, NULL);
  47. pthread_cond_init(&req->req_cond, NULL);
  48. req->data_handle = data_handle;
  49. req->srcdst = dest;
  50. req->mpi_tag = mpi_tag;
  51. req->comm = comm;
  52. req->detached = 0;
  53. req->func = starpu_mpi_isend_func;
  54. /* Asynchronously request StarPU to fetch the data in main memory: when
  55. * it is available in main memory, submit_mpi_req(req) is called and
  56. * the request is actually submitted */
  57. starpu_sync_data_with_mem_non_blocking(data_handle, STARPU_R,
  58. submit_mpi_req, (void *)req);
  59. return 0;
  60. }
  61. /*
  62. * Isend (detached)
  63. */
  64. int starpu_mpi_isend_detached(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  65. {
  66. /* TODO */
  67. return 0;
  68. }
  69. /*
  70. * Irecv
  71. */
  72. static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
  73. {
  74. void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
  75. starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
  76. MPI_Irecv(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
  77. /* somebody is perhaps waiting for the MPI request to be posted */
  78. pthread_mutex_lock(&req->req_mutex);
  79. req->submitted = 1;
  80. pthread_cond_broadcast(&req->req_cond);
  81. pthread_mutex_unlock(&req->req_mutex);
  82. }
  83. int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int source, int mpi_tag, MPI_Comm comm)
  84. {
  85. STARPU_ASSERT(req);
  86. /* Initialize the request structure */
  87. req->submitted = 0;
  88. pthread_mutex_init(&req->req_mutex, NULL);
  89. pthread_cond_init(&req->req_cond, NULL);
  90. req->data_handle = data_handle;
  91. req->srcdst = source;
  92. req->mpi_tag = mpi_tag;
  93. req->comm = comm;
  94. req->detached = 0;
  95. req->func = starpu_mpi_irecv_func;
  96. /* Asynchronously request StarPU to fetch the data in main memory: when
  97. * it is available in main memory, submit_mpi_req(req) is called and
  98. * the request is actually submitted */
  99. starpu_sync_data_with_mem_non_blocking(data_handle, STARPU_W,
  100. submit_mpi_req, (void *)req);
  101. return 0;
  102. }
  103. /*
  104. * Recv
  105. */
  106. int starpu_mpi_recv(starpu_data_handle data_handle,
  107. int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
  108. {
  109. struct starpu_mpi_req_s req;
  110. starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
  111. starpu_mpi_wait(&req, status);
  112. return 0;
  113. }
  114. /*
  115. * Send
  116. */
  117. int starpu_mpi_send(starpu_data_handle data_handle,
  118. int dest, int mpi_tag, MPI_Comm comm)
  119. {
  120. struct starpu_mpi_req_s req;
  121. MPI_Status status;
  122. starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
  123. starpu_mpi_wait(&req, &status);
  124. return 0;
  125. }
  126. /*
  127. * Wait
  128. */
  129. static void starpu_mpi_wait_func(struct starpu_mpi_req_s *waiting_req)
  130. {
  131. /* Which is the mpi request we are waiting for ? */
  132. struct starpu_mpi_req_s *req = waiting_req->other_request;
  133. req->ret = MPI_Wait(&req->request, waiting_req->status);
  134. handle_request_termination(req);
  135. }
  136. int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status)
  137. {
  138. int ret;
  139. struct starpu_mpi_req_s waiting_req;
  140. /* We cannot try to complete a MPI request that was not actually posted
  141. * to MPI yet. */
  142. pthread_mutex_lock(&req->req_mutex);
  143. while (!req->submitted)
  144. pthread_cond_wait(&req->req_cond, &req->req_mutex);
  145. pthread_mutex_unlock(&req->req_mutex);
  146. /* Initialize the request structure */
  147. pthread_mutex_init(&waiting_req.req_mutex, NULL);
  148. pthread_cond_init(&waiting_req.req_cond, NULL);
  149. waiting_req.status = status;
  150. waiting_req.other_request = req;
  151. waiting_req.func = starpu_mpi_wait_func;
  152. submit_mpi_req(&waiting_req);
  153. /* We wait for the MPI request to finish */
  154. pthread_mutex_lock(&req->req_mutex);
  155. while (!req->completed)
  156. pthread_cond_wait(&req->req_cond, &req->req_mutex);
  157. pthread_mutex_unlock(&req->req_mutex);
  158. ret = req->ret;
  159. return ret;
  160. }
  161. /*
  162. * Test
  163. */
  164. static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
  165. {
  166. /* Which is the mpi request we are testing for ? */
  167. struct starpu_mpi_req_s *req = testing_req->other_request;
  168. int ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
  169. if (*testing_req->flag)
  170. {
  171. testing_req->ret = ret;
  172. handle_request_termination(req);
  173. }
  174. pthread_mutex_lock(&testing_req->req_mutex);
  175. testing_req->completed = 1;
  176. pthread_cond_signal(&testing_req->req_cond);
  177. pthread_mutex_unlock(&testing_req->req_mutex);
  178. }
  179. int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
  180. {
  181. int ret = 0;
  182. struct starpu_mpi_req_s testing_req;
  183. pthread_mutex_lock(&req->req_mutex);
  184. STARPU_ASSERT(!req->detached);
  185. pthread_mutex_lock(&req->req_mutex);
  186. unsigned submitted = req->submitted;
  187. pthread_mutex_unlock(&req->req_mutex);
  188. if (submitted)
  189. {
  190. /* Initialize the request structure */
  191. pthread_mutex_init(&testing_req.req_mutex, NULL);
  192. pthread_cond_init(&testing_req.req_cond, NULL);
  193. testing_req.flag = flag;
  194. testing_req.status = status;
  195. testing_req.other_request = req;
  196. testing_req.func = starpu_mpi_wait_func;
  197. testing_req.completed = 0;
  198. submit_mpi_req(&testing_req);
  199. /* We wait for the test request to finish */
  200. pthread_mutex_lock(&testing_req.req_mutex);
  201. while (!testing_req.completed)
  202. pthread_cond_wait(&testing_req.req_cond, &testing_req.req_mutex);
  203. pthread_mutex_unlock(&testing_req.req_mutex);
  204. ret = testing_req.ret;
  205. }
  206. else {
  207. *flag = 0;
  208. }
  209. return ret;
  210. }
  211. /*
  212. * Requests
  213. */
  214. void handle_request_termination(struct starpu_mpi_req_s *req)
  215. {
  216. MPI_Type_free(&req->datatype);
  217. starpu_release_data_from_mem(req->data_handle);
  218. /* tell anyone potentiallly waiting on the request that it is
  219. * terminated now */
  220. pthread_mutex_lock(&req->req_mutex);
  221. req->completed = 1;
  222. pthread_cond_broadcast(&req->req_cond);
  223. pthread_mutex_unlock(&req->req_mutex);
  224. }
  225. void submit_mpi_req(void *arg)
  226. {
  227. struct starpu_mpi_req_s *req = arg;
  228. pthread_mutex_lock(&mutex);
  229. starpu_mpi_req_list_push_front(new_requests, req);
  230. pthread_cond_broadcast(&cond);
  231. pthread_mutex_unlock(&mutex);
  232. }
  233. /*
  234. * Progression loop
  235. */
  236. void handle_new_request(struct starpu_mpi_req_s *req)
  237. {
  238. STARPU_ASSERT(req);
  239. /* submit the request to MPI */
  240. req->func(req);
  241. }
  242. void *progress_thread_func(void *arg __attribute__((unused)))
  243. {
  244. /* notify the main thread that the progression thread is ready */
  245. pthread_mutex_lock(&mutex);
  246. running = 1;
  247. pthread_cond_signal(&cond);
  248. pthread_mutex_unlock(&mutex);
  249. pthread_mutex_lock(&mutex);
  250. while (running) {
  251. /* TODO test if there is some "detached request" and progress if this is the case */
  252. pthread_cond_wait(&cond, &mutex);
  253. if (!running)
  254. break;
  255. /* Handle new requests */
  256. // while (req = starpu_mpi_req_list_pop_back(new_requests))
  257. /* get one request */
  258. struct starpu_mpi_req_s *req;
  259. while (!starpu_mpi_req_list_empty(new_requests))
  260. {
  261. req = starpu_mpi_req_list_pop_back(new_requests);
  262. /* handling a request is likely to block for a while
  263. * (on a sync_data_with_mem call), we want to let the
  264. * application submit requests in the meantime, so we
  265. * release the lock. */
  266. pthread_mutex_unlock(&mutex);
  267. handle_new_request(req);
  268. pthread_mutex_lock(&mutex);
  269. }
  270. pthread_mutex_unlock(&mutex);
  271. }
  272. pthread_mutex_unlock(&mutex);
  273. return NULL;
  274. }
  275. /*
  276. * (De)Initialization methods
  277. */
  278. int starpu_mpi_initialize(void)
  279. {
  280. pthread_mutex_init(&mutex, NULL);
  281. pthread_cond_init(&cond, NULL);
  282. new_requests = starpu_mpi_req_list_new();
  283. int ret = pthread_create(&progress_thread, NULL, progress_thread_func, NULL);
  284. pthread_mutex_lock(&mutex);
  285. if (!running)
  286. pthread_cond_wait(&cond, &mutex);
  287. pthread_mutex_unlock(&mutex);
  288. return 0;
  289. }
  290. int starpu_mpi_shutdown(void)
  291. {
  292. /* kill the progression thread */
  293. pthread_mutex_lock(&mutex);
  294. running = 0;
  295. pthread_cond_signal(&cond);
  296. pthread_mutex_unlock(&mutex);
  297. void *value;
  298. pthread_join(progress_thread, &value);
  299. /* TODO liberate the queues */
  300. return 0;
  301. }