starpu_mpi_checkpoint.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2014-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdarg.h>
  17. #include <stdlib.h>
  18. #include <common/utils.h>
  19. #include <starpu_mpi_checkpoint.h>
  20. #include <sys/param.h>
  21. #include <starpu_mpi_private.h>
  22. #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
  23. #include "starpu_mpi_cache.h"
  24. #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
  25. starpu_pthread_mutex_t cp_template_mutex;
  26. starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
  27. int my_rank;
  28. int cp_template_number = 0;
  29. static struct _starpu_mpi_req_list detached_ft_service_requests;
  30. static unsigned detached_send_n_ft_service_requests;
  31. static starpu_pthread_mutex_t detached_ft_service_requests_mutex;
  32. void _starpu_mpi_post_cp_ack_recv_cb(void* _args);
  33. void _starpu_mpi_post_cp_ack_send_cb(void* _args);
  34. void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args);
  35. extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
  36. extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
  37. static int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
  38. {
  39. int arg_type;
  40. //void* useless;
  41. void* ptr;
  42. int count;
  43. int backup_rank;
  44. int backup_of;
  45. // int (*_backup_of)(int);
  46. // int (*_backuped_by)(int);
  47. starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
  48. va_list varg_list_copy;
  49. va_copy(varg_list_copy, varg_list);
  50. while ((arg_type = va_arg(varg_list_copy, int)) != 0)
  51. {
  52. STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
  53. switch(arg_type)
  54. {
  55. case STARPU_R:
  56. ptr = va_arg(varg_list_copy, void*);
  57. count = 1;
  58. backup_rank = va_arg(varg_list_copy, int);
  59. backup_of = -1;
  60. break;
  61. case STARPU_VALUE:
  62. ptr = va_arg(varg_list_copy, void*);
  63. count = va_arg(varg_list_copy, int);
  64. backup_rank = va_arg(varg_list_copy, int);
  65. backup_of = va_arg(varg_list_copy, int);
  66. break;
  67. // case STARPU_DATA_ARRAY:
  68. // ptr = va_arg(varg_list_copy, void*);
  69. // count = va_arg(varg_list_copy, int);
  70. // backup_rank = va_arg(varg_list_copy, int);
  71. // backup_of = -1;
  72. // break;
  73. default:
  74. STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
  75. break;
  76. }
  77. _starpu_mpi_checkpoint_template_add_data(_cp_template, arg_type, ptr, count, backup_rank, backup_of);
  78. };
  79. va_end(varg_list_copy);
  80. _starpu_mpi_checkpoint_template_freeze(_cp_template);
  81. starpu_pthread_mutex_lock(&cp_template_mutex);
  82. for (int i=0 ; i<cp_template_number ; i++)
  83. {
  84. STARPU_ASSERT_MSG(cp_template_array[i]->cp_template_id != _cp_template->cp_template_id, "A checkpoint with id %d has already been registered.\n", _cp_template->cp_template_id);
  85. }
  86. cp_template_array[cp_template_number] = _cp_template;
  87. cp_template_number++;
  88. starpu_pthread_mutex_unlock(&cp_template_mutex);
  89. *cp_template = _cp_template;
  90. return 0;
  91. }
  92. struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, void (*alt_callback)(void *), void *alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
  93. {
  94. struct _starpu_mpi_req* req = NULL;
  95. int already_received = _starpu_mpi_cache_received_data_set(data_handle);
  96. if (already_received == 0)
  97. {
  98. if (data_tag == -1)
  99. _STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
  100. _STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, source);
  101. req = _starpu_mpi_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, (void*)arg, sequential_consistency, is_internal_req, count);
  102. }
  103. else
  104. {
  105. fprintf(stderr, "STARPU CACHE: Data already received\n");
  106. alt_callback(alt_arg);
  107. }
  108. return req;
  109. }
  110. struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, void (*alt_callback)(void *), void *alt_arg, int sequential_consistency)
  111. {
  112. struct _starpu_mpi_req* req = NULL;
  113. int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, dest);
  114. if (already_sent == 0)
  115. {
  116. if (data_tag == -1)
  117. _STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
  118. _STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, mpi_rank);
  119. req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, (void*)arg, sequential_consistency);
  120. }
  121. else
  122. {
  123. fprintf(stderr, "STARPU CACHE: Data already sent\n");
  124. alt_callback(alt_arg);
  125. }
  126. return req;
  127. }
  128. int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
  129. {
  130. starpu_data_handle_t* handle;
  131. struct _starpu_mpi_checkpoint_template_item* item;
  132. //MPI_Comm comm;
  133. starpu_pthread_mutex_lock(&cp_template->mutex);
  134. STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
  135. cp_template->pending = 1;
  136. cp_template->remaining_ack_awaited = cp_template->message_number;
  137. item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  138. fprintf(stderr, "begin iter\n");
  139. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  140. {
  141. switch (item->type)
  142. {
  143. case STARPU_VALUE:
  144. // starpu_data_handle_t send_handle;
  145. // starpu_variable_data_register(&send_handle, STARPU_MAIN_RAM, (uintptr_t)item->ptr, item->count);
  146. // starpu_mpi_data_register(send_handle, )
  147. // starpu_mpi_send
  148. break;
  149. case STARPU_R:
  150. handle = (starpu_data_handle_t*)item->ptr;
  151. if (starpu_mpi_data_get_rank(*handle)==my_rank)
  152. {
  153. fprintf(stderr,"sending to %d (tag %d)\n", item->backup_rank, (int)starpu_mpi_data_get_tag(*handle));
  154. struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  155. arg->rank = item->backup_rank;
  156. arg->msg.checkpoint_id = cp_template->cp_template_id;
  157. arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
  158. _starpu_mpi_isend_cache_aware(*handle, item->backup_rank, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0, &_starpu_mpi_post_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_treat_cache_ack_no_lock_cb, (void*)cp_template, 1);
  159. }
  160. else if (item->backup_rank==my_rank)
  161. {
  162. fprintf(stderr,"recving from %d (tag %d)\n", starpu_mpi_data_get_rank(*handle), (int)starpu_mpi_data_get_tag(*handle));
  163. struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  164. arg->rank = starpu_mpi_data_get_rank(*handle);
  165. arg->msg.checkpoint_id = cp_template->cp_template_id;
  166. arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
  167. _starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, &_starpu_mpi_post_cp_ack_send_cb, (void*)arg, NULL, NULL, 1, 1, 1);
  168. }
  169. break;
  170. }
  171. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  172. };
  173. starpu_pthread_mutex_unlock(&cp_template->mutex);
  174. return 0;
  175. }
  176. //
  177. ///**
  178. // * receives param of type starpu_mpi_checkpoint_template_t
  179. // * @param args
  180. // * @return
  181. // */
  182. //void _starpu_mpi_checkpoint_ack_send_cb(void* args)
  183. //{
  184. // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
  185. // starpu_pthread_mutex_lock(&cp_template->mutex);
  186. // cp_template->remaining_ack_awaited--;
  187. // starpu_pthread_mutex_unlock(&cp_template->mutex);
  188. //}
  189. // For test purpose
  190. int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
  191. {
  192. int val;
  193. int i = 0;
  194. struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  195. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  196. {
  197. fprintf(stderr,"Item %2d: ", i);
  198. if (item->type == STARPU_VALUE)
  199. {
  200. fprintf(stderr, "STARPU_VALUE - ");
  201. fprintf(stderr, "Value=%d\n", (*(int *)(item->ptr)));
  202. }
  203. else if (item->type == STARPU_R)
  204. {
  205. val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
  206. fprintf(stderr, "STARPU_R - Value=%d\n", val);
  207. }
  208. else if (item->type == STARPU_DATA_ARRAY)
  209. {
  210. fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)item->ptr), 0));
  211. for (int j=1 ; j<MIN(item->count, 5) ; j++)
  212. {
  213. fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)item->ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
  214. }
  215. fprintf(stderr, "...\n");
  216. }
  217. else
  218. {
  219. printf("Unrecognized type.\n");
  220. }
  221. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  222. i++;
  223. };
  224. return 0;
  225. }
  226. int starpu_mpi_checkpoint_turn_on(void)
  227. {
  228. starpu_pthread_mutex_init(&cp_template_mutex, NULL);
  229. _starpu_mpi_req_list_init(&detached_ft_service_requests);
  230. starpu_pthread_mutex_init(&detached_ft_service_requests_mutex, NULL);
  231. starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
  232. return 0;
  233. }
  234. int starpu_mpi_checkpoint_turn_off(void)
  235. {
  236. for (int i=0 ; i<MAX_CP_TEMPLATE_NUMBER ; i++)
  237. {
  238. if (cp_template_array[i] == NULL)
  239. {
  240. break;
  241. }
  242. _starpu_checkpoint_template_free(cp_template_array[i]);
  243. cp_template_array[i] = NULL;
  244. }
  245. starpu_pthread_mutex_destroy(&cp_template_mutex);
  246. return 0;
  247. }
  248. int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
  249. {
  250. va_list varg_list;
  251. va_start(varg_list, cp_id);
  252. int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, varg_list);
  253. va_end(varg_list);
  254. return ret;
  255. }
  256. int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
  257. {
  258. return _starpu_mpi_checkpoint_template_submit(cp_template);
  259. }
  260. void _print_ack_sent_cb(void* _args)
  261. {
  262. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  263. fprintf(stderr, "Sent succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
  264. free(_args);
  265. }
  266. void _starpu_mpi_treat_cache_ack_no_lock_cb(void* args)
  267. {
  268. starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)args;
  269. cp_template->remaining_ack_awaited--;
  270. }
  271. void _starpu_mpi_treat_ack_receipt_cb(void* _args)
  272. {
  273. struct _starpu_mpi_cp_ack_msg* msg = (struct _starpu_mpi_cp_ack_msg*) _args;
  274. starpu_pthread_mutex_lock(&cp_template_mutex);
  275. for (int i=0 ; i<cp_template_number ; i++)
  276. {
  277. starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
  278. if (cp_template_array[i]->cp_template_id == msg->checkpoint_id && cp_template_array[i]->cp_template_current_instance == msg->checkpoint_instance)
  279. {
  280. cp_template_array[i]->remaining_ack_awaited--;
  281. if (cp_template_array[i]->remaining_ack_awaited == 0)
  282. {
  283. // TODO: share info about cp integrity
  284. fprintf(stderr, "All cp material for cpid:%d, cpinst:%d - have been sent and acknowledged.\n", msg->checkpoint_id, msg->checkpoint_instance);
  285. cp_template_array[i]->pending=0;
  286. }
  287. free(msg);
  288. starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
  289. starpu_pthread_mutex_unlock(&cp_template_mutex);
  290. return;
  291. }
  292. starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
  293. }
  294. starpu_pthread_mutex_unlock(&cp_template_mutex);
  295. }
  296. void _starpu_mpi_post_cp_ack_send_cb(void* _args)
  297. {
  298. struct _starpu_mpi_req* req;
  299. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  300. fprintf(stderr, "Send cb\n");
  301. /* Initialize the request structure */
  302. _starpu_mpi_request_init(&req);
  303. req->request_type = SEND_REQ;
  304. /* prio_list is sorted by increasing values */
  305. if (_starpu_mpi_use_prio)
  306. req->prio = 0;
  307. req->data_handle = NULL;
  308. req->node_tag.node.rank = arg->rank;
  309. req->node_tag.data_tag = _STARPU_MPI_TAG_CP_ACK;
  310. req->node_tag.node.comm = MPI_COMM_WORLD;
  311. req->detached = 1;
  312. req->ptr = (void*)&arg->msg;
  313. req->sync = 0;
  314. req->datatype = MPI_BYTE;
  315. req->callback = _print_ack_sent_cb;
  316. req->callback_arg = arg;
  317. req->func = NULL;
  318. req->sequential_consistency = 1;
  319. req->count = sizeof(struct _starpu_mpi_cp_ack_msg);
  320. _mpi_backend._starpu_mpi_backend_request_fill(req, MPI_COMM_WORLD, 0);
  321. STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
  322. MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag, req->node_tag.node.comm, &req->backend->data_request);
  323. _starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
  324. fprintf(stderr, "pushed send: %p in list %p - prev: %p - next: %p - dest:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
  325. detached_send_n_ft_service_requests++;
  326. req->submitted = 1;
  327. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
  328. }
  329. void _starpu_mpi_post_cp_ack_recv_cb(void* _args)
  330. {
  331. struct _starpu_mpi_req* req;
  332. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  333. /* Initialize the request structure */
  334. _starpu_mpi_request_init(&req);
  335. req->request_type = RECV_REQ;
  336. /* prio_list is sorted by increasing values */
  337. if (_starpu_mpi_use_prio)
  338. req->prio = 0;
  339. req->data_handle = NULL;
  340. req->node_tag.node.rank = arg->rank;
  341. req->node_tag.data_tag = _STARPU_MPI_TAG_CP_ACK;
  342. req->node_tag.node.comm = MPI_COMM_WORLD;
  343. req->detached = 1;
  344. req->ptr = malloc(sizeof(struct _starpu_mpi_cp_ack_msg));
  345. req->sync = 0;
  346. req->datatype = MPI_BYTE;
  347. req->callback = _starpu_mpi_treat_ack_receipt_cb;
  348. req->callback_arg = req->ptr;
  349. req->func = NULL;
  350. req->sequential_consistency = 1;
  351. req->count = sizeof(struct _starpu_mpi_cp_ack_msg);
  352. _mpi_backend._starpu_mpi_backend_request_fill(req, MPI_COMM_WORLD, 0);
  353. STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
  354. MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag, req->node_tag.node.comm, &req->backend->data_request);
  355. _starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
  356. fprintf(stderr, "pushed recv: %p in list %p - prev: %p - next: %p - src:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
  357. req->submitted = 1;
  358. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
  359. }
  360. static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
  361. {
  362. _STARPU_MPI_LOG_IN();
  363. _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
  364. req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
  365. req->datatype_name, (int)req->count, req->registered_datatype, req->backend->internal_req);
  366. if (req->backend->internal_req)
  367. {
  368. free(req->backend->early_data_handle);
  369. req->backend->early_data_handle = NULL;
  370. }
  371. else
  372. {
  373. if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
  374. {
  375. if (req->registered_datatype == 0)
  376. {
  377. if (req->request_type == SEND_REQ)
  378. {
  379. // We need to make sure the communication for sending the size
  380. // has completed, as MPI can re-order messages, let's call
  381. // MPI_Wait to make sure data have been sent
  382. int ret;
  383. ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
  384. STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
  385. starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t)req->ptr, req->count, 0);
  386. req->ptr = NULL;
  387. }
  388. else if (req->request_type == RECV_REQ)
  389. {
  390. // req->ptr is freed by starpu_data_unpack
  391. starpu_data_unpack(req->data_handle, req->ptr, req->count);
  392. starpu_memory_deallocate(STARPU_MAIN_RAM, req->count);
  393. }
  394. }
  395. else
  396. {
  397. //_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
  398. }
  399. }
  400. _STARPU_MPI_TRACE_TERMINATED(req, req->node_tag.node.rank, req->node_tag.data_tag);
  401. }
  402. _starpu_mpi_release_req_data(req);
  403. if (req->backend->envelope)
  404. {
  405. free(req->backend->envelope);
  406. req->backend->envelope = NULL;
  407. }
  408. /* Execute the specified callback, if any */
  409. if (req->callback)
  410. req->callback(req->callback_arg);
  411. /* tell anyone potentially waiting on the request that it is
  412. * terminated now */
  413. STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
  414. req->completed = 1;
  415. STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
  416. STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
  417. _STARPU_MPI_LOG_OUT();
  418. }
  419. static void _starpu_mpi_test_ft_detached_requests(void)
  420. {
  421. //_STARPU_MPI_LOG_IN();
  422. int flag;
  423. struct _starpu_mpi_req *req;
  424. STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
  425. if (_starpu_mpi_req_list_empty(&detached_ft_service_requests))
  426. {
  427. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
  428. //_STARPU_MPI_LOG_OUT();
  429. return;
  430. }
  431. _STARPU_MPI_TRACE_TESTING_DETACHED_BEGIN();
  432. req = _starpu_mpi_req_list_begin(&detached_ft_service_requests);
  433. while (req != _starpu_mpi_req_list_end(&detached_ft_service_requests))
  434. {
  435. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
  436. _STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
  437. //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64" - TYPE %s %d\n", &req->backend->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.node.rank);
  438. #ifdef STARPU_SIMGRID
  439. req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
  440. #else
  441. STARPU_MPI_ASSERT_MSG(req->backend->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
  442. req->ret = MPI_Test(&req->backend->data_request, &flag, MPI_STATUS_IGNORE);
  443. #endif
  444. STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
  445. _STARPU_MPI_TRACE_TEST_END(req->node_tag.node.rank, req->node_tag.data_tag);
  446. if (!flag)
  447. {
  448. req = _starpu_mpi_req_list_next(req);
  449. }
  450. else
  451. {
  452. fprintf(stderr, "req success: %d\n", detached_send_n_ft_service_requests);
  453. _STARPU_MPI_TRACE_POLLING_END();
  454. struct _starpu_mpi_req *next_req;
  455. next_req = _starpu_mpi_req_list_next(req);
  456. _STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
  457. STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
  458. if (req->request_type == SEND_REQ)
  459. detached_send_n_ft_service_requests--;
  460. _starpu_mpi_req_list_erase(&detached_ft_service_requests, req);
  461. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
  462. _starpu_mpi_handle_request_termination(req);
  463. _STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
  464. STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
  465. /* We don't want to free internal non-detached
  466. requests, we need to get their MPI request before
  467. destroying them */
  468. if (req->backend->is_internal_req && !req->backend->to_destroy)
  469. {
  470. /* We have completed the request, let the application request destroy it */
  471. req->backend->to_destroy = 1;
  472. STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
  473. }
  474. else
  475. {
  476. STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
  477. _starpu_mpi_request_destroy(req);
  478. }
  479. req = next_req;
  480. _STARPU_MPI_TRACE_POLLING_BEGIN();
  481. }
  482. STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
  483. }
  484. _STARPU_MPI_TRACE_TESTING_DETACHED_END();
  485. STARPU_PTHREAD_MUTEX_UNLOCK(&detached_ft_service_requests_mutex);
  486. //_STARPU_MPI_LOG_OUT();
  487. }
  488. void starpu_mpi_ft_progress(void)
  489. {
  490. _starpu_mpi_test_ft_detached_requests();
  491. }
  492. int starpu_mpi_ft_busy()
  493. {
  494. return ! _starpu_mpi_req_list_empty(&detached_ft_service_requests);
  495. }