starpu_mpi.c 61 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009, 2010-2015 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <stdlib.h>
  18. #include <starpu_mpi.h>
  19. #include <starpu_mpi_datatype.h>
  20. #include <starpu_mpi_private.h>
  21. #include <starpu_mpi_cache.h>
  22. #include <starpu_profiling.h>
  23. #include <starpu_mpi_stats.h>
  24. #include <starpu_mpi_cache.h>
  25. #include <starpu_mpi_sync_data.h>
  26. #include <starpu_mpi_early_data.h>
  27. #include <starpu_mpi_early_request.h>
  28. #include <starpu_mpi_select_node.h>
  29. #include <starpu_mpi_tag.h>
  30. #include <common/config.h>
  31. #include <common/thread.h>
  32. #include <datawizard/interfaces/data_interface.h>
  33. #include <datawizard/coherency.h>
  34. #include <core/simgrid.h>
  35. static void _starpu_mpi_add_sync_point_in_fxt(void);
  36. static void _starpu_mpi_submit_ready_request(void *arg);
  37. static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req);
  38. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
  39. #ifdef STARPU_VERBOSE
  40. static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
  41. #endif
  42. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  43. int dest, int data_tag, MPI_Comm comm,
  44. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  45. int sequential_consistency);
  46. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
  47. int source, int data_tag, MPI_Comm comm,
  48. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  49. int sequential_consistency, int is_internal_req,
  50. starpu_ssize_t count);
  51. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
  52. static void _starpu_mpi_early_data_cb(void* arg);
  53. /* The list of ready requests */
  54. static struct _starpu_mpi_req_list *ready_requests;
  55. /* The list of detached requests that have already been submitted to MPI */
  56. static struct _starpu_mpi_req_list *detached_requests;
  57. static starpu_pthread_mutex_t detached_requests_mutex;
  58. /* Condition to wake up progression thread */
  59. static starpu_pthread_cond_t cond_progression;
  60. /* Condition to wake up waiting for all current MPI requests to finish */
  61. static starpu_pthread_cond_t cond_finished;
  62. static starpu_pthread_mutex_t mutex;
  63. static starpu_pthread_t progress_thread;
  64. static int running = 0;
  65. #ifdef STARPU_SIMGRID
  66. static int _mpi_world_size;
  67. static int _mpi_world_rank;
  68. #endif
  69. /* Count requests posted by the application and not yet submitted to MPI */
  70. static starpu_pthread_mutex_t mutex_posted_requests;
  71. static int posted_requests = 0, newer_requests, barrier_running = 0;
  72. #define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
  73. #pragma weak smpi_simulated_main_
  74. extern int smpi_simulated_main_(int argc, char *argv[]);
  75. static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
  76. {
  77. *req = malloc(sizeof(struct _starpu_mpi_req));
  78. STARPU_MPI_ASSERT_MSG(*req, "Invalid request");
  79. /* Initialize the request structure */
  80. (*req)->data_handle = NULL;
  81. (*req)->datatype = 0;
  82. (*req)->ptr = NULL;
  83. (*req)->count = -1;
  84. (*req)->user_datatype = -1;
  85. (*req)->srcdst = -1;
  86. (*req)->data_tag = -1;
  87. (*req)->comm = 0;
  88. (*req)->func = NULL;
  89. (*req)->status = NULL;
  90. (*req)->request = 0;
  91. (*req)->flag = NULL;
  92. (*req)->ret = -1;
  93. STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
  94. STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
  95. STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
  96. STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
  97. (*req)->request_type = UNKNOWN_REQ;
  98. (*req)->submitted = 0;
  99. (*req)->completed = 0;
  100. (*req)->posted = 0;
  101. (*req)->other_request = NULL;
  102. (*req)->sync = 0;
  103. (*req)->detached = -1;
  104. (*req)->callback = NULL;
  105. (*req)->callback_arg = NULL;
  106. (*req)->size_req = 0;
  107. (*req)->internal_req = NULL;
  108. (*req)->is_internal_req = 0;
  109. (*req)->envelope = NULL;
  110. (*req)->sequential_consistency = 1;
  111. }
  112. /********************************************************/
  113. /* */
  114. /* Send/Receive functionalities */
  115. /* */
  116. /********************************************************/
  117. struct _starpu_mpi_early_data_cb_args
  118. {
  119. starpu_data_handle_t data_handle;
  120. starpu_data_handle_t early_handle;
  121. struct _starpu_mpi_req *req;
  122. void *buffer;
  123. };
  124. static void _starpu_mpi_submit_ready_request(void *arg)
  125. {
  126. _STARPU_MPI_LOG_IN();
  127. struct _starpu_mpi_req *req = arg;
  128. _STARPU_MPI_INC_POSTED_REQUESTS(-1);
  129. _STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->srcdst, req->data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
  130. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  131. if (req->request_type == RECV_REQ)
  132. {
  133. /* Case : the request is the internal receive request submitted
  134. * by StarPU-MPI to receive incoming data without a matching
  135. * early_request from the application. We immediately allocate the
  136. * pointer associated to the data_handle, and push it into the
  137. * ready_requests list, so as the real MPI request can be submitted
  138. * before the next submission of the envelope-catching request. */
  139. if (req->is_internal_req)
  140. {
  141. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  142. if (req->user_datatype == 0)
  143. {
  144. req->count = 1;
  145. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  146. }
  147. else
  148. {
  149. STARPU_ASSERT(req->count);
  150. req->ptr = malloc(req->count);
  151. STARPU_MPI_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
  152. }
  153. _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  154. req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr,
  155. _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  156. _starpu_mpi_req_list_push_front(ready_requests, req);
  157. /* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
  158. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  159. STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
  160. req->posted = 1;
  161. STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
  162. STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
  163. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  164. }
  165. else
  166. {
  167. /* test whether some data with the given tag and source have already been received by StarPU-MPI*/
  168. struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->data_tag, req->srcdst);
  169. /* Case: a receive request for a data with the given tag and source has already been
  170. * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
  171. * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
  172. * will be called to bring the data back to the original data handle associated to the request.*/
  173. if (early_data_handle)
  174. {
  175. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  176. STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
  177. while (!(early_data_handle->req_ready))
  178. STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
  179. STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
  180. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  181. _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->data_tag);
  182. STARPU_ASSERT(req->data_handle != early_data_handle->handle);
  183. req->internal_req = early_data_handle->req;
  184. struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
  185. cb_args->data_handle = req->data_handle;
  186. cb_args->early_handle = early_data_handle->handle;
  187. cb_args->buffer = early_data_handle->buffer;
  188. cb_args->req = req;
  189. _STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
  190. starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
  191. }
  192. /* Case: no matching data has been received. Store the receive request as an early_request. */
  193. else
  194. {
  195. struct _starpu_mpi_sync_data_handle *sync_data_handle = _starpu_mpi_sync_data_find(req->data_tag, req->srcdst);
  196. _STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %d and src %d = %p\n", req->data_tag, req->srcdst, sync_data_handle);
  197. if (sync_data_handle)
  198. {
  199. req->sync = 1;
  200. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  201. if (req->user_datatype == 0)
  202. {
  203. req->count = 1;
  204. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  205. }
  206. else
  207. {
  208. req->count = sync_data_handle->req->count;
  209. STARPU_ASSERT(req->count);
  210. req->ptr = malloc(req->count);
  211. STARPU_MPI_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
  212. }
  213. _starpu_mpi_req_list_push_front(ready_requests, req);
  214. }
  215. else
  216. {
  217. _STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->data_tag);
  218. _starpu_mpi_early_request_add(req);
  219. }
  220. }
  221. }
  222. }
  223. else
  224. {
  225. _starpu_mpi_req_list_push_front(ready_requests, req);
  226. _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  227. req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  228. }
  229. newer_requests = 1;
  230. STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  231. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  232. _STARPU_MPI_LOG_OUT();
  233. }
  234. static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
  235. int srcdst, int data_tag, MPI_Comm comm,
  236. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  237. enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
  238. enum starpu_data_access_mode mode,
  239. int sequential_consistency,
  240. int is_internal_req,
  241. starpu_ssize_t count)
  242. {
  243. struct _starpu_mpi_req *req;
  244. _STARPU_MPI_LOG_IN();
  245. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  246. /* Initialize the request structure */
  247. _starpu_mpi_request_init(&req);
  248. req->request_type = request_type;
  249. req->data_handle = data_handle;
  250. req->srcdst = srcdst;
  251. req->data_tag = data_tag;
  252. req->comm = comm;
  253. req->detached = detached;
  254. req->sync = sync;
  255. req->callback = callback;
  256. req->callback_arg = arg;
  257. req->func = func;
  258. req->sequential_consistency = sequential_consistency;
  259. req->is_internal_req = is_internal_req;
  260. req->count = count;
  261. /* Asynchronously request StarPU to fetch the data in main memory: when
  262. * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
  263. * the request is actually submitted */
  264. starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency);
  265. _STARPU_MPI_LOG_OUT();
  266. return req;
  267. }
  268. /********************************************************/
  269. /* */
  270. /* Send functionalities */
  271. /* */
  272. /********************************************************/
  273. static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
  274. {
  275. _STARPU_MPI_LOG_IN();
  276. _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->sync);
  277. _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
  278. _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->srcdst, req->data_tag, 0);
  279. if (req->sync == 0)
  280. {
  281. _STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->data_tag);
  282. req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
  283. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_code(req->ret));
  284. }
  285. else
  286. {
  287. _STARPU_MPI_COMM_TO_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->data_tag);
  288. req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
  289. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_code(req->ret));
  290. }
  291. _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->srcdst, req->data_tag, 0);
  292. /* somebody is perhaps waiting for the MPI request to be posted */
  293. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  294. req->submitted = 1;
  295. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  296. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  297. _starpu_mpi_handle_detached_request(req);
  298. _STARPU_MPI_LOG_OUT();
  299. }
  300. static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
  301. {
  302. _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
  303. req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
  304. req->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
  305. req->envelope->data_tag = req->data_tag;
  306. req->envelope->sync = req->sync;
  307. if (req->user_datatype == 0)
  308. {
  309. int size;
  310. req->count = 1;
  311. req->ptr = starpu_data_get_local_ptr(req->data_handle);
  312. MPI_Type_size(req->datatype, &size);
  313. req->envelope->size = (starpu_ssize_t)req->count * size;
  314. _STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst);
  315. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
  316. MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
  317. }
  318. else
  319. {
  320. int ret;
  321. // Do not pack the data, just try to find out the size
  322. starpu_data_pack(req->data_handle, NULL, &(req->envelope->size));
  323. if (req->envelope->size != -1)
  324. {
  325. // We already know the size of the data, let's send it to overlap with the packing of the data
  326. _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->srcdst);
  327. req->count = req->envelope->size;
  328. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
  329. ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
  330. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_code(ret));
  331. }
  332. // Pack the data
  333. starpu_data_pack(req->data_handle, &req->ptr, &req->count);
  334. if (req->envelope->size == -1)
  335. {
  336. // We know the size now, let's send it
  337. _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->srcdst);
  338. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
  339. ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm, &req->size_req);
  340. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_code(ret));
  341. }
  342. else
  343. {
  344. // We check the size returned with the 2 calls to pack is the same
  345. STARPU_MPI_ASSERT_MSG(req->count == req->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->size);
  346. }
  347. // We can send the data now
  348. }
  349. if (req->sync)
  350. {
  351. // If the data is to be sent in synchronous mode, we need to wait for the receiver ready message
  352. struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_create(req);
  353. _starpu_mpi_sync_data_add(_sync_data);
  354. }
  355. else
  356. {
  357. // Otherwise we can send the data
  358. _starpu_mpi_isend_data_func(req);
  359. }
  360. }
  361. static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
  362. int dest, int data_tag, MPI_Comm comm,
  363. unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
  364. int sequential_consistency)
  365. {
  366. return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
  367. }
  368. int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
  369. {
  370. _STARPU_MPI_LOG_IN();
  371. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
  372. struct _starpu_mpi_req *req;
  373. _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
  374. req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, NULL, NULL, 1);
  375. _STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, data_tag, 0);
  376. STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
  377. *public_req = req;
  378. _STARPU_MPI_LOG_OUT();
  379. return 0;
  380. }
  381. int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
  382. int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  383. {
  384. _STARPU_MPI_LOG_IN();
  385. _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, callback, arg, 1);
  386. _STARPU_MPI_LOG_OUT();
  387. return 0;
  388. }
  389. int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
  390. {
  391. starpu_mpi_req req;
  392. MPI_Status status;
  393. _STARPU_MPI_LOG_IN();
  394. memset(&status, 0, sizeof(MPI_Status));
  395. starpu_mpi_isend(data_handle, &req, dest, data_tag, comm);
  396. starpu_mpi_wait(&req, &status);
  397. _STARPU_MPI_LOG_OUT();
  398. return 0;
  399. }
  400. int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
  401. {
  402. _STARPU_MPI_LOG_IN();
  403. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
  404. struct _starpu_mpi_req *req;
  405. req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, NULL, NULL, 1);
  406. STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
  407. *public_req = req;
  408. _STARPU_MPI_LOG_OUT();
  409. return 0;
  410. }
  411. int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  412. {
  413. _STARPU_MPI_LOG_IN();
  414. _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, callback, arg, 1);
  415. _STARPU_MPI_LOG_OUT();
  416. return 0;
  417. }
  418. /********************************************************/
  419. /* */
  420. /* receive functionalities */
  421. /* */
  422. /********************************************************/
  423. static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
  424. {
  425. _STARPU_MPI_LOG_IN();
  426. _STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  427. _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->srcdst, req->data_tag);
  428. if (req->sync)
  429. {
  430. struct _starpu_mpi_envelope *_envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
  431. _envelope->mode = _STARPU_MPI_ENVELOPE_SYNC_READY;
  432. _envelope->data_tag = req->data_tag;
  433. _STARPU_MPI_DEBUG(20, "Telling node %d it can send the data and waiting for the data back ...\n", req->srcdst);
  434. _STARPU_MPI_COMM_TO_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
  435. req->ret = MPI_Send(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _STARPU_MPI_TAG_ENVELOPE, req->comm);
  436. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Send returning %s", _starpu_mpi_get_mpi_code(req->ret));
  437. }
  438. if (req->sync)
  439. {
  440. _STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->data_tag);
  441. req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_SYNC_DATA, req->comm, &req->request);
  442. }
  443. else
  444. {
  445. _STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->data_tag);
  446. req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _STARPU_MPI_TAG_DATA, req->comm, &req->request);
  447. }
  448. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_code(req->ret));
  449. _STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->srcdst, req->data_tag);
  450. /* somebody is perhaps waiting for the MPI request to be posted */
  451. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  452. req->submitted = 1;
  453. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  454. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  455. _starpu_mpi_handle_detached_request(req);
  456. _STARPU_MPI_LOG_OUT();
  457. }
  458. static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
  459. {
  460. return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
  461. }
  462. int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int data_tag, MPI_Comm comm)
  463. {
  464. _STARPU_MPI_LOG_IN();
  465. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
  466. // // We check if a tag is defined for the data handle, if not,
  467. // // we define the one given for the communication.
  468. // // A tag is necessary for the internal mpi engine.
  469. // int tag = starpu_data_get_tag(data_handle);
  470. // if (tag == -1)
  471. // starpu_data_set_tag(data_handle, data_tag);
  472. struct _starpu_mpi_req *req;
  473. _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, data_tag);
  474. req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 0, 0, NULL, NULL, 1, 0, 0);
  475. _STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, data_tag);
  476. STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
  477. *public_req = req;
  478. _STARPU_MPI_LOG_OUT();
  479. return 0;
  480. }
  481. int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
  482. {
  483. _STARPU_MPI_LOG_IN();
  484. // // We check if a tag is defined for the data handle, if not,
  485. // // we define the one given for the communication.
  486. // // A tag is necessary for the internal mpi engine.
  487. // int tag = starpu_data_get_tag(data_handle);
  488. // if (tag == -1)
  489. // starpu_data_set_tag(data_handle, data_tag);
  490. _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, 1, 0, 0);
  491. _STARPU_MPI_LOG_OUT();
  492. return 0;
  493. }
  494. int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
  495. {
  496. _STARPU_MPI_LOG_IN();
  497. // // We check if a tag is defined for the data handle, if not,
  498. // // we define the one given for the communication.
  499. // // A tag is necessary for the internal mpi engine.
  500. // int tag = starpu_data_get_tag(data_handle);
  501. // if (tag == -1)
  502. // starpu_data_set_tag(data_handle, data_tag);
  503. _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, 1, 0, callback, arg, sequential_consistency, 0, 0);
  504. _STARPU_MPI_LOG_OUT();
  505. return 0;
  506. }
  507. int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, MPI_Status *status)
  508. {
  509. starpu_mpi_req req;
  510. _STARPU_MPI_LOG_IN();
  511. // // We check if a tag is defined for the data handle, if not,
  512. // // we define the one given for the communication.
  513. // // A tag is necessary for the internal mpi engine.
  514. // int tag = starpu_data_get_tag(data_handle);
  515. // if (tag == -1)
  516. // starpu_data_set_tag(data_handle, data_tag);
  517. starpu_mpi_irecv(data_handle, &req, source, data_tag, comm);
  518. starpu_mpi_wait(&req, status);
  519. _STARPU_MPI_LOG_OUT();
  520. return 0;
  521. }
  522. /********************************************************/
  523. /* */
  524. /* Wait functionalities */
  525. /* */
  526. /********************************************************/
  527. static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
  528. {
  529. _STARPU_MPI_LOG_IN();
  530. /* Which is the mpi request we are waiting for ? */
  531. struct _starpu_mpi_req *req = waiting_req->other_request;
  532. _STARPU_MPI_TRACE_UWAIT_BEGIN(req->srcdst, req->data_tag);
  533. req->ret = MPI_Wait(&req->request, waiting_req->status);
  534. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_code(req->ret));
  535. _STARPU_MPI_TRACE_UWAIT_END(req->srcdst, req->data_tag);
  536. _starpu_mpi_handle_request_termination(req);
  537. _STARPU_MPI_LOG_OUT();
  538. }
  539. int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
  540. {
  541. int ret;
  542. struct _starpu_mpi_req *req = *public_req;
  543. struct _starpu_mpi_req *waiting_req;
  544. _STARPU_MPI_LOG_IN();
  545. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  546. /* We cannot try to complete a MPI request that was not actually posted
  547. * to MPI yet. */
  548. STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
  549. while (!(req->submitted))
  550. STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
  551. STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
  552. /* Initialize the request structure */
  553. _starpu_mpi_request_init(&waiting_req);
  554. waiting_req->status = status;
  555. waiting_req->other_request = req;
  556. waiting_req->func = _starpu_mpi_wait_func;
  557. waiting_req->request_type = WAIT_REQ;
  558. _starpu_mpi_submit_ready_request(waiting_req);
  559. /* We wait for the MPI request to finish */
  560. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  561. while (!req->completed)
  562. STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
  563. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  564. ret = req->ret;
  565. /* The internal request structure was automatically allocated */
  566. *public_req = NULL;
  567. if (req->internal_req)
  568. {
  569. free(req->internal_req); req->internal_req = NULL;
  570. }
  571. free(req);
  572. free(waiting_req);
  573. _STARPU_MPI_LOG_OUT();
  574. return ret;
  575. }
  576. /********************************************************/
  577. /* */
  578. /* Test functionalities */
  579. /* */
  580. /********************************************************/
  581. static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
  582. {
  583. _STARPU_MPI_LOG_IN();
  584. /* Which is the mpi request we are testing for ? */
  585. struct _starpu_mpi_req *req = testing_req->other_request;
  586. _STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  587. req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  588. _STARPU_MPI_TRACE_UTESTING_BEGIN(req->srcdst, req->data_tag);
  589. req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
  590. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_code(req->ret));
  591. _STARPU_MPI_TRACE_UTESTING_END(req->srcdst, req->data_tag);
  592. if (*testing_req->flag)
  593. {
  594. testing_req->ret = req->ret;
  595. _starpu_mpi_handle_request_termination(req);
  596. }
  597. STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
  598. testing_req->completed = 1;
  599. STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
  600. STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
  601. _STARPU_MPI_LOG_OUT();
  602. }
  603. int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
  604. {
  605. _STARPU_MPI_LOG_IN();
  606. int ret = 0;
  607. STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
  608. struct _starpu_mpi_req *req = *public_req;
  609. STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
  610. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  611. unsigned submitted = req->submitted;
  612. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  613. if (submitted)
  614. {
  615. struct _starpu_mpi_req *testing_req;
  616. _starpu_mpi_request_init(&testing_req);
  617. /* Initialize the request structure */
  618. STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
  619. STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
  620. testing_req->flag = flag;
  621. testing_req->status = status;
  622. testing_req->other_request = req;
  623. testing_req->func = _starpu_mpi_test_func;
  624. testing_req->completed = 0;
  625. testing_req->request_type = TEST_REQ;
  626. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  627. _starpu_mpi_submit_ready_request(testing_req);
  628. /* We wait for the test request to finish */
  629. STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
  630. while (!(testing_req->completed))
  631. STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
  632. STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
  633. ret = testing_req->ret;
  634. if (*(testing_req->flag))
  635. {
  636. /* The request was completed so we free the internal
  637. * request structure which was automatically allocated
  638. * */
  639. *public_req = NULL;
  640. if (req->internal_req)
  641. {
  642. free(req->internal_req); req->internal_req = NULL;
  643. }
  644. free(req);
  645. }
  646. free(testing_req);
  647. }
  648. else
  649. {
  650. *flag = 0;
  651. }
  652. _STARPU_MPI_LOG_OUT();
  653. return ret;
  654. }
  655. /********************************************************/
  656. /* */
  657. /* Barrier functionalities */
  658. /* */
  659. /********************************************************/
  660. static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
  661. {
  662. _STARPU_MPI_LOG_IN();
  663. barrier_req->ret = MPI_Barrier(barrier_req->comm);
  664. STARPU_MPI_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_code(barrier_req->ret));
  665. _starpu_mpi_handle_request_termination(barrier_req);
  666. _STARPU_MPI_LOG_OUT();
  667. }
  668. int starpu_mpi_barrier(MPI_Comm comm)
  669. {
  670. int ret;
  671. struct _starpu_mpi_req *barrier_req;
  672. _STARPU_MPI_LOG_IN();
  673. _starpu_mpi_request_init(&barrier_req);
  674. /* First wait for *both* all tasks and MPI requests to finish, in case
  675. * some tasks generate MPI requests, MPI requests generate tasks, etc.
  676. */
  677. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  678. STARPU_MPI_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
  679. barrier_running = 1;
  680. do
  681. {
  682. while (posted_requests)
  683. /* Wait for all current MPI requests to finish */
  684. STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
  685. /* No current request, clear flag */
  686. newer_requests = 0;
  687. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  688. /* Now wait for all tasks */
  689. starpu_task_wait_for_all();
  690. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  691. /* Check newer_requests again, in case some MPI requests
  692. * triggered by tasks completed and triggered tasks between
  693. * wait_for_all finished and we take the lock */
  694. } while (posted_requests || newer_requests);
  695. barrier_running = 0;
  696. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  697. /* Initialize the request structure */
  698. STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
  699. STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
  700. barrier_req->func = _starpu_mpi_barrier_func;
  701. barrier_req->request_type = BARRIER_REQ;
  702. barrier_req->comm = comm;
  703. _STARPU_MPI_INC_POSTED_REQUESTS(1);
  704. _starpu_mpi_submit_ready_request(barrier_req);
  705. /* We wait for the MPI request to finish */
  706. STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
  707. while (!barrier_req->completed)
  708. STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
  709. STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
  710. ret = barrier_req->ret;
  711. free(barrier_req);
  712. _STARPU_MPI_LOG_OUT();
  713. return ret;
  714. }
  715. /********************************************************/
  716. /* */
  717. /* Progression */
  718. /* */
  719. /********************************************************/
  720. #ifdef STARPU_VERBOSE
  721. static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
  722. {
  723. switch (request_type)
  724. {
  725. case SEND_REQ: return "SEND_REQ";
  726. case RECV_REQ: return "RECV_REQ";
  727. case WAIT_REQ: return "WAIT_REQ";
  728. case TEST_REQ: return "TEST_REQ";
  729. case BARRIER_REQ: return "BARRIER_REQ";
  730. case UNKNOWN_REQ: return "UNSET_REQ";
  731. default: return "unknown request type";
  732. }
  733. }
  734. #endif
  735. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
  736. {
  737. _STARPU_MPI_LOG_IN();
  738. _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d internal_req %p\n",
  739. req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr,
  740. _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->internal_req);
  741. if (req->internal_req)
  742. {
  743. struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->data_tag, req->srcdst);
  744. STARPU_MPI_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->data_tag, req->srcdst);
  745. _STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
  746. _starpu_mpi_early_data_delete(early_data_handle);
  747. free(early_data_handle);
  748. }
  749. else
  750. {
  751. if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
  752. {
  753. if (req->user_datatype == 1)
  754. {
  755. if (req->request_type == SEND_REQ)
  756. {
  757. // We need to make sure the communication for sending the size
  758. // has completed, as MPI can re-order messages, let's call
  759. // MPI_Wait to make sure data have been sent
  760. int ret;
  761. ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
  762. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_code(ret));
  763. free(req->ptr);
  764. }
  765. if (req->request_type == RECV_REQ)
  766. {
  767. // req->ptr is freed by starpu_data_unpack
  768. starpu_data_unpack(req->data_handle, req->ptr, req->count);
  769. }
  770. }
  771. else
  772. {
  773. _starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
  774. }
  775. }
  776. }
  777. if (req->data_handle)
  778. starpu_data_release(req->data_handle);
  779. if (req->envelope)
  780. {
  781. free(req->envelope);
  782. req->envelope = NULL;
  783. }
  784. /* Execute the specified callback, if any */
  785. if (req->callback)
  786. req->callback(req->callback_arg);
  787. /* tell anyone potentially waiting on the request that it is
  788. * terminated now */
  789. STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
  790. req->completed = 1;
  791. STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
  792. STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
  793. _STARPU_MPI_LOG_OUT();
  794. }
  795. static void _starpu_mpi_early_data_cb(void* arg)
  796. {
  797. struct _starpu_mpi_early_data_cb_args *args = arg;
  798. // We store in the application request the internal MPI
  799. // request so that it can be used by starpu_mpi_wait
  800. args->req->request = args->req->internal_req->request;
  801. args->req->submitted = 1;
  802. if (args->buffer)
  803. {
  804. /* Data has been received as a raw memory, it has to be unpacked */
  805. struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
  806. struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
  807. STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
  808. itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
  809. free(args->buffer);
  810. }
  811. else
  812. {
  813. struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
  814. void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
  815. void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
  816. if (!itf->copy_methods->ram_to_ram)
  817. {
  818. _STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
  819. itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM, NULL);
  820. }
  821. else
  822. {
  823. _STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
  824. itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM);
  825. }
  826. }
  827. _STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
  828. starpu_data_release(args->early_handle);
  829. _STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
  830. starpu_data_unregister_submit(args->early_handle);
  831. _STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
  832. // If the request is detached, we need to call _starpu_mpi_handle_request_termination
  833. // as it will not be called automatically as the request is not in the list detached_requests
  834. if (args->req->detached)
  835. _starpu_mpi_handle_request_termination(args->req);
  836. // else: If the request is not detached its termination will
  837. // be handled when calling starpu_mpi_wait
  838. free(args);
  839. }
  840. #ifdef STARPU_MPI_ACTIVITY
  841. static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
  842. {
  843. unsigned may_block = 1;
  844. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  845. if (!_starpu_mpi_req_list_empty(detached_requests))
  846. {
  847. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  848. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  849. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  850. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  851. may_block = 0;
  852. }
  853. else
  854. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  855. return may_block;
  856. }
  857. #endif /* STARPU_MPI_ACTIVITY */
  858. static void _starpu_mpi_test_detached_requests(void)
  859. {
  860. _STARPU_MPI_LOG_IN();
  861. int flag;
  862. MPI_Status status;
  863. struct _starpu_mpi_req *req, *next_req;
  864. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  865. for (req = _starpu_mpi_req_list_begin(detached_requests);
  866. req != _starpu_mpi_req_list_end(detached_requests);
  867. req = next_req)
  868. {
  869. next_req = _starpu_mpi_req_list_next(req);
  870. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  871. //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->data_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
  872. req->ret = MPI_Test(&req->request, &flag, &status);
  873. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_code(req->ret));
  874. if (flag)
  875. {
  876. if (req->request_type == RECV_REQ)
  877. {
  878. _STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->srcdst, req->data_tag);
  879. }
  880. else if (req->request_type == SEND_REQ)
  881. {
  882. _STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->srcdst, req->data_tag, 0);
  883. }
  884. _starpu_mpi_handle_request_termination(req);
  885. if (req->request_type == RECV_REQ)
  886. {
  887. _STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->srcdst, req->data_tag);
  888. }
  889. else if (req->request_type == SEND_REQ)
  890. {
  891. _STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->srcdst, req->data_tag, 0);
  892. }
  893. }
  894. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  895. if (flag)
  896. {
  897. _starpu_mpi_req_list_erase(detached_requests, req);
  898. #ifdef STARPU_DEVEL
  899. #warning FIXME: when do we free internal requests
  900. #endif
  901. if (!req->is_internal_req)
  902. free(req);
  903. }
  904. }
  905. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  906. _STARPU_MPI_LOG_OUT();
  907. }
  908. static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
  909. {
  910. if (req->detached)
  911. {
  912. /* put the submitted request into the list of pending requests
  913. * so that it can be handled by the progression mechanisms */
  914. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  915. _starpu_mpi_req_list_push_front(detached_requests, req);
  916. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  917. starpu_wake_all_blocked_workers();
  918. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  919. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  920. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  921. }
  922. }
  923. static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
  924. {
  925. _STARPU_MPI_LOG_IN();
  926. STARPU_MPI_ASSERT_MSG(req, "Invalid request");
  927. /* submit the request to MPI */
  928. _STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
  929. req, _starpu_mpi_request_type(req->request_type), req->data_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
  930. req->func(req);
  931. _STARPU_MPI_LOG_OUT();
  932. }
