|
@@ -26,30 +26,29 @@
|
|
|
#include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
|
|
|
#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
|
|
|
|
|
|
-#include <starpu_mpi_private.h>
|
|
|
-
|
|
|
-#define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
|
|
|
|
|
|
starpu_pthread_mutex_t cp_template_mutex;
|
|
|
starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
|
|
|
+int cp_template_array_size = 0;
|
|
|
int my_rank;
|
|
|
int comm_size;
|
|
|
-int cp_template_number = 0;
|
|
|
-struct _starpu_mpi_cp_ack_msg last_valid_checkpoint;
|
|
|
-starpu_mpi_checkpoint_template_t pending_checkpoint;
|
|
|
-starpu_pthread_mutex_t checkpoint_pending_mutex;
|
|
|
+
|
|
|
+struct _starpu_mpi_checkpoint_template_tracking_inst* last_valid_cp;
|
|
|
+struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
|
|
|
+struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
|
|
|
+
|
|
|
|
|
|
typedef int (*backup_of_fn)(int);
|
|
|
|
|
|
|
|
|
void checkpoint_template_lib_init(void) {
|
|
|
starpu_pthread_mutex_init(&cp_template_mutex, NULL);
|
|
|
- starpu_pthread_mutex_init(&checkpoint_pending_mutex, NULL);
|
|
|
starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
|
|
|
starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
|
|
|
- last_valid_checkpoint.checkpoint_id = -1;
|
|
|
- last_valid_checkpoint.checkpoint_instance = -1;
|
|
|
- pending_checkpoint = NULL;
|
|
|
+ last_valid_cp = _starpu_mpi_checkpoint_template_tracking_inst_new();
|
|
|
+ _starpu_mpi_checkpoint_template_tracking_inst_init(last_valid_cp);
|
|
|
+ _starpu_mpi_checkpoint_template_tracking_inst_list_init(&future_tracking_list);
|
|
|
+ _starpu_mpi_checkpoint_template_tracking_inst_list_init(&pending_tracking_list);
|
|
|
|
|
|
_starpu_mpi_set_debug_level_max(1000);
|
|
|
}
|
|
@@ -66,29 +65,6 @@ void checkpoint_template_lib_quit(void) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-int set_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint)
|
|
|
-{
|
|
|
- int ret=starpu_pthread_mutex_trylock(&checkpoint_pending_mutex);
|
|
|
- if (EBUSY==ret)
|
|
|
- {
|
|
|
- _STARPU_DISP("[warning] The process is blocked, a checkpoint has been submitted while the previous one's "
|
|
|
- "submission has not ended. The submission has to wait, try to submit checkpoint "
|
|
|
- "less frequently.\n");
|
|
|
- starpu_pthread_mutex_lock(&checkpoint_pending_mutex);
|
|
|
- }
|
|
|
- STARPU_ASSERT_MSG(pending_checkpoint==NULL, "There is already a checkpoint submission pending. This should not happen.\n");
|
|
|
- pending_checkpoint = _pending_checkpoint;
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-int valid_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint)
|
|
|
-{
|
|
|
- STARPU_ASSERT_MSG(pending_checkpoint==_pending_checkpoint, "This checkpoint is not the one marked as pending. This should not happen.\n");
|
|
|
- pending_checkpoint = NULL;
|
|
|
- starpu_pthread_mutex_unlock(&checkpoint_pending_mutex);
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
|
|
|
{
|
|
|
starpu_pthread_mutex_lock(&cp_template->mutex);
|
|
@@ -172,82 +148,36 @@ int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t c
|
|
|
// break;
|
|
|
default:
|
|
|
STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
|
|
|
- break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
|
|
|
-{
|
|
|
- starpu_pthread_mutex_lock(&cp_template_mutex);
|
|
|
- for (int i=0 ; i<cp_template_number ; i++)
|
|
|
- {
|
|
|
- starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
|
|
|
- if (cp_template_array[i]->cp_template_id == checkpoint_id)
|
|
|
- {
|
|
|
- starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
|
|
|
- starpu_pthread_mutex_unlock(&cp_template_mutex);
|
|
|
- return cp_template_array[i];
|
|
|
- }
|
|
|
- starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
|
|
|
- }
|
|
|
- starpu_pthread_mutex_unlock(&cp_template_mutex);
|
|
|
- return NULL;
|
|
|
-}
|
|
|
-
|
|
|
void checkpoint_discard(void* _args)
|
|
|
{
|
|
|
- // TODO: flag data as "CP ready", since the CP has succeeded
|
|
|
+ // TODO: store the information of the new CP, for restart purpose
|
|
|
struct _starpu_mpi_cp_discard_arg_cb* arg = (struct _starpu_mpi_cp_discard_arg_cb*) _args;
|
|
|
_STARPU_MPI_DEBUG(0, "DISCARDING OLD CHECKPOINT DATA of rank %d - new one is CPID:%d - CPINST:%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
|
|
|
checkpoint_package_data_del(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
|
|
|
{
|
|
|
/* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
|
|
|
- * I can tag the data as CP validated, and discard old data from deprecated checkpoint).
|
|
|
- * I will receive a msg if I have old CP data, or if I am the back up for a node into the upcoming Checkpoint.
|
|
|
- * * Here the union of the different list is processed to post message reception only once.
|
|
|
+ * I can discard old data from deprecated checkpoint).
|
|
|
+ * I will receive a msg if I have old CP data.
|
|
|
* TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
|
|
|
* */
|
|
|
struct _starpu_mpi_cp_discard_arg_cb* arg;
|
|
|
- int i, j, flag;
|
|
|
- starpu_mpi_checkpoint_template_t old_template;
|
|
|
+ int i;
|
|
|
+
|
|
|
for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
|
|
|
{
|
|
|
starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
arg->rank = cp_template->backup_of_array[i];
|
|
|
- _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d\n", arg->rank);
|
|
|
- _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
|
|
|
- }
|
|
|
- if (last_valid_checkpoint.checkpoint_id == -1)
|
|
|
- {
|
|
|
- return -1;
|
|
|
+ _STARPU_MPI_DEBUG(10, "Post DISCARD msg reception from %d\n", arg->rank);
|
|
|
+ _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
|
|
|
}
|
|
|
- else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_template_id)
|
|
|
- {
|
|
|
- old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
|
|
|
- for (i=0 ; i<old_template->backup_of_array_used_size ; i++)
|
|
|
- {
|
|
|
- flag=0;
|
|
|
- for(j=0 ; j<cp_template->backup_of_array_used_size ; j++)
|
|
|
- {
|
|
|
- if (cp_template->backup_of_array[j] == old_template->backup_of_array[i])
|
|
|
- {
|
|
|
- flag = 1;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (flag==0)
|
|
|
- {
|
|
|
- starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
- arg->rank = old_template->backup_of_array[i];
|
|
|
- _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d - LAST VALIDATED CP\n", arg->rank);
|
|
|
- _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return 0;
|
|
|
+ return i;
|
|
|
}
|
|
|
|
|
|
void free_arg(void* _args)
|
|
@@ -257,64 +187,138 @@ void free_arg(void* _args)
|
|
|
|
|
|
int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
|
|
|
{
|
|
|
- /* The CP data replication has succeeded. I must send the message warning the future checkpoint integrity (so
|
|
|
- * they can tag the data as CP validated, and discard old data from deprecated checkpoint).
|
|
|
- * I will send to one if it has old CP data from me, or if it is my backup for a data into the just succeeded Checkpoint.
|
|
|
- * * Here the union of the different list is processed to send message only once.
|
|
|
+ /* The CP data replication has succeeded. I must send the message warning the checkpoint integrity (so
|
|
|
+ * they can discard old data from deprecated checkpoint).
|
|
|
+ * I will send to the ones if it has old CP data from me.
|
|
|
* TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
|
|
|
* */
|
|
|
struct _starpu_mpi_cp_discard_arg_cb* arg;
|
|
|
- int i, j, flag;
|
|
|
- starpu_mpi_checkpoint_template_t old_template;
|
|
|
- for (i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
|
|
|
+ int i;
|
|
|
+
|
|
|
+ for (i=0 ; i < cp_template->backupped_by_array_used_size ; i++)
|
|
|
{
|
|
|
starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
arg->rank = cp_template->backupped_by_array[i];
|
|
|
- _STARPU_MPI_DEBUG(10, "Sending DISCARD msg reception to %d\n", arg->rank);
|
|
|
+ _STARPU_MPI_DEBUG(10, "Post CP DISCARD msg sending to %d\n", arg->rank);
|
|
|
+ arg->msg.discard=1;
|
|
|
+ arg->msg.validation=0;
|
|
|
arg->msg.checkpoint_id = cp_id;
|
|
|
arg->msg.checkpoint_instance = cp_instance;
|
|
|
- _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, free_arg, (void*)arg);
|
|
|
- }
|
|
|
- if (last_valid_checkpoint.checkpoint_id == -1)
|
|
|
- {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_template_id)
|
|
|
- {
|
|
|
- old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
|
|
|
- for (i=0 ; i<old_template->backupped_by_array_used_size ; i++)
|
|
|
- {
|
|
|
- flag=0;
|
|
|
- for(j=0 ; j<cp_template->backupped_by_array_used_size ; j++)
|
|
|
- {
|
|
|
- if (cp_template->backupped_by_array[j] == old_template->backupped_by_array[i])
|
|
|
- {
|
|
|
- flag = 1;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (flag==0)
|
|
|
- {
|
|
|
- starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
- arg->rank = old_template->backupped_by_array[i];
|
|
|
- _STARPU_MPI_DEBUG(10, "Sending DISCARD msg to %d - OLD CP\n", arg->rank);
|
|
|
- arg->msg.checkpoint_id = cp_id;
|
|
|
- arg->msg.checkpoint_instance = cp_instance;
|
|
|
- _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, free_arg, (void*)arg);
|
|
|
- }
|
|
|
- }
|
|
|
+ _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, free_arg, (void*)arg);
|
|
|
}
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+//int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
|
|
|
+//{
|
|
|
+// /* A new CP is submitted. We must post matching recv for the message warning the future checkpoint integrity (so
|
|
|
+// * I can tag the data as CP validated, and discard old data from deprecated checkpoint).
|
|
|
+// * I will receive a msg if I have old CP data, or if I am the back up for a node into the upcoming Checkpoint.
|
|
|
+// * * Here the union of the different list is processed to post message reception only once.
|
|
|
+// * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
|
|
|
+// * */
|
|
|
+// struct _starpu_mpi_cp_discard_arg_cb* arg;
|
|
|
+// int i, j, flag;
|
|
|
+// starpu_mpi_checkpoint_template_t old_template;
|
|
|
+// for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
|
|
|
+// {
|
|
|
+// starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
+// arg->rank = cp_template->backup_of_array[i];
|
|
|
+// _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d\n", arg->rank);
|
|
|
+// _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
|
|
|
+// }
|
|
|
+// if (last_valid_checkpoint.checkpoint_id == -1)
|
|
|
+// {
|
|
|
+// return -1;
|
|
|
+// }
|
|
|
+// else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_template_id)
|
|
|
+// {
|
|
|
+// old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
|
|
|
+// for (i=0 ; i<old_template->backup_of_array_used_size ; i++)
|
|
|
+// {
|
|
|
+// flag=0;
|
|
|
+// for(j=0 ; j<cp_template->backup_of_array_used_size ; j++)
|
|
|
+// {
|
|
|
+// if (cp_template->backup_of_array[j] == old_template->backup_of_array[i])
|
|
|
+// {
|
|
|
+// flag = 1;
|
|
|
+// break;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// if (flag==0)
|
|
|
+// {
|
|
|
+// starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
+// arg->rank = old_template->backup_of_array[i];
|
|
|
+// _STARPU_MPI_DEBUG(10, "Posting DISCARD msg reception from %d - LAST VALIDATED CP\n", arg->rank);
|
|
|
+// _ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// return 0;
|
|
|
+//}
|
|
|
+
|
|
|
+//int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
|
|
|
+//{
|
|
|
+// /* The CP data replication has succeeded. I must send the message warning the future checkpoint integrity (so
|
|
|
+// * they can tag the data as CP validated, and discard old data from deprecated checkpoint).
|
|
|
+// * I will send to one if it has old CP data from me, or if it is my backup for a data into the just succeeded Checkpoint.
|
|
|
+// * * Here the union of the different list is processed to send message only once.
|
|
|
+// * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
|
|
|
+// * */
|
|
|
+// struct _starpu_mpi_cp_discard_arg_cb* arg;
|
|
|
+// int i, j, flag;
|
|
|
+// starpu_mpi_checkpoint_template_t old_template;
|
|
|
+// for (i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
|
|
|
+// {
|
|
|
+// starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
+// arg->rank = cp_template->backupped_by_array[i];
|
|
|
+// _STARPU_MPI_DEBUG(10, "Sending DISCARD msg reception to %d\n", arg->rank);
|
|
|
+// arg->msg.checkpoint_id = cp_id;
|
|
|
+// arg->msg.checkpoint_instance = cp_instance;
|
|
|
+// _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, free_arg, (void*)arg);
|
|
|
+// }
|
|
|
+// if (last_valid_checkpoint.checkpoint_id == -1)
|
|
|
+// {
|
|
|
+// return -1;
|
|
|
+// }
|
|
|
+// else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_template_id)
|
|
|
+// {
|
|
|
+// old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
|
|
|
+// for (i=0 ; i<old_template->backupped_by_array_used_size ; i++)
|
|
|
+// {
|
|
|
+// flag=0;
|
|
|
+// for(j=0 ; j<cp_template->backupped_by_array_used_size ; j++)
|
|
|
+// {
|
|
|
+// if (cp_template->backupped_by_array[j] == old_template->backupped_by_array[i])
|
|
|
+// {
|
|
|
+// flag = 1;
|
|
|
+// break;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// if (flag==0)
|
|
|
+// {
|
|
|
+// starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
|
|
|
+// arg->rank = old_template->backupped_by_array[i];
|
|
|
+// _STARPU_MPI_DEBUG(10, "Sending DISCARD msg to %d - OLD CP\n", arg->rank);
|
|
|
+// arg->msg.checkpoint_id = cp_id;
|
|
|
+// arg->msg.checkpoint_instance = cp_instance;
|
|
|
+// _ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_INFO, MPI_COMM_WORLD, free_arg, (void*)arg);
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// return 0;
|
|
|
+//}
|
|
|
+
|
|
|
int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
|
|
|
{
|
|
|
char str[256];
|
|
|
starpu_pthread_mutex_lock(&cp_template->mutex);
|
|
|
_STARPU_MPI_DEBUG(2, "Start freezing checkpoint id:%d\n", cp_template->cp_template_id);
|
|
|
- cp_template->frozen = 1;
|
|
|
- cp_template->sent_message_number = 0;
|
|
|
- cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
|
|
|
+ cp_template->frozen = 1;
|
|
|
+ cp_template->message_to_send_number = 0;
|
|
|
+ cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
|
|
|
|
|
|
struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
|
|
|
|
|
@@ -322,7 +326,7 @@ int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_t
|
|
|
{
|
|
|
if (item->backup_of==-1 && item->backupped_by!=-1)
|
|
|
{
|
|
|
- cp_template->sent_message_number++;
|
|
|
+ cp_template->message_to_send_number++;
|
|
|
}
|
|
|
item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
|
|
|
}
|
|
@@ -343,12 +347,12 @@ int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_t
|
|
|
starpu_pthread_mutex_unlock(&cp_template->mutex);
|
|
|
|
|
|
starpu_pthread_mutex_lock(&cp_template_mutex);
|
|
|
- for (int i=0 ; i<cp_template_number ; i++)
|
|
|
+ for (int i=0 ; i < cp_template_array_size ; i++)
|
|
|
{
|
|
|
STARPU_ASSERT_MSG(cp_template_array[i]->cp_template_id != cp_template->cp_template_id, "A checkpoint with id %d has already been registered.\n", cp_template->cp_template_id);
|
|
|
}
|
|
|
- cp_template_array[cp_template_number] = cp_template;
|
|
|
- cp_template_number++;
|
|
|
+ cp_template_array[cp_template_array_size] = cp_template;
|
|
|
+ cp_template_array_size++;
|
|
|
starpu_pthread_mutex_unlock(&cp_template_mutex);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(2, "Checkpoint id:%d is frozen and registered.\n", cp_template->cp_template_id);
|
|
@@ -367,7 +371,7 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* c
|
|
|
while ((arg_type = va_arg(varg_list_copy, int)) != 0)
|
|
|
{
|
|
|
_starpu_mpi_checkpoint_template_add_entry(_cp_template, arg_type, varg_list_copy);
|
|
|
- };
|
|
|
+ }
|
|
|
va_end(varg_list_copy);
|
|
|
|
|
|
_starpu_mpi_checkpoint_template_freeze(_cp_template);
|
|
@@ -404,34 +408,60 @@ int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* c
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
|
|
|
+ int remaining_ack_messages;
|
|
|
+ struct _starpu_mpi_checkpoint_template_tracking_inst* last_valid_tracking_inst;
|
|
|
starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
|
|
|
+ starpu_mpi_checkpoint_template_t alt_cp_template;
|
|
|
starpu_pthread_mutex_lock(&cp_template_mutex);
|
|
|
_STARPU_MPI_DEBUG(20, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);
|
|
|
|
|
|
- starpu_pthread_mutex_lock(&cp_template->mutex);
|
|
|
- _STARPU_MPI_DEBUG(20, "Mutex taken\n");
|
|
|
- if (cp_template->cp_template_current_instance == checkpoint_instance)
|
|
|
+ remaining_ack_messages = _starpu_mpi_checkpoint_template_track_inst_treat_ack(cp_template, checkpoint_id,
|
|
|
+ checkpoint_instance);
|
|
|
+
|
|
|
+// starpu_pthread_mutex_lock(&cp_template->mutex);
|
|
|
+ if (remaining_ack_messages>0)
|
|
|
{
|
|
|
- cp_template->remaining_ack_awaited--;
|
|
|
- _STARPU_MPI_DEBUG(20, "Inst found, remaining ack msg awaited:%d\n", cp_template->remaining_ack_awaited);
|
|
|
- if (cp_template->remaining_ack_awaited == 0)
|
|
|
+ _STARPU_MPI_DEBUG(20, "The CP (id:%d - inst:%d) found, remaining ack msg awaited:%d.\n", checkpoint_id,
|
|
|
+ checkpoint_instance, remaining_ack_messages);
|
|
|
+ }
|
|
|
+ else if (remaining_ack_messages==0)
|
|
|
+ {
|
|
|
+ _STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been successfully saved and acknowledged.\n", checkpoint_id, checkpoint_instance);
|
|
|
+ last_valid_tracking_inst = _starpu_mpi_checkpoint_template_check_validation_coherency(checkpoint_id, checkpoint_instance);
|
|
|
+ STARPU_MPI_ASSERT_MSG(last_valid_tracking_inst != NULL, "I couldn't check validation coherency for CP (id:%d - inst:%d), certainly nothing refers to it in pending inst tracking list.\n", checkpoint_id, checkpoint_instance);
|
|
|
+ if (!(last_valid_tracking_inst->cp_id==checkpoint_id && last_valid_tracking_inst->cp_inst==checkpoint_instance))
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(0, "My CP has been successfully saved and acknowledged - cpid:%d, cpinst:%d\n", checkpoint_id, checkpoint_instance);
|
|
|
+ _STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been fully acknowledged, while a more recent one (id:%d - inst:%d) is already validated.\n", checkpoint_id, checkpoint_instance, last_valid_tracking_inst->cp_id, last_valid_tracking_inst->cp_inst);
|
|
|
+ checkpoint_id = last_valid_tracking_inst->cp_id;
|
|
|
+ checkpoint_instance = last_valid_tracking_inst->cp_inst;
|
|
|
+ // I have to warn the backups of the just acknowledged CP that the CP is already out of date. I must send a them a discard directly msg
|
|
|
_starpu_mpi_checkpoint_post_cp_discard_send(cp_template, checkpoint_id, checkpoint_instance);
|
|
|
- valid_pending_checkpoint_template(cp_template);
|
|
|
- cp_template->pending=0;
|
|
|
- last_valid_checkpoint.checkpoint_id = checkpoint_id;
|
|
|
- last_valid_checkpoint.checkpoint_instance = checkpoint_instance;
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ alt_cp_template = last_valid_cp->cp_template;
|
|
|
+ last_valid_tracking_inst->valid = 1;
|
|
|
+ last_valid_cp = last_valid_tracking_inst;
|
|
|
+ if (alt_cp_template==NULL)
|
|
|
+ {
|
|
|
+ // TODO:should warn some people, because the msg loggin is not implemented(this precise nodes to contact)
|
|
|
+ _STARPU_MPI_DEBUG(0, "No previous checkpoint to discard\n");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // I have to send a discard msg to the old cp's backups, and a valid msg to the backups of the just acknowledged CP.
|
|
|
+ _starpu_mpi_checkpoint_post_cp_discard_send(alt_cp_template, checkpoint_id, checkpoint_instance);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- checkpoint_template_add_future_inst(cp_template, checkpoint_instance);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Future inst (id:%d - inst:%d) found, %d ack msg already received.\n", checkpoint_id, checkpoint_instance, -remaining_ack_messages);
|
|
|
}
|
|
|
+
|
|
|
_STARPU_MPI_DEBUG(20, "Digested\n");
|
|
|
- starpu_pthread_mutex_unlock(&cp_template->mutex);
|
|
|
starpu_pthread_mutex_unlock(&cp_template_mutex);
|
|
|
return 0;
|
|
|
}
|
|
@@ -474,6 +504,6 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_te
|
|
|
|
|
|
item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
|
|
|
i++;
|
|
|
- };
|
|
|
+ }
|
|
|
return 0;
|
|
|
}
|