starpu_mpi_checkpoint_template.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdarg.h>
  17. #include <stdlib.h>
  18. #include <sys/param.h>
  19. #include <starpu_mpi_private.h>
  20. #include <starpu_mpi_cache.h>
  21. #include <mpi/starpu_mpi_mpi_backend.h>
  22. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
  23. #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
  24. #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
  25. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
  26. #include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
  27. starpu_pthread_mutex_t cp_template_mutex;
  28. starpu_pthread_mutex_t current_instance_mutex;
  29. starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
  30. int cp_template_array_size = 0;
  31. int my_rank;
  32. int comm_size;
  33. int current_instance;
  34. typedef int (*backup_of_fn)(int);
  35. int increment_current_instance()
  36. {
  37. int _inst;
  38. starpu_pthread_mutex_lock(&current_instance_mutex);
  39. _inst = ++current_instance;
  40. starpu_pthread_mutex_unlock(&current_instance_mutex);
  41. return _inst;
  42. }
  43. int get_current_instance()
  44. {
  45. int _inst;
  46. starpu_pthread_mutex_lock(&current_instance_mutex);
  47. _inst = current_instance;
  48. starpu_pthread_mutex_unlock(&current_instance_mutex);
  49. return _inst;
  50. }
  51. void checkpoint_template_lib_init(void) {
  52. starpu_pthread_mutex_init(&current_instance_mutex, NULL);
  53. starpu_pthread_mutex_init(&cp_template_mutex, NULL);
  54. starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
  55. starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
  56. current_instance = 0;
  57. #ifdef STARPU_MPI_VERBOSE
  58. _starpu_mpi_set_debug_level_max(1000);
  59. #endif
  60. }
  61. void checkpoint_template_lib_quit(void) {
  62. for (int i=0 ; i<MAX_CP_TEMPLATE_NUMBER ; i++)
  63. {
  64. if (cp_template_array[i] == NULL)
  65. {
  66. break;
  67. }
  68. _starpu_checkpoint_template_free(cp_template_array[i]);
  69. cp_template_array[i] = NULL;
  70. }
  71. }
  72. int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
  73. {
  74. starpu_pthread_mutex_lock(&cp_template->mutex);
  75. STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
  76. struct _starpu_mpi_checkpoint_template_item* item;
  77. item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
  78. _starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
  79. _checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
  80. _STARPU_MPI_DEBUG(5, "New checkpoint data entry %p has been added to cp_template with id:%d. (%s)\n", item, cp_template->cp_id, backupped_by == -1 ? "BACKUP_OF" : "BACKUPPED_BY");
  81. starpu_pthread_mutex_unlock(&cp_template->mutex);
  82. return 0;
  83. }
  84. int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain)
  85. {
  86. *cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
  87. return 0;
  88. }
  89. int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t cp_template, int arg_type, va_list varg_list)
  90. {
  91. void* ptr;
  92. int count;
  93. int backupped_by;
  94. int data_rank;
  95. starpu_mpi_tag_t tag;
  96. backup_of_fn _backup_of;
  97. arg_type = arg_type & ~STARPU_COMMUTE;
  98. switch(arg_type)
  99. {
  100. case STARPU_R:
  101. ptr = va_arg(varg_list, void*);
  102. count = 1;
  103. backupped_by = va_arg(varg_list, int);
  104. data_rank = starpu_mpi_data_get_rank((starpu_data_handle_t)ptr);
  105. if (my_rank==data_rank)
  106. {
  107. return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, backupped_by, -1, -1);
  108. }
  109. else if(my_rank == backupped_by)
  110. {
  111. return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, data_rank, -1);
  112. }
  113. else
  114. {
  115. /* Since this data does not concern me (i.e. it is nor my data neither a data which I'm the back up)
  116. * it is considered unecessary to register in the CP */
  117. return 0;
  118. }
  119. case STARPU_VALUE:
  120. ptr = va_arg(varg_list, void*);
  121. count = va_arg(varg_list, int);
  122. tag = va_arg(varg_list, starpu_mpi_tag_t);
  123. _backup_of = va_arg(varg_list, backup_of_fn);
  124. /* I register the backup that will save this data */
  125. _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, _backup_of(my_rank), -1, tag);
  126. for (int i=0 ; i<my_rank ; i++)
  127. {
  128. if (_backup_of(i) == my_rank)
  129. {
  130. /* I'm the back up of someone else for this data, I have to remember it */
  131. _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
  132. }
  133. }
  134. for (int i=my_rank+1 ; i<comm_size ; i++)
  135. {
  136. if (_backup_of(i) == my_rank)
  137. {
  138. /* I'm the back up of someone else for this data, I have to remember it */
  139. _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
  140. }
  141. }
  142. return 0;
  143. // case STARPU_DATA_ARRAY:
  144. // ptr = va_arg(varg_list, void*);
  145. // count = va_arg(varg_list, int);
  146. // backupped_by = va_arg(varg_list, int);
  147. // backup_of = -1;
  148. // break;
  149. default:
  150. STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
  151. }
  152. }
  153. void _cp_discard_message_recv_cb(void* _args)
  154. {
  155. // TODO: store the information of the new CP, for restart purpose
  156. struct _starpu_mpi_cp_discard_arg_cb* arg = (struct _starpu_mpi_cp_discard_arg_cb*) _args;
  157. _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
  158. _STARPU_MPI_DEBUG(0, "DISCARDING OLD CHECKPOINT DATA of rank %d - new one is CPID:%d - CPINST:%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  159. checkpoint_package_data_del(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
  160. }
  161. int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
  162. {
  163. /* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
  164. * I can discard old data from deprecated checkpoint).
  165. * I will receive a msg if I have old CP data.
  166. * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
  167. * */
  168. struct _starpu_mpi_cp_discard_arg_cb* arg;
  169. int i;
  170. for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
  171. {
  172. starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  173. arg->rank = cp_template->backup_of_array[i];
  174. _STARPU_MPI_DEBUG(10, "Post DISCARD msg reception from %d\n", arg->rank);
  175. _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
  176. MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void *) arg);
  177. }
  178. return i;
  179. }
  180. void _cp_discard_message_send_cb(void* _args)
  181. {
  182. _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
  183. starpu_free(_args);
  184. }
  185. int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
  186. {
  187. /* The CP data replication has succeeded. I must send the message warning the checkpoint integrity (so
  188. * they can discard old data from deprecated checkpoint).
  189. * I will send to the ones if it has old CP data from me.
  190. * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
  191. * */
  192. struct _starpu_mpi_cp_discard_arg_cb* arg;
  193. int i;
  194. for (i=0 ; i < cp_template->backupped_by_array_used_size ; i++)
  195. {
  196. starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  197. arg->rank = cp_template->backupped_by_array[i];
  198. _STARPU_MPI_DEBUG(10, "Post CP DISCARD msg sending to %d\n", arg->rank);
  199. arg->msg.discard=1;
  200. arg->msg.validation=0;
  201. arg->msg.checkpoint_id = cp_id;
  202. arg->msg.checkpoint_instance = cp_instance;
  203. _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
  204. MPI_COMM_WORLD, _cp_discard_message_send_cb, (void *) arg);
  205. }
  206. return 0;
  207. }
  208. starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
  209. {
  210. starpu_pthread_mutex_lock(&cp_template_mutex);
  211. for (int i=0 ; i < cp_template_array_size ; i++)
  212. {
  213. // starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
  214. if (cp_template_array[i]->cp_id == checkpoint_id)
  215. {
  216. // starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
  217. starpu_pthread_mutex_unlock(&cp_template_mutex);
  218. return cp_template_array[i];
  219. }
  220. // starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
  221. }
  222. starpu_pthread_mutex_unlock(&cp_template_mutex);
  223. return NULL;
  224. }
  225. //int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
  226. //{
  227. // /* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
  228. // * I can tag the data as CP validated, and discard old data from deprecated checkpoint).
  229. // * I will receive a msg if I have old CP data, or if I am the back up for a node into the upcoming Checkpoint.
  230. // * * Here the union of the different list is processed to post message reception only once.
  231. // * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
  232. // * */
  233. // struct _starpu_mpi_cp_discard_arg_cb* arg;
  234. // int i, j, flag;
  235. // starpu_mpi_checkpoint_template_t old_template;
  236. // for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
  237. // {
  238. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  239. // arg->rank = cp_template->backup_of_array[i];
  240. // _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d\n", arg->rank);
  241. // _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
  242. // }
  243. // if (last_valid_checkpoint.checkpoint_id == -1)
  244. // {
  245. // return -1;
  246. // }
  247. // else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_id)
  248. // {
  249. // old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
  250. // for (i=0 ; i<old_template->backup_of_array_used_size ; i++)
  251. // {
  252. // flag=0;
  253. // for(j=0 ; j<cp_template->backup_of_array_used_size ; j++)
  254. // {
  255. // if (cp_template->backup_of_array[j] == old_template->backup_of_array[i])
  256. // {
  257. // flag = 1;
  258. // break;
  259. // }
  260. // }
  261. // if (flag==0)
  262. // {
  263. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  264. // arg->rank = old_template->backup_of_array[i];
  265. // _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d - LAST VALIDATED CP\n", arg->rank);
  266. // _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
  267. // }
  268. // }
  269. // }
  270. // return 0;
  271. //}
  272. //int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
  273. //{
  274. // /* The CP data replication has succeeded. I must send the message warning the future checkpoint integrity (so
  275. // * they can tag the data as CP validated, and discard old data from deprecated checkpoint).
  276. // * I will send to one if it has old CP data from me, or if it is my backup for a data into the just succeeded Checkpoint.
  277. // * * Here the union of the different list is processed to send message only once.
  278. // * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
  279. // * */
  280. // struct _starpu_mpi_cp_discard_arg_cb* arg;
  281. // int i, j, flag;
  282. // starpu_mpi_checkpoint_template_t old_template;
  283. // for (i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
  284. // {
  285. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  286. // arg->rank = cp_template->backupped_by_array[i];
  287. // _STARPU_MPI_DEBUG(10, "Sending DISCARD msg reception to %d\n", arg->rank);
  288. // arg->msg.checkpoint_id = cp_id;
  289. // arg->msg.checkpoint_instance = cp_instance;
  290. // _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
  291. // }
  292. // if (last_valid_checkpoint.checkpoint_id == -1)
  293. // {
  294. // return -1;
  295. // }
  296. // else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_id)
  297. // {
  298. // old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
  299. // for (i=0 ; i<old_template->backupped_by_array_used_size ; i++)
  300. // {
  301. // flag=0;
  302. // for(j=0 ; j<cp_template->backupped_by_array_used_size ; j++)
  303. // {
  304. // if (cp_template->backupped_by_array[j] == old_template->backupped_by_array[i])
  305. // {
  306. // flag = 1;
  307. // break;
  308. // }
  309. // }
  310. // if (flag==0)
  311. // {
  312. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  313. // arg->rank = old_template->backupped_by_array[i];
  314. // _STARPU_MPI_DEBUG(10, "Sending DISCARD msg to %d - OLD CP\n", arg->rank);
  315. // arg->msg.checkpoint_id = cp_id;
  316. // arg->msg.checkpoint_instance = cp_instance;
  317. // _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
  318. // }
  319. // }
  320. // }
  321. // return 0;
  322. //}
  323. int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
  324. {
  325. // char str[256];
  326. starpu_pthread_mutex_lock(&cp_template->mutex);
  327. _STARPU_MPI_DEBUG(2, "Start freezing checkpoint id:%d\n", cp_template->cp_id);
  328. cp_template->frozen = 1;
  329. cp_template->message_to_send_number = 0;
  330. cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
  331. struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  332. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  333. {
  334. if (item->backup_of==-1 && item->backupped_by!=-1)
  335. {
  336. cp_template->message_to_send_number++;
  337. }
  338. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  339. }
  340. // sprintf(str, "backupped by Array maxsize:%d - currentsize:%d - ", cp_template->backupped_by_array_max_size, cp_template->backupped_by_array_used_size);
  341. // for (int i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
  342. // {
  343. // sprintf(str,"%s%d ", str, cp_template->backupped_by_array[i]);
  344. // }
  345. // fprintf(stderr, "%s\n", str);
  346. //
  347. // sprintf(str,"backup of Array maxsize:%d - currentsize:%d - ", cp_template->backup_of_array_max_size, cp_template->backup_of_array_used_size);
  348. // for (int i=0 ; i<cp_template->backup_of_array_used_size ; i++)
  349. // {
  350. // sprintf(str,"%s%d ", str, cp_template->backup_of_array[i]);
  351. // }
  352. // fprintf(stderr, "%s\n", str);
  353. starpu_pthread_mutex_unlock(&cp_template->mutex);
  354. starpu_pthread_mutex_lock(&cp_template_mutex);
  355. for (int i=0 ; i < cp_template_array_size ; i++)
  356. {
  357. STARPU_ASSERT_MSG(cp_template_array[i]->cp_id != cp_template->cp_id, "A checkpoint with id %d has already been registered.\n", cp_template->cp_id);
  358. }
  359. cp_template_array[cp_template_array_size] = cp_template;
  360. cp_template_array_size++;
  361. starpu_pthread_mutex_unlock(&cp_template_mutex);
  362. _STARPU_MPI_DEBUG(2, "Checkpoint id:%d is frozen and registered.\n", cp_template->cp_id);
  363. return cp_template->size;
  364. }
  365. int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, va_list varg_list)
  366. {
  367. int arg_type;
  368. starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
  369. va_list varg_list_copy;
  370. va_copy(varg_list_copy, varg_list);
  371. while ((arg_type = va_arg(varg_list_copy, int)) != 0)
  372. {
  373. _starpu_mpi_checkpoint_template_add_entry(_cp_template, arg_type, varg_list_copy);
  374. }
  375. va_end(varg_list_copy);
  376. _starpu_mpi_checkpoint_template_freeze(_cp_template);
  377. *cp_template = _cp_template;
  378. return 0;
  379. }
  380. int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_template)
  381. {
  382. return _starpu_mpi_checkpoint_template_freeze(*cp_template);
  383. }
  384. int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, ...)
  385. {
  386. va_list varg_list;
  387. va_start(varg_list, cp_domain);
  388. int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, cp_domain, varg_list);
  389. va_end(varg_list);
  390. return ret;
  391. }
  392. int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* cp_template, ...)
  393. {
  394. va_list varg_list;
  395. int arg_type;
  396. int ret;
  397. va_start(varg_list, cp_template);
  398. arg_type = va_arg(varg_list, int);
  399. STARPU_ASSERT_MSG(arg_type!=STARPU_NONE, "Unhandled arg_type: STARPU_NONE(0).\n");
  400. ret = _starpu_mpi_checkpoint_template_add_entry(*cp_template, arg_type, varg_list);
  401. va_end(varg_list);
  402. return ret;
  403. }
  404. int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
  405. int remaining_ack_messages;
  406. struct _starpu_mpi_checkpoint_tracker* tracker, *tracker1;
  407. starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
  408. starpu_pthread_mutex_lock(&cp_template_mutex);
  409. _STARPU_MPI_DEBUG(20, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);
  410. tracker = _starpu_mpi_checkpoint_tracker_update(cp_template, checkpoint_id, cp_template->checkpoint_domain, checkpoint_instance);
  411. remaining_ack_messages = _starpu_mpi_checkpoint_check_tracker(tracker);
  412. if (remaining_ack_messages>0)
  413. {
  414. _STARPU_MPI_DEBUG(20, "The CP (id:%d - inst:%d) found, remaining ack msg awaited:%d.\n", checkpoint_id,
  415. checkpoint_instance, remaining_ack_messages);
  416. }
  417. else if (remaining_ack_messages==0)
  418. {
  419. _STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been successfully saved and acknowledged.\n", checkpoint_id, checkpoint_instance);
  420. tracker = _starpu_mpi_checkpoint_tracker_validate_instance(tracker);
  421. if (tracker==NULL)
  422. {
  423. // TODO:should warn some people, because the msg loggin is not implemented(this precise nodes to contact)
  424. _STARPU_MPI_DEBUG(0, "No previous checkpoint to discard\n");
  425. }
  426. else
  427. {
  428. if (tracker->old)
  429. {
  430. tracker1 = _starpu_mpi_checkpoint_tracker_get_last_valid_tracker(tracker->cp_domain);
  431. _starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, tracker1->cp_id, tracker1->cp_inst);
  432. }
  433. else
  434. {
  435. _starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, checkpoint_id, checkpoint_instance);
  436. }
  437. }
  438. }
  439. else if (remaining_ack_messages==-1)
  440. {
  441. STARPU_ABORT_MSG("Inst (id:%d - inst:%d) is already valid. should not have received an ack msg.\n", checkpoint_id, checkpoint_instance);
  442. }
  443. else
  444. {
  445. STARPU_ABORT_MSG("Critical error, can not identify %d as remaining messages\n", remaining_ack_messages);
  446. }
  447. _STARPU_MPI_DEBUG(20, "Digested\n");
  448. starpu_pthread_mutex_unlock(&cp_template_mutex);
  449. return 0;
  450. }
  451. // For test purpose
  452. int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
  453. {
  454. // int val;
  455. int i = 0;
  456. struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  457. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  458. {
  459. fprintf(stderr,"Item %2d: ", i);
  460. if (item->type == STARPU_VALUE)
  461. {
  462. // fprintf(stderr, "STARPU_VALUE - Value=%d - backupof:%d - backupedby:%d\n", (*(int *)(item->ptr)), item->backup_of, item->backupped_by);
  463. fprintf(stderr, "STARPU_VALUE - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
  464. }
  465. else if (item->type == STARPU_R)
  466. {
  467. // val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
  468. // fprintf(stderr, "STARPU_R - Value=%d - backupof:%d - backupedby:%d\n", val, item->backup_of, item->backupped_by);
  469. fprintf(stderr, "STARPU_R - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
  470. }
  471. else if (item->type == STARPU_DATA_ARRAY)
  472. {
  473. fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)item->ptr), 0));
  474. for (int j=1 ; j<MIN(item->count, 5) ; j++)
  475. {
  476. fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)item->ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
  477. }
  478. fprintf(stderr, "...\n");
  479. }
  480. else
  481. {
  482. printf("Unrecognized type.\n");
  483. }
  484. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  485. i++;
  486. }
  487. return 0;
  488. }