/* Start-up arguments handed to _starpu_mpi_progress_thread_func, which
 * frees the structure before returning. */
struct _starpu_mpi_argc_argv
{
	/* Non-zero when the progression thread has to call MPI_Init_thread
	 * (and MPI_Finalize on exit) itself */
	int initialize_mpi;
	/* Command line, forwarded to MPI_Init_thread when initialize_mpi is set */
	int *argc;
	char ***argv;
	/* Communicator queried for rank and size, and on which envelopes are received */
	MPI_Comm comm;
};
  940. static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
  941. {
  942. switch (thread_level)
  943. {
  944. case MPI_THREAD_SERIALIZED:
  945. {
  946. _STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
  947. break;
  948. }
  949. case MPI_THREAD_FUNNELED:
  950. {
  951. _STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
  952. break;
  953. }
  954. case MPI_THREAD_SINGLE:
  955. {
  956. _STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
  957. break;
  958. }
  959. }
  960. }
  961. static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status, MPI_Comm comm)
  962. {
  963. _STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
  964. _STARPU_MPI_DEBUG(20, "Request sync %d\n", envelope->sync);
  965. struct _starpu_mpi_early_data_handle* early_data_handle = _starpu_mpi_early_data_create(envelope, status.MPI_SOURCE);
  966. starpu_data_handle_t data_handle = NULL;
  967. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  968. data_handle = _starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
  969. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  970. if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
  971. {
  972. /* We know which data will receive it and we won't have to unpack, use just the same kind of data. */
  973. early_data_handle->buffer = NULL;
  974. starpu_data_register_same(&early_data_handle->handle, data_handle);
  975. _starpu_mpi_early_data_add(early_data_handle);
  976. }
  977. else
  978. {
  979. /* The application has not registered yet a data with the tag,
  980. * we are going to receive the data as a raw memory, and give it
  981. * to the application when it post a receive for this tag
  982. */
  983. _STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
  984. early_data_handle->buffer = malloc(early_data_handle->env->size);
  985. starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size);
  986. _starpu_mpi_early_data_add(early_data_handle);
  987. }
  988. _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->data_tag, status.MPI_SOURCE);
  989. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  990. early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
  991. early_data_handle->data_tag, comm, 1, 0,
  992. NULL, NULL, 1, 1, envelope->size);
  993. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  994. // We wait until the request is pushed in the
  995. // ready_request list, that ensures that the next loop
  996. // will call _starpu_mpi_handle_ready_request
  997. // on the request and post the corresponding mpi_irecv,
  998. // otherwise, it may lead to read data as envelop
  999. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1000. STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
  1001. while (!(early_data_handle->req->posted))
  1002. STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
  1003. STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
  1004. STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
  1005. early_data_handle->req_ready = 1;
  1006. STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
  1007. STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
  1008. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1009. }
  1010. static void *_starpu_mpi_progress_thread_func(void *arg)
  1011. {
  1012. struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;
  1013. int rank, worldsize;
  1014. if (argc_argv->initialize_mpi)
  1015. {
  1016. int thread_support;
  1017. _STARPU_DEBUG("Calling MPI_Init_thread\n");
  1018. if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
  1019. {
  1020. _STARPU_ERROR("MPI_Init_thread failed\n");
  1021. }
  1022. _starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
  1023. }
  1024. else
  1025. {
  1026. int provided;
  1027. MPI_Query_thread(&provided);
  1028. _starpu_mpi_print_thread_level_support(provided, " has been initialized with");
  1029. }
  1030. MPI_Comm_rank(argc_argv->comm, &rank);
  1031. MPI_Comm_size(argc_argv->comm, &worldsize);
  1032. MPI_Comm_set_errhandler(argc_argv->comm, MPI_ERRORS_RETURN);
  1033. #ifdef STARPU_SIMGRID
  1034. _mpi_world_size = worldsize;
  1035. _mpi_world_rank = rank;
  1036. /* Now that MPI is set up, let the rest of simgrid get initialized */
  1037. MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), *(argc_argv->argv));
  1038. #endif
  1039. {
  1040. _STARPU_MPI_TRACE_START(rank, worldsize);
  1041. #ifdef STARPU_USE_FXT
  1042. starpu_profiling_set_id(rank);
  1043. #endif //STARPU_USE_FXT
  1044. }
  1045. _starpu_mpi_add_sync_point_in_fxt();
  1046. _starpu_mpi_comm_amounts_init(argc_argv->comm);
  1047. _starpu_mpi_cache_init(argc_argv->comm);
  1048. _starpu_mpi_select_node_init();
  1049. _starpu_mpi_tag_init();
  1050. _starpu_mpi_early_request_init(worldsize);
  1051. _starpu_mpi_early_data_init(worldsize);
  1052. _starpu_mpi_sync_data_init(worldsize);
  1053. /* notify the main thread that the progression thread is ready */
  1054. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1055. running = 1;
  1056. STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
  1057. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1058. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1059. struct _starpu_mpi_envelope *envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
  1060. MPI_Request envelope_request;
  1061. int envelope_request_submitted = 0;
  1062. while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
  1063. {
  1064. /* shall we block ? */
  1065. unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0;
  1066. #ifndef STARPU_MPI_ACTIVITY
  1067. STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
  1068. block = block && _starpu_mpi_req_list_empty(detached_requests);
  1069. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
  1070. #endif /* STARPU_MPI_ACTIVITY */
  1071. if (block)
  1072. {
  1073. _STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
  1074. _STARPU_MPI_TRACE_SLEEP_BEGIN();
  1075. if (barrier_running)
  1076. /* Tell mpi_barrier */
  1077. STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
  1078. STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  1079. _STARPU_MPI_TRACE_SLEEP_END();
  1080. }
  1081. /* get one request */
  1082. struct _starpu_mpi_req *req;
  1083. while (!_starpu_mpi_req_list_empty(ready_requests))
  1084. {
  1085. req = _starpu_mpi_req_list_pop_back(ready_requests);
  1086. /* handling a request is likely to block for a while
  1087. * (on a sync_data_with_mem call), we want to let the
  1088. * application submit requests in the meantime, so we
  1089. * release the lock. */
  1090. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1091. _starpu_mpi_handle_ready_request(req);
  1092. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1093. }
  1094. /* If there is no currently submitted envelope_request submitted to
  1095. * catch envelopes from senders, and there is some pending
  1096. * receive requests on our side, we resubmit a header request. */
  1097. if (((_starpu_mpi_early_request_count() > 0) || (_starpu_mpi_sync_data_count() > 0)) && (envelope_request_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
  1098. {
  1099. _STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
  1100. _STARPU_MPI_COMM_FROM_DEBUG(sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _STARPU_MPI_TAG_ENVELOPE);
  1101. MPI_Irecv(envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, argc_argv->comm, &envelope_request);
  1102. envelope_request_submitted = 1;
  1103. }
  1104. /* test whether there are some terminated "detached request" */
  1105. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1106. _starpu_mpi_test_detached_requests();
  1107. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1108. if (envelope_request_submitted == 1)
  1109. {
  1110. int flag,res;
  1111. MPI_Status status;
  1112. //_STARPU_MPI_DEBUG(4, "Test of envelope_request\n");
  1113. /* test whether an envelope has arrived. */
  1114. res = MPI_Test(&envelope_request, &flag, &status);
  1115. STARPU_ASSERT(res == MPI_SUCCESS);
  1116. if (flag)
  1117. {
  1118. _STARPU_MPI_DEBUG(4, "Envelope received with mode %d\n", envelope->mode);
  1119. if (envelope->mode == _STARPU_MPI_ENVELOPE_SYNC_READY)
  1120. {
  1121. struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_find(envelope->data_tag, status.MPI_SOURCE);
  1122. _STARPU_MPI_DEBUG(2000, "Sending data with tag %d to node %d\n", _sync_data->req->data_tag, status.MPI_SOURCE);
  1123. STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_data->req->data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_data->req->data_tag);
  1124. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1125. _starpu_mpi_isend_data_func(_sync_data->req);
  1126. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1127. }
  1128. else
  1129. {
  1130. _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->data_tag, status.MPI_SOURCE, envelope->size);
  1131. struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->data_tag, status.MPI_SOURCE);
  1132. /* Case: a data will arrive before a matching receive is
  1133. * posted by the application. Create a temporary handle to
  1134. * store the incoming data, submit a starpu_mpi_irecv_detached
  1135. * on this handle, and store it as an early_data
  1136. */
  1137. if (early_request == NULL)
  1138. {
  1139. if (envelope->sync)
  1140. {
  1141. _STARPU_MPI_DEBUG(2000, "-------------------------> adding request for tag %d\n", envelope->data_tag);
  1142. struct _starpu_mpi_req *new_req;
  1143. #ifdef STARPU_DEVEL
  1144. #warning creating a request is not really useful.
  1145. #endif
  1146. /* Initialize the request structure */
  1147. _starpu_mpi_request_init(&new_req);
  1148. new_req->request_type = RECV_REQ;
  1149. new_req->data_handle = NULL;
  1150. new_req->srcdst = status.MPI_SOURCE;
  1151. new_req->data_tag = envelope->data_tag;
  1152. new_req->comm = argc_argv->comm;
  1153. new_req->detached = 1;
  1154. new_req->sync = 1;
  1155. new_req->callback = NULL;
  1156. new_req->callback_arg = NULL;
  1157. new_req->func = _starpu_mpi_irecv_data_func;
  1158. new_req->sequential_consistency = 1;
  1159. new_req->is_internal_req = 0; // ????
  1160. new_req->count = envelope->size;
  1161. struct _starpu_mpi_sync_data_handle *_sync_data = _starpu_mpi_sync_data_create(new_req);
  1162. _starpu_mpi_sync_data_add(_sync_data);
  1163. }
  1164. else
  1165. {
  1166. _starpu_mpi_receive_early_data(envelope, status, argc_argv->comm);
  1167. }
  1168. }
  1169. /* Case: a matching application request has been found for
  1170. * the incoming data, we handle the correct allocation
  1171. * of the pointer associated to the data handle, then
  1172. * submit the corresponding receive with
  1173. * _starpu_mpi_handle_ready_request. */
  1174. else
  1175. {
  1176. _STARPU_MPI_DEBUG(2000, "A matching application request has been found for the incoming data with tag %d\n", envelope->data_tag);
  1177. _STARPU_MPI_DEBUG(2000, "Request sync %d\n", envelope->sync);
  1178. _starpu_mpi_early_request_delete(early_request);
  1179. early_request->sync = envelope->sync;
  1180. _starpu_mpi_handle_allocate_datatype(early_request->data_handle, &early_request->datatype, &early_request->user_datatype);
  1181. if (early_request->user_datatype == 0)
  1182. {
  1183. early_request->count = 1;
  1184. early_request->ptr = starpu_data_get_local_ptr(early_request->data_handle);
  1185. }
  1186. else
  1187. {
  1188. early_request->count = envelope->size;
  1189. early_request->ptr = malloc(early_request->count);
  1190. STARPU_MPI_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
  1191. }
  1192. _STARPU_MPI_DEBUG(3, "Handling new request... \n");
  1193. /* handling a request is likely to block for a while
  1194. * (on a sync_data_with_mem call), we want to let the
  1195. * application submit requests in the meantime, so we
  1196. * release the lock. */
  1197. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1198. _starpu_mpi_handle_ready_request(early_request);
  1199. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1200. }
  1201. }
  1202. envelope_request_submitted = 0;
  1203. }
  1204. else
  1205. {
  1206. //_STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
  1207. }
  1208. }
  1209. }
  1210. if (envelope_request_submitted)
  1211. {
  1212. MPI_Status status;
  1213. MPI_Cancel(&envelope_request);
  1214. MPI_Wait(&envelope_request, &status);
  1215. envelope_request_submitted = 0;
  1216. }
  1217. STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
  1218. STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
  1219. STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
  1220. _starpu_mpi_early_request_check_termination();
  1221. _starpu_mpi_early_data_check_termination();
  1222. _starpu_mpi_sync_data_check_termination();
  1223. if (argc_argv->initialize_mpi)
  1224. {
  1225. _STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
  1226. MPI_Finalize();
  1227. }
  1228. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1229. _starpu_mpi_sync_data_free(worldsize);
  1230. _starpu_mpi_early_data_free(worldsize);
  1231. _starpu_mpi_early_request_free();
  1232. free(argc_argv);
  1233. free(envelope);
  1234. return NULL;
  1235. }
  1236. /********************************************************/
  1237. /* */
  1238. /* (De)Initialization methods */
  1239. /* */
  1240. /********************************************************/
