starpu_mpi_checkpoint_template.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdarg.h>
  17. #include <stdlib.h>
  18. #include <sys/param.h>
  19. #include <starpu_mpi_private.h>
  20. #include <starpu_mpi_cache.h>
  21. #include <mpi/starpu_mpi_mpi_backend.h>
  22. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
  23. #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
  24. #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
  25. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
  26. #include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
  27. starpu_pthread_mutex_t cp_template_mutex;
  28. starpu_pthread_mutex_t current_instance_mutex;
  29. starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
  30. int cp_template_array_size = 0;
  31. int comm_size;
  32. int current_instance;
  33. typedef int (*backup_of_fn)(int);
  34. int increment_current_instance()
  35. {
  36. int _inst;
  37. starpu_pthread_mutex_lock(&current_instance_mutex);
  38. _inst = ++current_instance;
  39. starpu_pthread_mutex_unlock(&current_instance_mutex);
  40. return _inst;
  41. }
  42. int get_current_instance()
  43. {
  44. int _inst;
  45. starpu_pthread_mutex_lock(&current_instance_mutex);
  46. _inst = current_instance;
  47. starpu_pthread_mutex_unlock(&current_instance_mutex);
  48. return _inst;
  49. }
  50. void checkpoint_template_lib_init(void) {
  51. starpu_pthread_mutex_init(&current_instance_mutex, NULL);
  52. starpu_pthread_mutex_init(&cp_template_mutex, NULL);
  53. starpu_mpi_comm_rank(MPI_COMM_WORLD, &_my_rank);
  54. starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
  55. current_instance = 0;
  56. #ifdef STARPU_MPI_VERBOSE
  57. _starpu_mpi_set_debug_level_max(1000);
  58. #endif
  59. }
  60. void checkpoint_template_lib_quit(void) {
  61. for (int i=0 ; i<MAX_CP_TEMPLATE_NUMBER ; i++)
  62. {
  63. if (cp_template_array[i] == NULL)
  64. {
  65. break;
  66. }
  67. _starpu_checkpoint_template_free(cp_template_array[i]);
  68. cp_template_array[i] = NULL;
  69. }
  70. }
  71. int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
  72. {
  73. starpu_pthread_mutex_lock(&cp_template->mutex);
  74. STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
  75. struct _starpu_mpi_checkpoint_template_item* item;
  76. item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
  77. _starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
  78. _checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
  79. _STARPU_MPI_DEBUG(5, "New checkpoint data entry %p (data:%p) has been added to cp_template with id:%d. (%s)\n", item, item->ptr, cp_template->cp_id, backupped_by == -1 ? "BACKUP_OF" : "BACKUPPED_BY");
  80. starpu_pthread_mutex_unlock(&cp_template->mutex);
  81. return 0;
  82. }
  83. int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain)
  84. {
  85. *cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
  86. return 0;
  87. }
  88. int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t cp_template, int arg_type, va_list varg_list)
  89. {
  90. void* ptr;
  91. int count;
  92. int backupped_by;
  93. int data_rank;
  94. starpu_mpi_tag_t tag;
  95. backup_of_fn _backup_of;
  96. arg_type = arg_type & ~STARPU_COMMUTE;
  97. switch(arg_type)
  98. {
  99. case STARPU_R:
  100. ptr = va_arg(varg_list, void*);
  101. count = 1;
  102. backupped_by = va_arg(varg_list, int);
  103. data_rank = starpu_mpi_data_get_rank((starpu_data_handle_t)ptr);
  104. if (_my_rank==data_rank)
  105. {
  106. return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, backupped_by, -1, -1);
  107. }
  108. else if(_my_rank == backupped_by)
  109. {
  110. return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, data_rank, -1);
  111. }
  112. else
  113. {
  114. /* Since this data does not concern me (i.e. it is nor my data neither a data which I'm the back up)
  115. * it is considered unecessary to register in the CP */
  116. return 0;
  117. }
  118. case STARPU_VALUE:
  119. ptr = va_arg(varg_list, void*);
  120. count = va_arg(varg_list, int);
  121. tag = va_arg(varg_list, starpu_mpi_tag_t);
  122. _backup_of = va_arg(varg_list, backup_of_fn);
  123. /* I register the backup that will save this data */
  124. _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, _backup_of(_my_rank), -1, tag);
  125. for (int i=0 ; i<_my_rank ; i++)
  126. {
  127. if (_backup_of(i) == _my_rank)
  128. {
  129. /* I'm the back up of someone else for this data, I have to remember it */
  130. _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
  131. }
  132. }
  133. for (int i=_my_rank+1 ; i<comm_size ; i++)
  134. {
  135. if (_backup_of(i) == _my_rank)
  136. {
  137. /* I'm the back up of someone else for this data, I have to remember it */
  138. _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
  139. }
  140. }
  141. return 0;
  142. // case STARPU_DATA_ARRAY:
  143. // ptr = va_arg(varg_list, void*);
  144. // count = va_arg(varg_list, int);
  145. // backupped_by = va_arg(varg_list, int);
  146. // backup_of = -1;
  147. // break;
  148. default:
  149. STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
  150. }
  151. }
  152. void _cp_discard_message_recv_cb(void* _args)
  153. {
  154. // TODO: store the information of the new CP, for restart purpose
  155. struct _starpu_mpi_cp_discard_arg_cb* arg = (struct _starpu_mpi_cp_discard_arg_cb*) _args;
  156. _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
  157. _STARPU_MPI_DEBUG(0, "DISCARDING OLD CHECKPOINT DATA of rank %d - new one is CPID:%d - CPINST:%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  158. checkpoint_package_data_del(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
  159. }
  160. int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
  161. {
  162. /* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
  163. * I can discard old data from deprecated checkpoint).
  164. * I will receive a msg if I have old CP data.
  165. * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
  166. * */
  167. struct _starpu_mpi_cp_discard_arg_cb* arg;
  168. int i;
  169. for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
  170. {
  171. starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  172. arg->rank = cp_template->backup_of_array[i];
  173. _STARPU_MPI_DEBUG(10, "Post DISCARD msg reception from %d\n", arg->rank);
  174. _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
  175. MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void *) arg);
  176. }
  177. return i;
  178. }
  179. void _cp_discard_message_send_cb(void* _args)
  180. {
  181. _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
  182. starpu_free(_args);
  183. }
  184. int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
  185. {
  186. /* The CP data replication has succeeded. I must send the message warning the checkpoint integrity (so
  187. * they can discard old data from deprecated checkpoint).
  188. * I will send to the ones if it has old CP data from me.
  189. * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
  190. * */
  191. struct _starpu_mpi_cp_discard_arg_cb* arg;
  192. int i;
  193. for (i=0 ; i < cp_template->backupped_by_array_used_size ; i++)
  194. {
  195. starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  196. arg->rank = cp_template->backupped_by_array[i];
  197. _STARPU_MPI_DEBUG(10, "Post CP DISCARD msg sending to %d\n", arg->rank);
  198. arg->msg.discard=1;
  199. arg->msg.validation=0;
  200. arg->msg.checkpoint_id = cp_id;
  201. arg->msg.checkpoint_instance = cp_instance;
  202. _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
  203. MPI_COMM_WORLD, _cp_discard_message_send_cb, (void *) arg);
  204. }
  205. return 0;
  206. }
  207. starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
  208. {
  209. starpu_pthread_mutex_lock(&cp_template_mutex);
  210. for (int i=0 ; i < cp_template_array_size ; i++)
  211. {
  212. // starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
  213. if (cp_template_array[i]->cp_id == checkpoint_id)
  214. {
  215. // starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
  216. starpu_pthread_mutex_unlock(&cp_template_mutex);
  217. return cp_template_array[i];
  218. }
  219. // starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
  220. }
  221. starpu_pthread_mutex_unlock(&cp_template_mutex);
  222. return NULL;
  223. }
  224. //int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
  225. //{
  226. // /* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
  227. // * I can tag the data as CP validated, and discard old data from deprecated checkpoint).
  228. // * I will receive a msg if I have old CP data, or if I am the back up for a node into the upcoming Checkpoint.
  229. // * * Here the union of the different list is processed to post message reception only once.
  230. // * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
  231. // * */
  232. // struct _starpu_mpi_cp_discard_arg_cb* arg;
  233. // int i, j, flag;
  234. // starpu_mpi_checkpoint_template_t old_template;
  235. // for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
  236. // {
  237. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  238. // arg->rank = cp_template->backup_of_array[i];
  239. // _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d\n", arg->rank);
  240. // _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
  241. // }
  242. // if (last_valid_checkpoint.checkpoint_id == -1)
  243. // {
  244. // return -1;
  245. // }
  246. // else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_id)
  247. // {
  248. // old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
  249. // for (i=0 ; i<old_template->backup_of_array_used_size ; i++)
  250. // {
  251. // flag=0;
  252. // for(j=0 ; j<cp_template->backup_of_array_used_size ; j++)
  253. // {
  254. // if (cp_template->backup_of_array[j] == old_template->backup_of_array[i])
  255. // {
  256. // flag = 1;
  257. // break;
  258. // }
  259. // }
  260. // if (flag==0)
  261. // {
  262. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  263. // arg->rank = old_template->backup_of_array[i];
  264. // _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d - LAST VALIDATED CP\n", arg->rank);
  265. // _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
  266. // }
  267. // }
  268. // }
  269. // return 0;
  270. //}
  271. //int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
  272. //{
  273. // /* The CP data replication has succeeded. I must send the message warning the future checkpoint integrity (so
  274. // * they can tag the data as CP validated, and discard old data from deprecated checkpoint).
  275. // * I will send to one if it has old CP data from me, or if it is my backup for a data into the just succeeded Checkpoint.
  276. // * * Here the union of the different list is processed to send message only once.
  277. // * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
  278. // * */
  279. // struct _starpu_mpi_cp_discard_arg_cb* arg;
  280. // int i, j, flag;
  281. // starpu_mpi_checkpoint_template_t old_template;
  282. // for (i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
  283. // {
  284. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  285. // arg->rank = cp_template->backupped_by_array[i];
  286. // _STARPU_MPI_DEBUG(10, "Sending DISCARD msg reception to %d\n", arg->rank);
  287. // arg->msg.checkpoint_id = cp_id;
  288. // arg->msg.checkpoint_instance = cp_instance;
  289. // _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
  290. // }
  291. // if (last_valid_checkpoint.checkpoint_id == -1)
  292. // {
  293. // return -1;
  294. // }
  295. // else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_id)
  296. // {
  297. // old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
  298. // for (i=0 ; i<old_template->backupped_by_array_used_size ; i++)
  299. // {
  300. // flag=0;
  301. // for(j=0 ; j<cp_template->backupped_by_array_used_size ; j++)
  302. // {
  303. // if (cp_template->backupped_by_array[j] == old_template->backupped_by_array[i])
  304. // {
  305. // flag = 1;
  306. // break;
  307. // }
  308. // }
  309. // if (flag==0)
  310. // {
  311. // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
  312. // arg->rank = old_template->backupped_by_array[i];
  313. // _STARPU_MPI_DEBUG(10, "Sending DISCARD msg to %d - OLD CP\n", arg->rank);
  314. // arg->msg.checkpoint_id = cp_id;
  315. // arg->msg.checkpoint_instance = cp_instance;
  316. // _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
  317. // }
  318. // }
  319. // }
  320. // return 0;
  321. //}
  322. int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
  323. {
  324. // char str[256];
  325. starpu_pthread_mutex_lock(&cp_template->mutex);
  326. _STARPU_MPI_DEBUG(2, "Start freezing checkpoint id:%d\n", cp_template->cp_id);
  327. cp_template->frozen = 1;
  328. cp_template->message_to_send_number = 0;
  329. cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
  330. struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  331. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  332. {
  333. if (item->backup_of==-1 && item->backupped_by!=-1)
  334. {
  335. cp_template->message_to_send_number++;
  336. }
  337. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  338. }
  339. // sprintf(str, "backupped by Array maxsize:%d - currentsize:%d - ", cp_template->backupped_by_array_max_size, cp_template->backupped_by_array_used_size);
  340. // for (int i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
  341. // {
  342. // sprintf(str,"%s%d ", str, cp_template->backupped_by_array[i]);
  343. // }
  344. // fprintf(stderr, "%s\n", str);
  345. //
  346. // sprintf(str,"backup of Array maxsize:%d - currentsize:%d - ", cp_template->backup_of_array_max_size, cp_template->backup_of_array_used_size);
  347. // for (int i=0 ; i<cp_template->backup_of_array_used_size ; i++)
  348. // {
  349. // sprintf(str,"%s%d ", str, cp_template->backup_of_array[i]);
  350. // }
  351. // fprintf(stderr, "%s\n", str);
  352. starpu_pthread_mutex_unlock(&cp_template->mutex);
  353. starpu_pthread_mutex_lock(&cp_template_mutex);
  354. for (int i=0 ; i < cp_template_array_size ; i++)
  355. {
  356. STARPU_ASSERT_MSG(cp_template_array[i]->cp_id != cp_template->cp_id, "A checkpoint with id %d has already been registered.\n", cp_template->cp_id);
  357. }
  358. cp_template_array[cp_template_array_size] = cp_template;
  359. cp_template_array_size++;
  360. starpu_pthread_mutex_unlock(&cp_template_mutex);
  361. _STARPU_MPI_DEBUG(2, "Checkpoint id:%d is frozen and registered.\n", cp_template->cp_id);
  362. return cp_template->size;
  363. }
  364. int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, va_list varg_list)
  365. {
  366. int arg_type;
  367. starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
  368. va_list varg_list_copy;
  369. va_copy(varg_list_copy, varg_list);
  370. while ((arg_type = va_arg(varg_list_copy, int)) != 0)
  371. {
  372. _starpu_mpi_checkpoint_template_add_entry(_cp_template, arg_type, varg_list_copy);
  373. }
  374. va_end(varg_list_copy);
  375. _starpu_mpi_checkpoint_template_freeze(_cp_template);
  376. *cp_template = _cp_template;
  377. return 0;
  378. }
  379. int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_template)
  380. {
  381. return _starpu_mpi_checkpoint_template_freeze(*cp_template);
  382. }
  383. int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, ...)
  384. {
  385. va_list varg_list;
  386. va_start(varg_list, cp_domain);
  387. int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, cp_domain, varg_list);
  388. va_end(varg_list);
  389. return ret;
  390. }
  391. int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* cp_template, ...)
  392. {
  393. va_list varg_list;
  394. int arg_type;
  395. int ret;
  396. va_start(varg_list, cp_template);
  397. arg_type = va_arg(varg_list, int);
  398. STARPU_ASSERT_MSG(arg_type!=STARPU_NONE, "Unhandled arg_type: STARPU_NONE(0).\n");
  399. ret = _starpu_mpi_checkpoint_template_add_entry(*cp_template, arg_type, varg_list);
  400. va_end(varg_list);
  401. return ret;
  402. }
  403. int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
  404. int remaining_ack_messages;
  405. struct _starpu_mpi_checkpoint_tracker* tracker, *tracker1;
  406. starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
  407. starpu_pthread_mutex_lock(&cp_template_mutex);
  408. _STARPU_MPI_DEBUG(20, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);
  409. tracker = _starpu_mpi_checkpoint_tracker_update(cp_template, checkpoint_id, cp_template->checkpoint_domain, checkpoint_instance);
  410. remaining_ack_messages = _starpu_mpi_checkpoint_check_tracker(tracker);
  411. if (remaining_ack_messages>0)
  412. {
  413. _STARPU_MPI_DEBUG(20, "The CP (id:%d - inst:%d) found, remaining ack msg awaited:%d.\n", checkpoint_id,
  414. checkpoint_instance, remaining_ack_messages);
  415. }
  416. else if (remaining_ack_messages==0)
  417. {
  418. _STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been successfully saved and acknowledged.\n", checkpoint_id, checkpoint_instance);
  419. tracker = _starpu_mpi_checkpoint_tracker_validate_instance(tracker);
  420. _STARPU_MPI_TRACE_CHECKPOINT_END(checkpoint_instance, cp_template->checkpoint_domain);
  421. if (tracker==NULL)
  422. {
  423. // TODO:should warn some people, because the msg loggin is not implemented(this precise nodes to contact)
  424. _STARPU_MPI_DEBUG(0, "No previous checkpoint to discard\n");
  425. }
  426. else
  427. {
  428. if (tracker->old)
  429. {
  430. tracker1 = _starpu_mpi_checkpoint_tracker_get_last_valid_tracker(tracker->cp_domain);
  431. _starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, tracker1->cp_id, tracker1->cp_inst);
  432. }
  433. else
  434. {
  435. _starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, checkpoint_id, checkpoint_instance);
  436. }
  437. }
  438. }
  439. else if (remaining_ack_messages==-1)
  440. {
  441. STARPU_ABORT_MSG("Inst (id:%d - inst:%d) is already valid. should not have received an ack msg.\n", checkpoint_id, checkpoint_instance);
  442. }
  443. else
  444. {
  445. STARPU_ABORT_MSG("Critical error, can not identify %d as remaining messages\n", remaining_ack_messages);
  446. }
  447. _STARPU_MPI_DEBUG(20, "Digested\n");
  448. starpu_pthread_mutex_unlock(&cp_template_mutex);
  449. return 0;
  450. }
  451. // For test purpose
  452. int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
  453. {
  454. // int val;
  455. int i = 0;
  456. struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  457. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  458. {
  459. fprintf(stderr,"Item %2d: ", i);
  460. if (item->type == STARPU_VALUE)
  461. {
  462. // fprintf(stderr, "STARPU_VALUE - Value=%d - backupof:%d - backupedby:%d\n", (*(int *)(item->ptr)), item->backup_of, item->backupped_by);
  463. fprintf(stderr, "STARPU_VALUE - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
  464. }
  465. else if (item->type == STARPU_R)
  466. {
  467. // val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
  468. // fprintf(stderr, "STARPU_R - Value=%d - backupof:%d - backupedby:%d\n", val, item->backup_of, item->backupped_by);
  469. fprintf(stderr, "STARPU_R - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
  470. }
  471. else if (item->type == STARPU_DATA_ARRAY)
  472. {
  473. // fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t)item->ptr), 0));
  474. //
  475. // for (int j=1 ; j<MIN(item->count, 5) ; j++)
  476. // {
  477. // fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)item->ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
  478. // }
  479. // fprintf(stderr, "...\n");
  480. }
  481. else
  482. {
  483. printf("Unrecognized type.\n");
  484. }
  485. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  486. i++;
  487. }
  488. return 0;
  489. }