123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2014-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <stdlib.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
- #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
- #include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
- #include <starpu_mpi_private.h>
- #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
- #include "starpu_mpi_cache.h"
- #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
- starpu_pthread_mutex_t cp_lib_mutex;
- int my_rank;
- extern struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, int sequential_consistency, int* cache_flag);
- extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int* cache_flag);
- void _ack_msg_send_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
- _STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
- free(arg);
- }
- void _ack_msg_recv_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- int ret;
- _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
- _STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
- ret = _checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
- if (ret == 0) {
- free(arg);
- }
- else if (ret == -1)
- {
- STARPU_ABORT_MSG("Could not find CP template, cpid:%d - cpinst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
- }
- }
- void _starpu_mpi_store_data_and_send_ack_cb(struct _starpu_mpi_cp_ack_arg_cb* arg)
- {
- checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
- _STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
- _ft_service_msg_isend_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank,
- _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_send_cb, arg);
- }
- void _starpu_mpi_push_cp_ack_recv_cb(struct _starpu_mpi_cp_ack_arg_cb* arg)
- {
- _STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
- _ft_service_msg_irecv_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank,
- _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_recv_cb, arg);
- }
- void _recv_internal_dup_ro_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- if (arg->cache_flag) {
- _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
- }
- else
- {
- _STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
- }
- starpu_data_release(arg->copy_handle);
- _starpu_mpi_store_data_and_send_ack_cb(arg);
- }
- void _recv_cp_external_data_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- _STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1);
- // an handle has specifically been created, Let's get the value back, and unregister the handle
- arg->copy_handle = starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM);
- starpu_data_unregister_submit(arg->handle);
- _starpu_mpi_store_data_and_send_ack_cb(arg);
- }
- void _send_cp_external_data_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- _STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1);
- free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
- starpu_data_unregister_submit(arg->handle);
- _starpu_mpi_push_cp_ack_recv_cb(arg);
- }
- void _send_cp_internal_data_cb(void* _args) {
- struct _starpu_mpi_cp_ack_arg_cb *arg = (struct _starpu_mpi_cp_ack_arg_cb *) _args;
- if (arg->cache_flag) {
- _STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
- }
- else
- {
- _STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
- }
- _starpu_mpi_push_cp_ack_recv_cb(arg);
- }
- void _send_cached_cp_internal_data_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- starpu_data_release(arg->handle);
- _starpu_mpi_push_cp_ack_recv_cb(arg);
- }
- int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
- {
- starpu_data_handle_t handle;
- struct _starpu_mpi_cp_ack_arg_cb* arg;
- void* cpy_ptr;
- struct _starpu_mpi_checkpoint_template_item* item;
- int current_instance;
- current_instance = increment_current_instance();
- _starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
- item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
- while (item != _starpu_mpi_checkpoint_template_end(cp_template))
- {
- switch (item->type)
- {
- case STARPU_VALUE:
- // TODO: Maybe do not pass via starpu handles for external data, and need to reimplement mpi comm layer for
- arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
- arg->tag = item->tag;
- arg->type = STARPU_VALUE;
- arg->count = item->count;
- arg->msg.checkpoint_id = cp_template->cp_id;
- arg->msg.checkpoint_instance = current_instance;
- if (item->backupped_by != -1)
- {
- cpy_ptr = malloc(item->count);
- memcpy(cpy_ptr, item->ptr, item->count);
- starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
- arg->rank = item->backupped_by;
- _STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
- starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
- &_send_cp_external_data_cb, (void*)arg);
- // The callback needs to free the handle specially created for the send, and post ack recv
- }
- else if (item->backup_of != -1)
- {
- cpy_ptr = malloc(item->count);
- starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
- arg->rank = item->backup_of;
- _STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
- starpu_mpi_irecv_detached(arg->handle, arg->rank, arg->tag, MPI_COMM_WORLD,
- &_recv_cp_external_data_cb, (void*)arg);
- // The callback needs to store the received data and post ack send
- }
- break;
- case STARPU_R:
- handle = (starpu_data_handle_t)item->ptr;
- if (starpu_mpi_data_get_rank(handle)==my_rank)
- {
- _STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(handle));
- arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
- arg->rank = item->backupped_by;
- arg->handle = handle;
- arg->tag = starpu_mpi_data_get_tag(handle);
- arg->type = STARPU_R;
- arg->count = item->count;
- arg->msg.checkpoint_id = cp_template->cp_id;
- arg->msg.checkpoint_instance = current_instance;
- _starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0,
- &_send_cp_internal_data_cb, (void*)arg, 1, &arg->cache_flag);
- // the callbacks need to post ack recv. The cache one needs to release the handle.
- }
- else if (item->backup_of == starpu_mpi_data_get_rank(handle))
- {
- _STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(handle), (int)starpu_mpi_data_get_tag(handle));
- arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
- arg->rank = item->backup_of;
- arg->handle = handle;
- arg->tag = starpu_mpi_data_get_tag(handle);
- arg->type = STARPU_R;
- arg->count = item->count;
- arg->msg.checkpoint_id = cp_template->cp_id;
- arg->msg.checkpoint_instance = current_instance;
- _starpu_mpi_irecv_cache_aware(handle, starpu_mpi_data_get_rank(handle), starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0,
- NULL, NULL, 1, 0, 1, &arg->cache_flag);
- // The callback needs to do nothing. The cached one must release the handle.
- starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1);
- starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg);
- // The callback need to store the data and post ack send.
- }
- break;
- }
- item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
- }
- return 0;
- }
- //
- ///**
- // * receives param of type starpu_mpi_checkpoint_template_t
- // * @param args
- // * @return
- // */
- //void _starpu_mpi_checkpoint_ack_send_cb(void* args)
- //{
- // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
- // starpu_pthread_mutex_lock(&cp_template->mutex);
- // cp_template->remaining_ack_awaited--;
- // starpu_pthread_mutex_unlock(&cp_template->mutex);
- //}
- //
- //void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg)
- //{
- // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
- // starpu_data_register_same(&arg->copy_handle, arg->handle);
- // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _arg);
- // starpu_data_release(arg->handle);
- //}
- //
- //void _starpu_checkpoint_data_send_copy_and_ack(void* _args)
- //{
- // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- // starpu_data_register_same(&arg->copy_handle, arg->handle);
- // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _args);
- //}
- //
- //void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args)
- //{
- // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)_args;
- // cp_template->remaining_ack_awaited--;
- //}
|