#ifdef STARPU_MPI_ACTIVITY
/* Identifier of the progression hook: assigned from
 * starpu_progression_hook_register() in _starpu_mpi_initialize() and passed
 * to starpu_progression_hook_deregister() in starpu_mpi_shutdown().
 * Holds -1 until a hook has been registered. */
static int hookid = - 1;
#endif /* STARPU_MPI_ACTIVITY */
  1244. static void _starpu_mpi_add_sync_point_in_fxt(void)
  1245. {
  1246. #ifdef STARPU_USE_FXT
  1247. int rank;
  1248. int worldsize;
  1249. int ret;
  1250. starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
  1251. starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);
  1252. ret = MPI_Barrier(MPI_COMM_WORLD);
  1253. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_code(ret));
  1254. /* We generate a "unique" key so that we can make sure that different
  1255. * FxT traces come from the same MPI run. */
  1256. int random_number;
  1257. /* XXX perhaps we don't want to generate a new seed if the application
  1258. * specified some reproductible behaviour ? */
  1259. if (rank == 0)
  1260. {
  1261. srand(time(NULL));
  1262. random_number = rand();
  1263. }
  1264. ret = MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
  1265. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %s", _starpu_mpi_get_mpi_code(ret));
  1266. _STARPU_MPI_TRACE_BARRIER(rank, worldsize, random_number);
  1267. _STARPU_MPI_DEBUG(3, "unique key %x\n", random_number);
  1268. #endif
  1269. }
  1270. static
  1271. int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
  1272. {
  1273. STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
  1274. STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
  1275. STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
  1276. ready_requests = _starpu_mpi_req_list_new();
  1277. STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
  1278. detached_requests = _starpu_mpi_req_list_new();
  1279. STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
  1280. struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
  1281. argc_argv->initialize_mpi = initialize_mpi;
  1282. argc_argv->argc = argc;
  1283. argc_argv->argv = argv;
  1284. argc_argv->comm = comm;
  1285. #ifdef STARPU_MPI_ACTIVITY
  1286. hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
  1287. STARPU_MPI_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
  1288. #endif /* STARPU_MPI_ACTIVITY */
  1289. #ifdef STARPU_SIMGRID
  1290. _starpu_mpi_progress_thread_func(argc_argv);
  1291. return 0;
  1292. #else
  1293. STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
  1294. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1295. while (!running)
  1296. STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
  1297. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1298. return 0;
  1299. #endif
  1300. }
