/* StarPU --- Runtime system for heterogeneous multicore architectures. * * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria * * StarPU is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or (at * your option) any later version. * * StarPU is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * * See the GNU Lesser General Public License in COPYING.LGPL for more details. */ #ifndef _STARPU_MPI_CHECKPOINT_TEMPLATE_H #define _STARPU_MPI_CHECKPOINT_TEMPLATE_H #include #include #include #include #ifdef __cplusplus extern "C" { #endif #define _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE 16 void checkpoint_template_lib_init(void); void checkpoint_template_lib_quit(void); int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance); int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template); int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list); int set_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint); int valid_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint); LIST_TYPE(_starpu_mpi_checkpoint_template_item, int type; void* ptr; int count; int backupped_by; int backup_of; starpu_mpi_tag_t tag; ); struct _starpu_mpi_checkpoint_template{ struct _starpu_mpi_checkpoint_template_item_list list; int size; int cp_template_id; int cp_template_current_instance; int sent_message_number; int remaining_ack_awaited; int pending; int frozen; starpu_pthread_mutex_t mutex; int* backup_of_array; int backup_of_array_max_size; int backup_of_array_used_size; int* backupped_by_array; int backupped_by_array_max_size; int backupped_by_array_used_size; }; static inline int checkpoint_template_array_realloc(int** array, int* max_size, int growth_factor) { *array = (int*)realloc(array, growth_factor*(*max_size)); *array[*max_size] = -1; *max_size = growth_factor*(*max_size); return *max_size; } static inline int checkpoint_template_backup_of_array_realloc_double(struct _starpu_mpi_checkpoint_template* checkpoint_template) { return checkpoint_template_array_realloc(&checkpoint_template->backup_of_array, &checkpoint_template->backup_of_array_max_size, 2); } static inline int checkpoint_template_backupped_by_array_realloc_double(struct _starpu_mpi_checkpoint_template* checkpoint_template) { return checkpoint_template_array_realloc(&checkpoint_template->backupped_by_array, &checkpoint_template->backupped_by_array_max_size, 2); } static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag) { struct _starpu_mpi_checkpoint_template_item* item; _STARPU_MPI_CALLOC(item, 1, sizeof(struct _starpu_mpi_checkpoint_template_item)); item->type = type; item->ptr = ptr; item->count = count; item->backupped_by = backupped_by; item->backup_of = backup_of; item->tag = tag; return item; } static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_new(int cp_id) { starpu_mpi_checkpoint_template_t _cp_template; _STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template)); _cp_template->cp_template_id = cp_id; _cp_template->cp_template_current_instance = 0; _cp_template->backup_of_array_max_size = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE; starpu_malloc((void**)&_cp_template->backup_of_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE); _cp_template->backup_of_array[0] = -1; _cp_template->backup_of_array_used_size = 0; _cp_template->backupped_by_array_max_size = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE; starpu_malloc((void**)&_cp_template->backupped_by_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE); _cp_template->backupped_by_array[0] = -1; _cp_template->backupped_by_array_used_size = 0; starpu_pthread_mutex_init(&_cp_template->mutex, NULL); return _cp_template; } static inline int _checkpoint_template_add_to_backup_arrays(starpu_mpi_checkpoint_template_t cp_template, int backupped_by, int backup_of) { if (backup_of == -1) { for (int i = 0; i < cp_template->backupped_by_array_used_size; i++) { if (backupped_by == cp_template->backupped_by_array[i]) { return 0; } } if (cp_template->backupped_by_array_used_size + 1 == cp_template->backupped_by_array_max_size) { checkpoint_template_backupped_by_array_realloc_double(cp_template); } cp_template->backupped_by_array[cp_template->backupped_by_array_used_size] = backupped_by; cp_template->backupped_by_array_used_size++; cp_template->backupped_by_array[cp_template->backupped_by_array_used_size] = -1; return backupped_by; } else if (backupped_by == -1) { for (int i = 0; i < cp_template->backup_of_array_used_size; i++) { if (backup_of == cp_template->backup_of_array[i]) { return 0; } } if (cp_template->backup_of_array_used_size + 1 == cp_template->backup_of_array_max_size) { checkpoint_template_backup_of_array_realloc_double(cp_template); } cp_template->backup_of_array[cp_template->backup_of_array_used_size] = backup_of; cp_template->backup_of_array_used_size++; cp_template->backup_of_array[cp_template->backup_of_array_used_size] = -1; return backup_of; } else { _STARPU_DISP("[warning] Checkpoint template item does not refer any backup information. This should not happen.\n"); } } static int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag) { starpu_pthread_mutex_lock(&cp_template->mutex); STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n"); struct _starpu_mpi_checkpoint_template_item* item; item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag); _starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item); _checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of); starpu_pthread_mutex_unlock(&cp_template->mutex); return 0; } static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_first_data(starpu_mpi_checkpoint_template_t template) { return _starpu_mpi_checkpoint_template_item_list_front(&template->list); } static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_next_data(starpu_mpi_checkpoint_template_t template STARPU_ATTRIBUTE_UNUSED, struct _starpu_mpi_checkpoint_template_item* ref_data) { return _starpu_mpi_checkpoint_template_item_list_next(ref_data); } static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_end(starpu_mpi_checkpoint_template_t template STARPU_ATTRIBUTE_UNUSED) { return NULL; } static inline int _starpu_checkpoint_template_free(starpu_mpi_checkpoint_template_t cp_template) { struct _starpu_mpi_checkpoint_template_item* item; struct _starpu_mpi_checkpoint_template_item* next_item; starpu_pthread_mutex_lock(&cp_template->mutex); item = _starpu_mpi_checkpoint_template_get_first_data(cp_template); while (item != _starpu_mpi_checkpoint_template_end(cp_template)) { next_item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item); starpu_free(item); item = next_item; } starpu_pthread_mutex_unlock(&cp_template->mutex); starpu_pthread_mutex_destroy(&cp_template->mutex); starpu_free(cp_template); return 0; } // For test purpose int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template); #ifdef __cplusplus } #endif #endif //_STARPU_MPI_CHECKPOINT_TEMPLATE_H