starpu_mpi_checkpoint.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2014-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
  18. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
  19. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
  20. #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
  21. #include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
  22. #include <starpu_mpi_private.h>
  23. #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
  24. #include "starpu_mpi_cache.h"
  25. #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
  26. starpu_pthread_mutex_t cp_lib_mutex;
  27. int my_rank;
  28. extern struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, int sequential_consistency, int* cache_flag);
  29. extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int* cache_flag);
  30. void _ack_msg_send_cb(void* _args)
  31. {
  32. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  33. _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
  34. _STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
  35. free(arg);
  36. }
  37. void _ack_msg_recv_cb(void* _args)
  38. {
  39. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  40. int ret;
  41. _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
  42. _STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  43. ret = _checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  44. if (ret == 0) {
  45. free(arg);
  46. }
  47. else if (ret == -1)
  48. {
  49. STARPU_ABORT_MSG("Could not find CP template, cpid:%d - cpinst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  50. }
  51. }
  52. void _starpu_mpi_store_data_and_send_ack_cb(struct _starpu_mpi_cp_ack_arg_cb* arg)
  53. {
  54. checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
  55. _STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  56. _ft_service_msg_isend_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank,
  57. _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_send_cb, arg);
  58. }
  59. void _starpu_mpi_push_cp_ack_recv_cb(struct _starpu_mpi_cp_ack_arg_cb* arg)
  60. {
  61. _STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
  62. _ft_service_msg_irecv_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank,
  63. _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_recv_cb, arg);
  64. }
  65. void _recv_internal_dup_ro_cb(void* _args)
  66. {
  67. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  68. if (arg->cache_flag) {
  69. _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
  70. }
  71. else
  72. {
  73. _STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
  74. }
  75. starpu_data_release(arg->copy_handle);
  76. _starpu_mpi_store_data_and_send_ack_cb(arg);
  77. }
  78. void _recv_cp_external_data_cb(void* _args)
  79. {
  80. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  81. _STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1);
  82. // an handle has specifically been created, Let's get the value back, and unregister the handle
  83. arg->copy_handle = starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM);
  84. starpu_data_unregister_submit(arg->handle);
  85. _starpu_mpi_store_data_and_send_ack_cb(arg);
  86. }
  87. void _send_cp_external_data_cb(void* _args)
  88. {
  89. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  90. _STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1);
  91. free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
  92. starpu_data_unregister_submit(arg->handle);
  93. _starpu_mpi_push_cp_ack_recv_cb(arg);
  94. }
  95. void _send_cp_internal_data_cb(void* _args) {
  96. struct _starpu_mpi_cp_ack_arg_cb *arg = (struct _starpu_mpi_cp_ack_arg_cb *) _args;
  97. if (arg->cache_flag) {
  98. _STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
  99. }
  100. else
  101. {
  102. _STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
  103. }
  104. _starpu_mpi_push_cp_ack_recv_cb(arg);
  105. }
  106. void _send_cached_cp_internal_data_cb(void* _args)
  107. {
  108. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  109. starpu_data_release(arg->handle);
  110. _starpu_mpi_push_cp_ack_recv_cb(arg);
  111. }
  112. int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
  113. {
  114. starpu_data_handle_t handle;
  115. struct _starpu_mpi_cp_ack_arg_cb* arg;
  116. void* cpy_ptr;
  117. struct _starpu_mpi_checkpoint_template_item* item;
  118. int current_instance;
  119. current_instance = increment_current_instance();
  120. _starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
  121. item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  122. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  123. {
  124. switch (item->type)
  125. {
  126. case STARPU_VALUE:
  127. // TODO: Maybe do not pass via starpu handles for external data, and need to reimplement mpi comm layer for
  128. arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  129. arg->tag = item->tag;
  130. arg->type = STARPU_VALUE;
  131. arg->count = item->count;
  132. arg->msg.checkpoint_id = cp_template->cp_id;
  133. arg->msg.checkpoint_instance = current_instance;
  134. if (item->backupped_by != -1)
  135. {
  136. cpy_ptr = malloc(item->count);
  137. memcpy(cpy_ptr, item->ptr, item->count);
  138. starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
  139. arg->rank = item->backupped_by;
  140. _STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
  141. starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
  142. &_send_cp_external_data_cb, (void*)arg);
  143. // The callback needs to free the handle specially created for the send, and post ack recv
  144. }
  145. else if (item->backup_of != -1)
  146. {
  147. cpy_ptr = malloc(item->count);
  148. starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
  149. arg->rank = item->backup_of;
  150. _STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
  151. starpu_mpi_irecv_detached(arg->handle, arg->rank, arg->tag, MPI_COMM_WORLD,
  152. &_recv_cp_external_data_cb, (void*)arg);
  153. // The callback needs to store the received data and post ack send
  154. }
  155. break;
  156. case STARPU_R:
  157. handle = (starpu_data_handle_t)item->ptr;
  158. if (starpu_mpi_data_get_rank(handle)==my_rank)
  159. {
  160. _STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(handle));
  161. arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  162. arg->rank = item->backupped_by;
  163. arg->handle = handle;
  164. arg->tag = starpu_mpi_data_get_tag(handle);
  165. arg->type = STARPU_R;
  166. arg->count = item->count;
  167. arg->msg.checkpoint_id = cp_template->cp_id;
  168. arg->msg.checkpoint_instance = current_instance;
  169. _starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0,
  170. &_send_cp_internal_data_cb, (void*)arg, 1, &arg->cache_flag);
  171. // the callbacks need to post ack recv. The cache one needs to release the handle.
  172. }
  173. else if (item->backup_of == starpu_mpi_data_get_rank(handle))
  174. {
  175. _STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(handle), (int)starpu_mpi_data_get_tag(handle));
  176. arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  177. arg->rank = item->backup_of;
  178. arg->handle = handle;
  179. arg->tag = starpu_mpi_data_get_tag(handle);
  180. arg->type = STARPU_R;
  181. arg->count = item->count;
  182. arg->msg.checkpoint_id = cp_template->cp_id;
  183. arg->msg.checkpoint_instance = current_instance;
  184. _starpu_mpi_irecv_cache_aware(handle, starpu_mpi_data_get_rank(handle), starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0,
  185. NULL, NULL, 1, 0, 1, &arg->cache_flag);
  186. // The callback needs to do nothing. The cached one must release the handle.
  187. starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1);
  188. starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg);
  189. // The callback need to store the data and post ack send.
  190. }
  191. break;
  192. }
  193. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  194. }
  195. return 0;
  196. }
  197. //
  198. ///**
  199. // * receives param of type starpu_mpi_checkpoint_template_t
  200. // * @param args
  201. // * @return
  202. // */
  203. //void _starpu_mpi_checkpoint_ack_send_cb(void* args)
  204. //{
  205. // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
  206. // starpu_pthread_mutex_lock(&cp_template->mutex);
  207. // cp_template->remaining_ack_awaited--;
  208. // starpu_pthread_mutex_unlock(&cp_template->mutex);
  209. //}
  210. //
  211. //void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg)
  212. //{
  213. // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
  214. // starpu_data_register_same(&arg->copy_handle, arg->handle);
  215. // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _arg);
  216. // starpu_data_release(arg->handle);
  217. //}
  218. //
  219. //void _starpu_checkpoint_data_send_copy_and_ack(void* _args)
  220. //{
  221. // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  222. // starpu_data_register_same(&arg->copy_handle, arg->handle);
  223. // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _args);
  224. //}
  225. //
  226. //void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args)
  227. //{
  228. // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)_args;
  229. // cp_template->remaining_ack_awaited--;
  230. //}