123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2014-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <stdlib.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
- #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
- #include <starpu_mpi_private.h>
- #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
- #include "starpu_mpi_cache.h"
- #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
- starpu_pthread_mutex_t cp_lib_mutex;
- int my_rank;
- extern struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency);
- extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
- void _starpu_mpi_treat_ack_receipt_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- _STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
- if (_checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance) == 0) {
- free(arg);
- }
- }
- void _arg_free(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- _STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
- free(arg);
- }
- void _starpu_mpi_push_cp_ack_send_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- _STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
- _ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
- }
- void _starpu_mpi_store_data_and_push_cp_ack_send_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
- _starpu_mpi_push_cp_ack_send_cb(_args);
- }
- void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- if (STARPU_VALUE == arg->type)
- {
- free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
- starpu_data_unregister(arg->handle);
- }
- _STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
- _ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
- }
- void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
- starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
- // starpu_data_register_same(&arg->copy_handle, arg->handle);
- // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
- starpu_data_release(arg->handle);
- }
- void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
- {
- struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
- if (STARPU_VALUE == arg->type)
- {
- // an handle as specificaly been created, no need to copy the data. Call directly the Callback
- arg->copy_handle = arg->handle;
- _starpu_mpi_store_data_and_push_cp_ack_send_cb(_arg);
- return;
- }
- else if (STARPU_R == arg->type)
- {
- starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
- // starpu_data_register_same(&arg->copy_handle, arg->handle);
- // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
- return;
- }
- }
- int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
- {
- // TODO: For now checkpoint are not taken asynchronously. It will be later, and then we will have to acquire READ permissions to StarPU in order to not have the data potentially corrupted.
- starpu_data_handle_t* handle;
- struct _starpu_mpi_cp_ack_arg_cb* arg;
- void* cpy_ptr;
- struct _starpu_mpi_checkpoint_template_item* item;
- //MPI_Comm comm;
- starpu_pthread_mutex_lock(&cp_template->mutex);
- set_pending_checkpoint_template(cp_template);
- STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
- cp_template->pending = 1;
- cp_template->cp_template_current_instance++;
- cp_template->remaining_ack_awaited = cp_template->sent_message_number;
- starpu_pthread_mutex_unlock(&cp_template->mutex);
- item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
- _starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
- while (item != _starpu_mpi_checkpoint_template_end(cp_template))
- {
- switch (item->type)
- {
- case STARPU_VALUE:
- // TODO: Maybe do not pass via starpu handles for external data, and need to reimplement mpi comm layer for
- arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
- handle = &arg->handle;
- arg->tag = item->tag;
- arg->type = STARPU_VALUE;
- arg->count = item->count;
- arg->msg.checkpoint_id = cp_template->cp_template_id;
- arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
- if (item->backupped_by != -1)
- {
- cpy_ptr = malloc(item->count);
- memcpy(cpy_ptr, item->ptr, item->count);
- starpu_variable_data_register(handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
- arg->rank = item->backupped_by;
- _STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
- starpu_mpi_isend_detached_prio(*handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
- &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg);
- }
- else if (item->backup_of != -1)
- {
- cpy_ptr = malloc(item->count);
- starpu_variable_data_register(handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
- arg->rank = item->backup_of;
- _STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
- starpu_mpi_irecv_detached(*handle, arg->rank, arg->tag, MPI_COMM_WORLD,
- &_starpu_checkpoint_data_recv_copy_and_ack, (void*)arg);
- }
- break;
- case STARPU_R:
- handle = (starpu_data_handle_t*)item->ptr;
- if (starpu_mpi_data_get_rank(*handle)==my_rank)
- {
- _STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(*handle));
- arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
- arg->rank = item->backupped_by;
- arg->handle = *handle;
- arg->tag = starpu_mpi_data_get_tag(*handle);
- arg->type = STARPU_R;
- arg->count = item->count;
- arg->msg.checkpoint_id = cp_template->cp_template_id;
- arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
- _starpu_mpi_isend_cache_aware(*handle, item->backupped_by, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0,
- &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, 1);
- }
- else if (item->backup_of == starpu_mpi_data_get_rank(*handle))
- {
- _STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(*handle), (int)starpu_mpi_data_get_tag(*handle));
- arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
- arg->rank = item->backup_of;
- arg->handle = *handle;
- arg->tag = starpu_mpi_data_get_tag(*handle);
- arg->type = STARPU_R;
- arg->count = item->count;
- arg->msg.checkpoint_id = cp_template->cp_template_id;
- arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
- _starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0,
- &_starpu_checkpoint_data_recv_copy_and_ack, (void*)arg, &_starpu_checkpoint_cached_data_recv_copy_and_ack, (void*)arg, 1, 0, 1);
- }
- break;
- }
- item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
- };
- return 0;
- }
- //
- ///**
- // * receives param of type starpu_mpi_checkpoint_template_t
- // * @param args
- // * @return
- // */
- //void _starpu_mpi_checkpoint_ack_send_cb(void* args)
- //{
- // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
- // starpu_pthread_mutex_lock(&cp_template->mutex);
- // cp_template->remaining_ack_awaited--;
- // starpu_pthread_mutex_unlock(&cp_template->mutex);
- //}
- //
- //void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg)
- //{
- // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
- // starpu_data_register_same(&arg->copy_handle, arg->handle);
- // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _arg);
- // starpu_data_release(arg->handle);
- //}
- //
- //void _starpu_checkpoint_data_send_copy_and_ack(void* _args)
- //{
- // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
- // starpu_data_register_same(&arg->copy_handle, arg->handle);
- // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _args);
- //}
- //
- //void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args)
- //{
- // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)_args;
- // cp_template->remaining_ack_awaited--;
- //}
|