123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <stdarg.h>
- #include <stdlib.h>
- #include <sys/param.h>
- #include <starpu_mpi_private.h>
- #include <starpu_mpi_cache.h>
- #include <mpi/starpu_mpi_mpi_backend.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
- #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
- #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
- #include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
- starpu_pthread_mutex_t cp_template_mutex;
- starpu_pthread_mutex_t current_instance_mutex;
- starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
- int cp_template_array_size = 0;
- int my_rank;
- int comm_size;
- int current_instance;
- typedef int (*backup_of_fn)(int);
- int increment_current_instance()
- {
- int _inst;
- starpu_pthread_mutex_lock(¤t_instance_mutex);
- _inst = ++current_instance;
- starpu_pthread_mutex_unlock(¤t_instance_mutex);
- return _inst;
- }
- int get_current_instance()
- {
- int _inst;
- starpu_pthread_mutex_lock(¤t_instance_mutex);
- _inst = current_instance;
- starpu_pthread_mutex_unlock(¤t_instance_mutex);
- return _inst;
- }
- void checkpoint_template_lib_init(void) {
- starpu_pthread_mutex_init(¤t_instance_mutex, NULL);
- starpu_pthread_mutex_init(&cp_template_mutex, NULL);
- starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
- starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
- current_instance = 0;
- #ifdef STARPU_MPI_VERBOSE
- _starpu_mpi_set_debug_level_max(1000);
- #endif
- }
- void checkpoint_template_lib_quit(void) {
- for (int i=0 ; i<MAX_CP_TEMPLATE_NUMBER ; i++)
- {
- if (cp_template_array[i] == NULL)
- {
- break;
- }
- _starpu_checkpoint_template_free(cp_template_array[i]);
- cp_template_array[i] = NULL;
- }
- }
- int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
- {
- starpu_pthread_mutex_lock(&cp_template->mutex);
- STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
- struct _starpu_mpi_checkpoint_template_item* item;
- item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
- _starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
- _checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
- _STARPU_MPI_DEBUG(5, "New checkpoint data entry %p has been added to cp_template with id:%d. (%s)\n", item, cp_template->cp_id, backupped_by == -1 ? "BACKUP_OF" : "BACKUPPED_BY");
- starpu_pthread_mutex_unlock(&cp_template->mutex);
- return 0;
- }
- int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain)
- {
- *cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
- return 0;
- }
- int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t cp_template, int arg_type, va_list varg_list)
- {
- void* ptr;
- int count;
- int backupped_by;
- int data_rank;
- starpu_mpi_tag_t tag;
- backup_of_fn _backup_of;
- arg_type = arg_type & ~STARPU_COMMUTE;
- switch(arg_type)
- {
- case STARPU_R:
- ptr = va_arg(varg_list, void*);
- count = 1;
- backupped_by = va_arg(varg_list, int);
- data_rank = starpu_mpi_data_get_rank((starpu_data_handle_t)ptr);
- if (my_rank==data_rank)
- {
- return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, backupped_by, -1, -1);
- }
- else if(my_rank == backupped_by)
- {
- return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, data_rank, -1);
- }
- else
- {
- /* Since this data does not concern me (i.e. it is nor my data neither a data which I'm the back up)
- * it is considered unecessary to register in the CP */
- return 0;
- }
- case STARPU_VALUE:
- ptr = va_arg(varg_list, void*);
- count = va_arg(varg_list, int);
- tag = va_arg(varg_list, starpu_mpi_tag_t);
- _backup_of = va_arg(varg_list, backup_of_fn);
- /* I register the backup that will save this data */
- _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, _backup_of(my_rank), -1, tag);
- for (int i=0 ; i<my_rank ; i++)
- {
- if (_backup_of(i) == my_rank)
- {
- /* I'm the back up of someone else for this data, I have to remember it */
- _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
- }
- }
- for (int i=my_rank+1 ; i<comm_size ; i++)
- {
- if (_backup_of(i) == my_rank)
- {
- /* I'm the back up of someone else for this data, I have to remember it */
- _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
- }
- }
- return 0;
- // case STARPU_DATA_ARRAY:
- // ptr = va_arg(varg_list, void*);
- // count = va_arg(varg_list, int);
- // backupped_by = va_arg(varg_list, int);
- // backup_of = -1;
- // break;
- default:
- STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
- }
- }
- void _cp_discard_message_recv_cb(void* _args)
- {
- // TODO: store the information of the new CP, for restart purpose
- struct _starpu_mpi_cp_discard_arg_cb* arg = (struct _starpu_mpi_cp_discard_arg_cb*) _args;
- _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
- _STARPU_MPI_DEBUG(0, "DISCARDING OLD CHECKPOINT DATA of rank %d - new one is CPID:%d - CPINST:%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
- checkpoint_package_data_del(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
- }
- int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
- {
- /* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
- * I can discard old data from deprecated checkpoint).
- * I will receive a msg if I have old CP data.
- * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
- * */
- struct _starpu_mpi_cp_discard_arg_cb* arg;
- int i;
- for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
- {
- starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
- arg->rank = cp_template->backup_of_array[i];
- _STARPU_MPI_DEBUG(10, "Post DISCARD msg reception from %d\n", arg->rank);
- _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
- MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void *) arg);
- }
- return i;
- }
- void _cp_discard_message_send_cb(void* _args)
- {
- _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
- starpu_free(_args);
- }
- int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
- {
- /* The CP data replication has succeeded. I must send the message warning the checkpoint integrity (so
- * they can discard old data from deprecated checkpoint).
- * I will send to the ones if it has old CP data from me.
- * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
- * */
- struct _starpu_mpi_cp_discard_arg_cb* arg;
- int i;
- for (i=0 ; i < cp_template->backupped_by_array_used_size ; i++)
- {
- starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
- arg->rank = cp_template->backupped_by_array[i];
- _STARPU_MPI_DEBUG(10, "Post CP DISCARD msg sending to %d\n", arg->rank);
- arg->msg.discard=1;
- arg->msg.validation=0;
- arg->msg.checkpoint_id = cp_id;
- arg->msg.checkpoint_instance = cp_instance;
- _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO,
- MPI_COMM_WORLD, _cp_discard_message_send_cb, (void *) arg);
- }
- return 0;
- }
- starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
- {
- starpu_pthread_mutex_lock(&cp_template_mutex);
- for (int i=0 ; i < cp_template_array_size ; i++)
- {
- // starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
- if (cp_template_array[i]->cp_id == checkpoint_id)
- {
- // starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
- starpu_pthread_mutex_unlock(&cp_template_mutex);
- return cp_template_array[i];
- }
- // starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
- }
- starpu_pthread_mutex_unlock(&cp_template_mutex);
- return NULL;
- }
- //int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
- //{
- // /* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
- // * I can tag the data as CP validated, and discard old data from deprecated checkpoint).
- // * I will receive a msg if I have old CP data, or if I am the back up for a node into the upcoming Checkpoint.
- // * * Here the union of the different list is processed to post message reception only once.
- // * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
- // * */
- // struct _starpu_mpi_cp_discard_arg_cb* arg;
- // int i, j, flag;
- // starpu_mpi_checkpoint_template_t old_template;
- // for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
- // {
- // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
- // arg->rank = cp_template->backup_of_array[i];
- // _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d\n", arg->rank);
- // _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
- // }
- // if (last_valid_checkpoint.checkpoint_id == -1)
- // {
- // return -1;
- // }
- // else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_id)
- // {
- // old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
- // for (i=0 ; i<old_template->backup_of_array_used_size ; i++)
- // {
- // flag=0;
- // for(j=0 ; j<cp_template->backup_of_array_used_size ; j++)
- // {
- // if (cp_template->backup_of_array[j] == old_template->backup_of_array[i])
- // {
- // flag = 1;
- // break;
- // }
- // }
- // if (flag==0)
- // {
- // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
- // arg->rank = old_template->backup_of_array[i];
- // _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d - LAST VALIDATED CP\n", arg->rank);
- // _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_recv_cb, (void*)arg);
- // }
- // }
- // }
- // return 0;
- //}
- //int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
- //{
- // /* The CP data replication has succeeded. I must send the message warning the future checkpoint integrity (so
- // * they can tag the data as CP validated, and discard old data from deprecated checkpoint).
- // * I will send to one if it has old CP data from me, or if it is my backup for a data into the just succeeded Checkpoint.
- // * * Here the union of the different list is processed to send message only once.
- // * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
- // * */
- // struct _starpu_mpi_cp_discard_arg_cb* arg;
- // int i, j, flag;
- // starpu_mpi_checkpoint_template_t old_template;
- // for (i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
- // {
- // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
- // arg->rank = cp_template->backupped_by_array[i];
- // _STARPU_MPI_DEBUG(10, "Sending DISCARD msg reception to %d\n", arg->rank);
- // arg->msg.checkpoint_id = cp_id;
- // arg->msg.checkpoint_instance = cp_instance;
- // _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
- // }
- // if (last_valid_checkpoint.checkpoint_id == -1)
- // {
- // return -1;
- // }
- // else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_id)
- // {
- // old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
- // for (i=0 ; i<old_template->backupped_by_array_used_size ; i++)
- // {
- // flag=0;
- // for(j=0 ; j<cp_template->backupped_by_array_used_size ; j++)
- // {
- // if (cp_template->backupped_by_array[j] == old_template->backupped_by_array[i])
- // {
- // flag = 1;
- // break;
- // }
- // }
- // if (flag==0)
- // {
- // starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
- // arg->rank = old_template->backupped_by_array[i];
- // _STARPU_MPI_DEBUG(10, "Sending DISCARD msg to %d - OLD CP\n", arg->rank);
- // arg->msg.checkpoint_id = cp_id;
- // arg->msg.checkpoint_instance = cp_instance;
- // _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, _cp_discard_message_send_cb, (void*)arg);
- // }
- // }
- // }
- // return 0;
- //}
- int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
- {
- // char str[256];
- starpu_pthread_mutex_lock(&cp_template->mutex);
- _STARPU_MPI_DEBUG(2, "Start freezing checkpoint id:%d\n", cp_template->cp_id);
- cp_template->frozen = 1;
- cp_template->message_to_send_number = 0;
- cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
- struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
- while (item != _starpu_mpi_checkpoint_template_end(cp_template))
- {
- if (item->backup_of==-1 && item->backupped_by!=-1)
- {
- cp_template->message_to_send_number++;
- }
- item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
- }
- // sprintf(str, "backupped by Array maxsize:%d - currentsize:%d - ", cp_template->backupped_by_array_max_size, cp_template->backupped_by_array_used_size);
- // for (int i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
- // {
- // sprintf(str,"%s%d ", str, cp_template->backupped_by_array[i]);
- // }
- // fprintf(stderr, "%s\n", str);
- //
- // sprintf(str,"backup of Array maxsize:%d - currentsize:%d - ", cp_template->backup_of_array_max_size, cp_template->backup_of_array_used_size);
- // for (int i=0 ; i<cp_template->backup_of_array_used_size ; i++)
- // {
- // sprintf(str,"%s%d ", str, cp_template->backup_of_array[i]);
- // }
- // fprintf(stderr, "%s\n", str);
- starpu_pthread_mutex_unlock(&cp_template->mutex);
- starpu_pthread_mutex_lock(&cp_template_mutex);
- for (int i=0 ; i < cp_template_array_size ; i++)
- {
- STARPU_ASSERT_MSG(cp_template_array[i]->cp_id != cp_template->cp_id, "A checkpoint with id %d has already been registered.\n", cp_template->cp_id);
- }
- cp_template_array[cp_template_array_size] = cp_template;
- cp_template_array_size++;
- starpu_pthread_mutex_unlock(&cp_template_mutex);
- _STARPU_MPI_DEBUG(2, "Checkpoint id:%d is frozen and registered.\n", cp_template->cp_id);
- return cp_template->size;
- }
- int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, va_list varg_list)
- {
- int arg_type;
- starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
- va_list varg_list_copy;
- va_copy(varg_list_copy, varg_list);
- while ((arg_type = va_arg(varg_list_copy, int)) != 0)
- {
- _starpu_mpi_checkpoint_template_add_entry(_cp_template, arg_type, varg_list_copy);
- }
- va_end(varg_list_copy);
- _starpu_mpi_checkpoint_template_freeze(_cp_template);
- *cp_template = _cp_template;
- return 0;
- }
- int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_template)
- {
- return _starpu_mpi_checkpoint_template_freeze(*cp_template);
- }
- int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, ...)
- {
- va_list varg_list;
- va_start(varg_list, cp_domain);
- int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, cp_domain, varg_list);
- va_end(varg_list);
- return ret;
- }
- int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* cp_template, ...)
- {
- va_list varg_list;
- int arg_type;
- int ret;
- va_start(varg_list, cp_template);
- arg_type = va_arg(varg_list, int);
- STARPU_ASSERT_MSG(arg_type!=STARPU_NONE, "Unhandled arg_type: STARPU_NONE(0).\n");
- ret = _starpu_mpi_checkpoint_template_add_entry(*cp_template, arg_type, varg_list);
- va_end(varg_list);
- return ret;
- }
- int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
- int remaining_ack_messages;
- struct _starpu_mpi_checkpoint_tracker* tracker, *tracker1;
- starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
- starpu_pthread_mutex_lock(&cp_template_mutex);
- _STARPU_MPI_DEBUG(20, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);
- tracker = _starpu_mpi_checkpoint_tracker_update(cp_template, checkpoint_id, cp_template->checkpoint_domain, checkpoint_instance);
- remaining_ack_messages = _starpu_mpi_checkpoint_check_tracker(tracker);
- if (remaining_ack_messages>0)
- {
- _STARPU_MPI_DEBUG(20, "The CP (id:%d - inst:%d) found, remaining ack msg awaited:%d.\n", checkpoint_id,
- checkpoint_instance, remaining_ack_messages);
- }
- else if (remaining_ack_messages==0)
- {
- _STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been successfully saved and acknowledged.\n", checkpoint_id, checkpoint_instance);
- tracker = _starpu_mpi_checkpoint_tracker_validate_instance(tracker);
- if (tracker==NULL)
- {
- // TODO:should warn some people, because the msg loggin is not implemented(this precise nodes to contact)
- _STARPU_MPI_DEBUG(0, "No previous checkpoint to discard\n");
- }
- else
- {
- if (tracker->old)
- {
- tracker1 = _starpu_mpi_checkpoint_tracker_get_last_valid_tracker(tracker->cp_domain);
- _starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, tracker1->cp_id, tracker1->cp_inst);
- }
- else
- {
- _starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, checkpoint_id, checkpoint_instance);
- }
- }
- }
- else if (remaining_ack_messages==-1)
- {
- STARPU_ABORT_MSG("Inst (id:%d - inst:%d) is already valid. should not have received an ack msg.\n", checkpoint_id, checkpoint_instance);
- }
- else
- {
- STARPU_ABORT_MSG("Critical error, can not identify %d as remaining messages\n", remaining_ack_messages);
- }
- _STARPU_MPI_DEBUG(20, "Digested\n");
- starpu_pthread_mutex_unlock(&cp_template_mutex);
- return 0;
- }
- // For test purpose
- int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
- {
- // int val;
- int i = 0;
- struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
- while (item != _starpu_mpi_checkpoint_template_end(cp_template))
- {
- fprintf(stderr,"Item %2d: ", i);
- if (item->type == STARPU_VALUE)
- {
- // fprintf(stderr, "STARPU_VALUE - Value=%d - backupof:%d - backupedby:%d\n", (*(int *)(item->ptr)), item->backup_of, item->backupped_by);
- fprintf(stderr, "STARPU_VALUE - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
- }
- else if (item->type == STARPU_R)
- {
- // val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
- // fprintf(stderr, "STARPU_R - Value=%d - backupof:%d - backupedby:%d\n", val, item->backup_of, item->backupped_by);
- fprintf(stderr, "STARPU_R - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
- }
- else if (item->type == STARPU_DATA_ARRAY)
- {
- fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)item->ptr), 0));
- for (int j=1 ; j<MIN(item->count, 5) ; j++)
- {
- fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)item->ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
- }
- fprintf(stderr, "...\n");
- }
- else
- {
- printf("Unrecognized type.\n");
- }
- item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
- i++;
- }
- return 0;
- }
|