/* StarPU --- Runtime system for heterogeneous multicore architectures. * * Copyright (C) 2014-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria * * StarPU is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or (at * your option) any later version. * * StarPU is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * * See the GNU Lesser General Public License in COPYING.LGPL for more details. */ #include #include #include #include #include #include #include #include // Should be deduced at preprocessing (Nmad vs MPI) #include "starpu_mpi_cache.h" #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit starpu_pthread_mutex_t cp_lib_mutex; int my_rank; extern struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, int sequential_consistency, int* cache_flag); extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int* cache_flag); void _ack_msg_send_cb(void* _args) { struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args; _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg)); _STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank); free(arg); } void _ack_msg_recv_cb(void* _args) { struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args; int ret; _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg)); _STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance); ret = _checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance); if (ret == 0) { free(arg); } else if (ret == -1) { STARPU_ABORT_MSG("Could not find CP template, cpid:%d - cpinst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance); } } void _starpu_mpi_store_data_and_send_ack_cb(struct _starpu_mpi_cp_ack_arg_cb* arg) { checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count); _STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance); _ft_service_msg_isend_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_send_cb, arg); } void _starpu_mpi_push_cp_ack_recv_cb(struct _starpu_mpi_cp_ack_arg_cb* arg) { _STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank); _ft_service_msg_irecv_cb((void *) &arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _ack_msg_recv_cb, arg); } void _recv_internal_dup_ro_cb(void* _args) { struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args; if (arg->cache_flag) { _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1); } else { _STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1); } starpu_data_release(arg->copy_handle); _starpu_mpi_store_data_and_send_ack_cb(arg); } void _recv_cp_external_data_cb(void* _args) { struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args; _STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1); // an handle has specifically been created, Let's get the value back, and unregister the handle arg->copy_handle = starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM); starpu_data_unregister_submit(arg->handle); _starpu_mpi_store_data_and_send_ack_cb(arg); } void _send_cp_external_data_cb(void* _args) { struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args; _STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1); free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM)); starpu_data_unregister_submit(arg->handle); _starpu_mpi_push_cp_ack_recv_cb(arg); } void _send_cp_internal_data_cb(void* _args) { struct _starpu_mpi_cp_ack_arg_cb *arg = (struct _starpu_mpi_cp_ack_arg_cb *) _args; if (arg->cache_flag) { _STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1); } else { _STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1); } _starpu_mpi_push_cp_ack_recv_cb(arg); } void _send_cached_cp_internal_data_cb(void* _args) { struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args; starpu_data_release(arg->handle); _starpu_mpi_push_cp_ack_recv_cb(arg); } int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template) { starpu_data_handle_t handle; struct _starpu_mpi_cp_ack_arg_cb* arg; void* cpy_ptr; struct _starpu_mpi_checkpoint_template_item* item; int current_instance; current_instance = increment_current_instance(); _starpu_mpi_checkpoint_post_cp_discard_recv(cp_template); item = _starpu_mpi_checkpoint_template_get_first_data(cp_template); while (item != _starpu_mpi_checkpoint_template_end(cp_template)) { switch (item->type) { case STARPU_VALUE: // TODO: Maybe do not pass via starpu handles for external data, and need to reimplement mpi comm layer for arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb)); arg->tag = item->tag; arg->type = STARPU_VALUE; arg->count = item->count; arg->msg.checkpoint_id = cp_template->cp_id; arg->msg.checkpoint_instance = current_instance; if (item->backupped_by != -1) { cpy_ptr = malloc(item->count); memcpy(cpy_ptr, item->ptr, item->count); starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count); arg->rank = item->backupped_by; _STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank); starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD, &_send_cp_external_data_cb, (void*)arg); // The callback needs to free the handle specially created for the send, and post ack recv } else if (item->backup_of != -1) { cpy_ptr = malloc(item->count); starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count); arg->rank = item->backup_of; _STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank); starpu_mpi_irecv_detached(arg->handle, arg->rank, arg->tag, MPI_COMM_WORLD, &_recv_cp_external_data_cb, (void*)arg); // The callback needs to store the received data and post ack send } break; case STARPU_R: handle = (starpu_data_handle_t)item->ptr; if (starpu_mpi_data_get_rank(handle)==my_rank) { _STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(handle)); arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb)); arg->rank = item->backupped_by; arg->handle = handle; arg->tag = starpu_mpi_data_get_tag(handle); arg->type = STARPU_R; arg->count = item->count; arg->msg.checkpoint_id = cp_template->cp_id; arg->msg.checkpoint_instance = current_instance; _starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0, &_send_cp_internal_data_cb, (void*)arg, 1, &arg->cache_flag); // the callbacks need to post ack recv. The cache one needs to release the handle. } else if (item->backup_of == starpu_mpi_data_get_rank(handle)) { _STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(handle), (int)starpu_mpi_data_get_tag(handle)); arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb)); arg->rank = item->backup_of; arg->handle = handle; arg->tag = starpu_mpi_data_get_tag(handle); arg->type = STARPU_R; arg->count = item->count; arg->msg.checkpoint_id = cp_template->cp_id; arg->msg.checkpoint_instance = current_instance; _starpu_mpi_irecv_cache_aware(handle, starpu_mpi_data_get_rank(handle), starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, NULL, NULL, 1, 0, 1, &arg->cache_flag); // The callback needs to do nothing. The cached one must release the handle. starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1); starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg); // The callback need to store the data and post ack send. } break; } item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item); } return 0; } // ///** // * receives param of type starpu_mpi_checkpoint_template_t // * @param args // * @return // */ //void _starpu_mpi_checkpoint_ack_send_cb(void* args) //{ // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args; // starpu_pthread_mutex_lock(&cp_template->mutex); // cp_template->remaining_ack_awaited--; // starpu_pthread_mutex_unlock(&cp_template->mutex); //} // //void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg) //{ // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg; // starpu_data_register_same(&arg->copy_handle, arg->handle); // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _arg); // starpu_data_release(arg->handle); //} // //void _starpu_checkpoint_data_send_copy_and_ack(void* _args) //{ // struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args; // starpu_data_register_same(&arg->copy_handle, arg->handle); // starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_recv_cb, _args); //} // //void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args) //{ // starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t)_args; // cp_template->remaining_ack_awaited--; //}