starpu_mpi_checkpoint.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2014-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
  18. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
  19. #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
  20. #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
  21. #include <starpu_mpi_private.h>
  22. #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
  23. #include "starpu_mpi_cache.h"
  // Upper bound on the number of checkpoint templates; not referenced in the part of the file visible here.
  24. #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
  // File-scope mutex; no lock/unlock on it appears in the part of the file visible here.
  25. starpu_pthread_mutex_t cp_lib_mutex;
  // Compared against starpu_mpi_data_get_rank() in starpu_mpi_submit_checkpoint_template() to decide
  // whether this process sends a StarPU data. It is only read in the visible code.
  // NOTE(review): presumably set to the local MPI rank at checkpoint-library initialisation -- confirm.
  26. int my_rank;
  // Prototypes of the cache-aware send/receive entry points, declared here because they are called
  // directly by starpu_mpi_submit_checkpoint_template(). Each takes a regular callback and an
  // alternative callback (alt_callback/_alt_arg).
  // NOTE(review): the alternative callback looks like the one used when the data is served from the
  // communication cache -- confirm against the definitions.
  27. extern struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency);
  28. extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
  29. void _starpu_mpi_treat_ack_receipt_cb(void* _args)
  30. {
  31. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  32. int ret;
  33. _STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  34. ret = _checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  35. if (ret == 0) {
  36. free(arg);
  37. }
  38. else if (ret == -1)
  39. {
  40. STARPU_ABORT_MSG("Could not find CP template, cpid:%d - cpinst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  41. }
  42. }
  43. void _arg_free(void* _args)
  44. {
  45. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  46. _STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
  47. free(arg);
  48. }
  49. void _starpu_mpi_store_data_and_send_ack_cb(void* _args)
  50. {
  51. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  52. if (STARPU_VALUE == arg->type) {
  53. // an handle has specifically been created, Let's get the value back, and unregister the handle
  54. arg->copy_handle = starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM);
  55. starpu_data_unregister_submit(arg->handle);
  56. }
  57. checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
  58. _STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  59. _ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
  60. }void _starpu_mpi_release_and_store_data_and_send_ack_cb(void* _args)
  61. {
  62. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  63. starpu_data_release(arg->copy_handle);
  64. checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
  65. _STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
  66. _ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
  67. }
  68. void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
  69. {
  70. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  71. if (STARPU_VALUE == arg->type)
  72. {
  73. free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
  74. starpu_data_unregister_submit(arg->handle);
  75. }
  76. _STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
  77. _ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
  78. }
  79. void _starpu_mpi_cached_push_cp_ack_recv_cb(void* _args)
  80. {
  81. struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  82. if (STARPU_R == arg->type)
  83. {
  84. starpu_data_release(arg->handle);
  85. }
  86. _STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
  87. _ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
  88. }
  /**
   * Adapter giving starpu_data_release() the void(*)(void*) callback shape.
   *
   * @param _arg the data handle to release, passed as an opaque pointer.
   */
  void _starpu_data_release_cb(void* _arg)
  {
      // The opaque pointer is forwarded unchanged.
      starpu_data_release(_arg);
  }
  93. int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
  94. {
  95. starpu_data_handle_t* handle;
  96. struct _starpu_mpi_checkpoint_tracker* tracker;
  97. struct _starpu_mpi_cp_ack_arg_cb* arg;
  98. void* cpy_ptr;
  99. struct _starpu_mpi_checkpoint_template_item* item;
  100. int current_instance;
  101. current_instance = increment_current_instance();
  102. // _starpu_mpi_checkpoint_template_create_instance_tracker(cp_template, cp_template->cp_id, cp_template->checkpoint_domain, current_instance);
  103. _starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
  104. item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
  105. while (item != _starpu_mpi_checkpoint_template_end(cp_template))
  106. {
  107. switch (item->type)
  108. {
  109. case STARPU_VALUE:
  110. // TODO: Maybe do not pass via starpu handles for external data, and need to reimplement mpi comm layer for
  111. arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  112. arg->tag = item->tag;
  113. arg->type = STARPU_VALUE;
  114. arg->count = item->count;
  115. arg->msg.checkpoint_id = cp_template->cp_id;
  116. arg->msg.checkpoint_instance = current_instance;
  117. if (item->backupped_by != -1)
  118. {
  119. cpy_ptr = malloc(item->count);
  120. memcpy(cpy_ptr, item->ptr, item->count);
  121. starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
  122. arg->rank = item->backupped_by;
  123. _STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
  124. starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
  125. &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg);
  126. // The callback needs to free the handle specially created for the send, and post ack recv
  127. }
  128. else if (item->backup_of != -1)
  129. {
  130. cpy_ptr = malloc(item->count);
  131. starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
  132. arg->rank = item->backup_of;
  133. _STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
  134. starpu_mpi_irecv_detached(arg->handle, arg->rank, arg->tag, MPI_COMM_WORLD,
  135. &_starpu_mpi_store_data_and_send_ack_cb, (void*)arg);
  136. // The callback needs to store the received data and post ack send
  137. }
  138. break;
  139. case STARPU_R:
  140. handle = (starpu_data_handle_t*)item->ptr;
  141. if (starpu_mpi_data_get_rank(*handle)==my_rank)
  142. {
  143. _STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(*handle));
  144. arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  145. arg->rank = item->backupped_by;
  146. arg->handle = *handle;
  147. arg->tag = starpu_mpi_data_get_tag(*handle);
  148. arg->type = STARPU_R;
  149. arg->count = item->count;
  150. arg->msg.checkpoint_id = cp_template->cp_id;
  151. arg->msg.checkpoint_instance = current_instance;
  152. _starpu_mpi_isend_cache_aware(*handle, item->backupped_by, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0,
  153. &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_cached_push_cp_ack_recv_cb, (void*)arg, 1);
  154. // the callbacks need to post ack recv. The cache one needs to release the handle.
  155. }
  156. else if (item->backup_of == starpu_mpi_data_get_rank(*handle))
  157. {
  158. _STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(*handle), (int)starpu_mpi_data_get_tag(*handle));
  159. arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
  160. arg->rank = item->backup_of;
  161. arg->handle = *handle;
  162. arg->tag = starpu_mpi_data_get_tag(*handle);
  163. arg->type = STARPU_R;
  164. arg->count = item->count;
  165. arg->msg.checkpoint_id = cp_template->cp_id;
  166. arg->msg.checkpoint_instance = current_instance;
  167. _starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0,
  168. NULL, NULL, &_starpu_data_release_cb, (void*)arg->handle, 1, 0, 1);
  169. // The callback needs to do nothing. The cached one must release the handle.
  170. starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1);
  171. starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _starpu_mpi_release_and_store_data_and_send_ack_cb, arg);
  172. // The callback need to store the data and post ack send.
  173. }
  174. break;
  175. }
  176. item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
  177. }
  178. return 0;
  179. }
  180. //
  181. ///**
  182. // * receives param of type starpu_mpi_checkpoint_template_t
  183. // * @param args
  184. // * @return
  185. // */
  186. //void _starpu_mpi_checkpoint_ack_send_cb(void* args)
  187. //{
  188. // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
  189. // starpu_pthread_mutex_lock(&cp_template->mutex);
  190. // cp_template->remaining_ack_awaited--;
  191. // starpu_pthread_mutex_unlock(&cp_template->mutex);
  192. //}
  193. //
  194. //void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg)
  195. //{
  196. // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
  197. // starpu_data_register_same(&arg->copy_handle, arg->handle);
  198. // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _arg);
  199. // starpu_data_release(arg->handle);
  200. //}
  201. //
  202. //void _starpu_checkpoint_data_send_copy_and_ack(void* _args)
  203. //{
  204. // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
  205. // starpu_data_register_same(&arg->copy_handle, arg->handle);
  206. // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _args);
  207. //}
  208. //
  209. //void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args)
  210. //{
  211. // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)_args;
  212. // cp_template->remaining_ack_awaited--;
  213. //}