#ifdef STARPU_SIMGRID
/* This is called before application's main, to initialize SMPI before we can
 * create MSG processes to run application's main.
 * Forwards to _starpu_mpi_initialize with initialize_mpi set to 1 and
 * MPI_COMM_WORLD as communicator, and returns its result. */
int _starpu_mpi_simgrid_init(int argc, char *argv[])
{
	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD);
}
#endif
/* Initialize StarPU-MPI on the communicator `comm`.
 * Under STARPU_SIMGRID this only asserts that initialize_mpi is set and
 * returns 0; otherwise it returns the result of _starpu_mpi_initialize
 * called with the same arguments. */
int starpu_mpi_init_comm(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
{
#ifdef STARPU_SIMGRID
	STARPU_MPI_ASSERT_MSG(initialize_mpi, "application has to let StarPU initialize MPI");
	return 0;
#else
	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
#endif
}
  1318. int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
  1319. {
  1320. return starpu_mpi_init_comm(argc, argv, initialize_mpi, MPI_COMM_WORLD);
  1321. }
  1322. int starpu_mpi_initialize(void)
  1323. {
  1324. #ifdef STARPU_SIMGRID
  1325. STARPU_MPI_ASSERT_MSG(0, "application has to let StarPU initialize MPI");
  1326. return 0;
  1327. #else
  1328. return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);
  1329. #endif
  1330. }
  1331. int starpu_mpi_initialize_extended(int *rank, int *world_size)
  1332. {
  1333. #ifdef STARPU_SIMGRID
  1334. *world_size = _mpi_world_size;
  1335. *rank = _mpi_world_rank;
  1336. return 0;
  1337. #else
  1338. int ret;
  1339. ret = _starpu_mpi_initialize(NULL, NULL, 1, MPI_COMM_WORLD);
  1340. if (ret == 0)
  1341. {
  1342. _STARPU_DEBUG("Calling MPI_Comm_rank\n");
  1343. MPI_Comm_rank(MPI_COMM_WORLD, rank);
  1344. MPI_Comm_size(MPI_COMM_WORLD, world_size);
  1345. }
  1346. return ret;
  1347. #endif
  1348. }
  1349. int starpu_mpi_shutdown(void)
  1350. {
  1351. void *value;
  1352. int rank, world_size;
  1353. /* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
  1354. starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
  1355. starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
  1356. /* kill the progression thread */
  1357. STARPU_PTHREAD_MUTEX_LOCK(&mutex);
  1358. running = 0;
  1359. STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
  1360. STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
  1361. starpu_pthread_join(progress_thread, &value);
  1362. #ifdef STARPU_MPI_ACTIVITY
  1363. starpu_progression_hook_deregister(hookid);
  1364. #endif /* STARPU_MPI_ACTIVITY */
  1365. _STARPU_MPI_TRACE_STOP(rank, world_size);
  1366. /* free the request queues */
  1367. _starpu_mpi_req_list_delete(detached_requests);
  1368. _starpu_mpi_req_list_delete(ready_requests);
  1369. _starpu_mpi_comm_amounts_display(rank);
  1370. _starpu_mpi_comm_amounts_free();
  1371. _starpu_mpi_cache_free(world_size);
  1372. _starpu_mpi_tag_free();
  1373. return 0;
  1374. }
  1375. void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
  1376. {
  1377. _starpu_mpi_data_release_tag(data_handle);
  1378. struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
  1379. _starpu_mpi_cache_flush(mpi_data->comm, data_handle);
  1380. free(data_handle->mpi_data);
  1381. }
  1382. void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, int rank, MPI_Comm comm)
  1383. {
  1384. struct _starpu_mpi_data *mpi_data;
  1385. if (data_handle->mpi_data)
  1386. {
  1387. mpi_data = data_handle->mpi_data;
  1388. }
  1389. else
  1390. {
  1391. mpi_data = malloc(sizeof(struct _starpu_mpi_data));
  1392. data_handle->mpi_data = mpi_data;
  1393. _starpu_mpi_data_register_tag(data_handle, tag);
  1394. _starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
  1395. }
  1396. if (tag != -1)
  1397. {
  1398. mpi_data->tag = tag;
  1399. }
  1400. if (rank != -1)
  1401. {
  1402. mpi_data->rank = rank;
  1403. mpi_data->comm = comm;
  1404. }
  1405. }
  1406. void starpu_mpi_data_set_rank_comm(starpu_data_handle_t handle, int rank, MPI_Comm comm)
  1407. {
  1408. starpu_mpi_data_register_comm(handle, -1, rank, comm);
  1409. }
  1410. void starpu_mpi_data_set_tag(starpu_data_handle_t handle, int tag)
  1411. {
  1412. starpu_mpi_data_register_comm(handle, tag, -1, MPI_COMM_WORLD);
  1413. }
  1414. int starpu_mpi_data_get_rank(starpu_data_handle_t data)
  1415. {
  1416. STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
  1417. return ((struct _starpu_mpi_data *)(data->mpi_data))->rank;
  1418. }
  1419. int starpu_mpi_data_get_tag(starpu_data_handle_t data)
  1420. {
  1421. STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
  1422. return ((struct _starpu_mpi_data *)(data->mpi_data))->tag;
  1423. }
  1424. int starpu_mpi_comm_size(MPI_Comm comm, int *size)
  1425. {
  1426. #ifdef STARPU_SIMGRID
  1427. STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
  1428. *size = _mpi_world_size;
  1429. return 0;
  1430. #else
  1431. return MPI_Comm_size(comm, size);
  1432. #endif
  1433. }
  1434. int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
  1435. {
  1436. #ifdef STARPU_SIMGRID
  1437. STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
  1438. *rank = _mpi_world_rank;
  1439. return 0;
  1440. #else
  1441. return MPI_Comm_rank(comm, rank);
  1442. #endif
  1443. }
  1444. int starpu_mpi_world_rank(void)
  1445. {
  1446. int rank;
  1447. starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
  1448. return rank;
  1449. }