starpu_mpi_checkpoint_template.h 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #ifndef _STARPU_MPI_CHECKPOINT_TEMPLATE_H
  17. #define _STARPU_MPI_CHECKPOINT_TEMPLATE_H
  18. #include <starpu_mpi.h>
  19. #include <common/list.h>
  20. #include <starpu_mpi_private.h>
  21. #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
  22. #ifdef __cplusplus
  23. extern "C"
  24. {
  25. #endif
  26. #define _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE 16
  27. void checkpoint_template_lib_init(void);
  28. void checkpoint_template_lib_quit(void);
  29. int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance);
  30. int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template);
  31. int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list);
  32. int set_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint);
  33. int valid_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint);
  34. LIST_TYPE(_starpu_mpi_checkpoint_template_item,
  35. int type;
  36. void* ptr;
  37. int count;
  38. int backupped_by;
  39. int backup_of;
  40. starpu_mpi_tag_t tag;
  41. );
  42. struct _starpu_mpi_checkpoint_template{
  43. struct _starpu_mpi_checkpoint_template_item_list list;
  44. int size;
  45. int cp_template_id;
  46. int cp_template_current_instance;
  47. int sent_message_number;
  48. int remaining_ack_awaited;
  49. int pending;
  50. int frozen;
  51. starpu_pthread_mutex_t mutex;
  52. int* backup_of_array;
  53. int backup_of_array_max_size;
  54. int backup_of_array_used_size;
  55. int* backupped_by_array;
  56. int backupped_by_array_max_size;
  57. int backupped_by_array_used_size;
  58. };
  /*
   * Grow the -1-terminated int array *array from *max_size to
   * growth_factor * (*max_size) elements with realloc().
   *
   * The first *max_size elements are preserved and the element at the old
   * capacity is set to -1, so the array stays terminated.
   *
   * On success, *array and *max_size are updated and the new capacity is
   * returned. On failure (growth_factor < 2, a non-positive *max_size, or an
   * allocation failure), -1 is returned and *array / *max_size are left
   * untouched, so the old buffer remains valid and owned by the caller.
   *
   * *array must come from the malloc family (malloc/calloc/realloc).
   * There is no overflow check: the capacity is assumed to stay far below
   * INT_MAX / growth_factor (the arrays hold rank numbers).
   */
  static inline int checkpoint_template_array_realloc(int** array, int* max_size, int growth_factor)
  {
      int old_size = *max_size;
      int new_size;
      int *grown;

      /* Without real growth, the terminator slot written below would be out of bounds. */
      if (growth_factor < 2 || old_size <= 0)
          return -1;

      new_size = growth_factor * old_size;
      /* realloc() takes the buffer itself (not the address of the pointer) and
       * a byte count. The cast keeps this header compilable as C++. */
      grown = (int *)realloc(*array, (size_t)new_size * sizeof(**array));
      if (grown == NULL)
          return -1;

      grown[old_size] = -1;
      *array = grown;
      *max_size = new_size;
      return new_size;
  }
  66. static inline int checkpoint_template_backup_of_array_realloc_double(struct _starpu_mpi_checkpoint_template* checkpoint_template)
  67. {
  68. return checkpoint_template_array_realloc(&checkpoint_template->backup_of_array, &checkpoint_template->backup_of_array_max_size, 2);
  69. }
  70. static inline int checkpoint_template_backupped_by_array_realloc_double(struct _starpu_mpi_checkpoint_template* checkpoint_template)
  71. {
  72. return checkpoint_template_array_realloc(&checkpoint_template->backupped_by_array, &checkpoint_template->backupped_by_array_max_size, 2);
  73. }
  74. static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
  75. {
  76. struct _starpu_mpi_checkpoint_template_item* item;
  77. _STARPU_MPI_CALLOC(item, 1, sizeof(struct _starpu_mpi_checkpoint_template_item));
  78. item->type = type;
  79. item->ptr = ptr;
  80. item->count = count;
  81. item->backupped_by = backupped_by;
  82. item->backup_of = backup_of;
  83. item->tag = tag;
  84. return item;
  85. }
  86. static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_new(int cp_id)
  87. {
  88. starpu_mpi_checkpoint_template_t _cp_template;
  89. _STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template));
  90. _cp_template->cp_template_id = cp_id;
  91. _cp_template->cp_template_current_instance = 0;
  92. _cp_template->backup_of_array_max_size = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE;
  93. starpu_malloc((void**)&_cp_template->backup_of_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE);
  94. _cp_template->backup_of_array[0] = -1;
  95. _cp_template->backup_of_array_used_size = 0;
  96. _cp_template->backupped_by_array_max_size = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE;
  97. starpu_malloc((void**)&_cp_template->backupped_by_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE);
  98. _cp_template->backupped_by_array[0] = -1;
  99. _cp_template->backupped_by_array_used_size = 0;
  100. starpu_pthread_mutex_init(&_cp_template->mutex, NULL);
  101. return _cp_template;
  102. }
  103. static inline int _checkpoint_template_add_to_backup_arrays(starpu_mpi_checkpoint_template_t cp_template, int backupped_by, int backup_of)
  104. {
  105. if (backup_of == -1) {
  106. for (int i = 0; i < cp_template->backupped_by_array_used_size; i++)
  107. {
  108. if (backupped_by == cp_template->backupped_by_array[i]) {
  109. return 0;
  110. }
  111. }
  112. if (cp_template->backupped_by_array_used_size + 1 == cp_template->backupped_by_array_max_size)
  113. {
  114. checkpoint_template_backupped_by_array_realloc_double(cp_template);
  115. }
  116. cp_template->backupped_by_array[cp_template->backupped_by_array_used_size] = backupped_by;
  117. cp_template->backupped_by_array_used_size++;
  118. cp_template->backupped_by_array[cp_template->backupped_by_array_used_size] = -1;
  119. return backupped_by;
  120. }
  121. else if (backupped_by == -1)
  122. {
  123. for (int i = 0; i < cp_template->backup_of_array_used_size; i++)
  124. {
  125. if (backup_of == cp_template->backup_of_array[i]) {
  126. return 0;
  127. }
  128. }
  129. if (cp_template->backup_of_array_used_size + 1 == cp_template->backup_of_array_max_size)
  130. {
  131. checkpoint_template_backup_of_array_realloc_double(cp_template);
  132. }
  133. cp_template->backup_of_array[cp_template->backup_of_array_used_size] = backup_of;
  134. cp_template->backup_of_array_used_size++;
  135. cp_template->backup_of_array[cp_template->backup_of_array_used_size] = -1;
  136. return backup_of;
  137. }
  138. else
  139. {
  140. _STARPU_DISP("[warning] Checkpoint template item does not refer any backup information. This should not happen.\n");
  141. }
  142. }
  143. static int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
  144. {
  145. starpu_pthread_mutex_lock(&cp_template->mutex);
  146. STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
  147. struct _starpu_mpi_checkpoint_template_item* item;
  148. item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
  149. _starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
  150. _checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
  151. starpu_pthread_mutex_unlock(&cp_template->mutex);
  152. return 0;
  153. }
  154. static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_first_data(starpu_mpi_checkpoint_template_t template)
  155. {
  156. return _starpu_mpi_checkpoint_template_item_list_front(&template->list);
  157. }
  158. static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_next_data(starpu_mpi_checkpoint_template_t template STARPU_ATTRIBUTE_UNUSED, struct _starpu_mpi_checkpoint_template_item* ref_data)
  159. {
  160. return _starpu_mpi_checkpoint_template_item_list_next(ref_data);
  161. }
  162. static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_end(starpu_mpi_checkpoint_template_t template STARPU_ATTRIBUTE_UNUSED)
  163. {
  164. return NULL;
  165. }
  166. static inline int _starpu_checkpoint_template_free(starpu_mpi_checkpoint_template_t cp_template)
  167. {
  168. struct _starpu_mpi_checkpoint_template_item* item;
  169. struct _starpu_mpi_checkpoint_template_item* next_item;
  170. starpu_pthread_mutex_lock(&cp_template->mutex);
  171. item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  172. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  173. {
  174. next_item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  175. starpu_free(item);
  176. item = next_item;
  177. }
  178. starpu_pthread_mutex_unlock(&cp_template->mutex);
  179. starpu_pthread_mutex_destroy(&cp_template->mutex);
  180. starpu_free(cp_template);
  181. return 0;
  182. }
  183. // For test purpose
  184. int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template);
  185. #ifdef __cplusplus
  186. }
  187. #endif
  188. #endif //_STARPU_MPI_CHECKPOINT_TEMPLATE_H