Sfoglia il codice sorgente

Checkpoint validation and discard comm mechanism implemented

Romain LION 5 anni fa
parent
commit
6442f6cae8

+ 1 - 0
mpi/src/mpi/starpu_mpi_mpi_backend.h

@@ -36,6 +36,7 @@ extern int _starpu_mpi_tag;
 #define _STARPU_MPI_TAG_CP_ACK    _starpu_mpi_tag+3
 #define _STARPU_MPI_TAG_CP_RCVRY  _starpu_mpi_tag+4
 #define _STARPU_MPI_TAG_EXT_DATA  _starpu_mpi_tag+5
+#define _STARPU_MPI_TAG_CP_DISCARD    _starpu_mpi_tag+6
 #endif // STARPU_USE_MPI_FT
 
 enum _starpu_envelope_mode

+ 11 - 7
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -48,7 +48,7 @@ void _starpu_mpi_treat_ack_receipt_cb(void* _args)
+/* Takes a struct _starpu_mpi_cp_ack_arg_cb through its void* argument, logs
+ * the checkpoint id, checkpoint instance and destination rank it carries, and
+ * then releases the structure with free(). */
 void _print_ack_sent_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	fprintf(stderr, "Sent succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
+	fprintf(stderr, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
 	free(_args);
 }
 
@@ -96,8 +96,10 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 	starpu_data_handle_t handle;
 	struct _starpu_mpi_checkpoint_template_item* item;
 	//MPI_Comm comm;
-
 	starpu_pthread_mutex_lock(&cp_template->mutex);
+	fprintf(stderr, "Mutex taken\n");
+	set_pending_checkpoint_template(cp_template);
+	fprintf(stderr, "Checkpoint now pending\n");
 	STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
 
 	cp_template->pending               = 1;
@@ -106,6 +108,8 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 
 	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
 
+	_starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
+
 	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
 	{
 		switch (item->type)
@@ -120,23 +124,23 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 				handle = *(starpu_data_handle_t*)item->ptr;
 				if (starpu_mpi_data_get_rank(handle)==my_rank)
 				{
-					fprintf(stderr,"sending to %d (tag %d)\n", item->backup_rank, (int)starpu_mpi_data_get_tag(handle));
+					fprintf(stderr, "sending to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(handle));
 					struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
-					arg->rank = item->backup_rank;
+					arg->rank = item->backupped_by;
 					arg->handle = handle;
 					arg->tag = starpu_mpi_data_get_tag(handle);
 					arg->type = STARPU_R;
 					arg->count = item->count;
 					arg->msg.checkpoint_id = cp_template->cp_template_id;
 					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
-					_starpu_mpi_isend_cache_aware(handle, item->backup_rank, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0,
+					_starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0,
 					                              &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, 1);
 				}
-				else if (item->backup_rank==my_rank)
+				else if (item->backup_of == starpu_mpi_data_get_rank(handle))
 				{
 					fprintf(stderr,"recving from %d (tag %d)\n", starpu_mpi_data_get_rank(handle), (int)starpu_mpi_data_get_tag(handle));
 					struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
-					arg->rank = starpu_mpi_data_get_rank(handle);
+					arg->rank = item->backup_of;
 					arg->handle = handle;
 					arg->tag = starpu_mpi_data_get_tag(handle);
 					arg->type = STARPU_R;

+ 6 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.h

@@ -45,6 +45,12 @@ struct _starpu_mpi_cp_ack_arg_cb
 	struct _starpu_mpi_cp_ack_msg msg;
 };
 
+/* Argument block used for the checkpoint DISCARD service messages: it pairs
+ * the peer rank with the message buffer exchanged with that peer. */
+struct _starpu_mpi_cp_discard_arg_cb
+{
+	int                           rank; /* peer the DISCARD message is sent to / received from */
+	struct _starpu_mpi_cp_ack_msg msg;  /* message buffer (carries checkpoint_id and checkpoint_instance) */
+};
+
 
 #ifdef __cplusplus
 }

+ 201 - 30
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -22,6 +22,8 @@
 #include <starpu_mpi_cache.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
+#include <mpi/starpu_mpi_mpi_backend.h>
 
 
 #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
@@ -32,15 +34,20 @@ int                              my_rank;
 int                              size;
 int cp_template_number = 0;
 struct _starpu_mpi_cp_ack_msg last_valid_checkpoint;
+starpu_mpi_checkpoint_template_t pending_checkpoint;
+starpu_pthread_mutex_t checkpoint_pending_mutex;
 
 typedef int (*backup_of_fn)(int);
 
+
+/* Sets up the module-level state: initialises the template-registry mutex and
+ * the pending-checkpoint mutex, stores this process's rank and the size of
+ * MPI_COMM_WORLD, and resets the "last valid checkpoint" (-1/-1) and
+ * "pending checkpoint" (NULL) markers. */
 void checkpoint_template_lib_init(void) {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
+	starpu_pthread_mutex_init(&checkpoint_pending_mutex, NULL);
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
 	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
 	last_valid_checkpoint.checkpoint_id = -1;
 	last_valid_checkpoint.checkpoint_instance = -1;
+	pending_checkpoint = NULL;
 }
 
 void checkpoint_template_lib_quit(void) {
@@ -55,6 +62,29 @@ void checkpoint_template_lib_quit(void) {
 	}
 }
 
+/*
+ * Registers _pending_checkpoint as the checkpoint whose submission is in
+ * flight.
+ *
+ * checkpoint_pending_mutex is taken here and is only released by
+ * valid_pending_checkpoint_template(), so a second submission blocks in this
+ * function until the previous one has been validated. A warning is printed
+ * when the caller has to wait.
+ *
+ * Always returns 0; failure to take the mutex or an already-set pending
+ * checkpoint is reported through STARPU_ASSERT_MSG.
+ */
+int set_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint)
+{
+	int ret=starpu_pthread_mutex_trylock(&checkpoint_pending_mutex);
+	if (EBUSY==ret)
+	{
+		fprintf(stderr, "The process is blocked, a checkpoint has been submitted while the previous one's "
+		                "submission has not ended. The submission has to wait, try to submit checkpoint "
+		                "less frequently.\n");
+		ret = starpu_pthread_mutex_lock(&checkpoint_pending_mutex);
+	}
+	/* Any error other than EBUSY means the mutex is NOT held: do not go on
+	 * and pretend the pending slot is protected. */
+	STARPU_ASSERT_MSG(ret==0, "Unable to take the pending checkpoint mutex (error %d).\n", ret);
+	STARPU_ASSERT_MSG(pending_checkpoint==NULL, "There is already a checkpoint submission pending. This should not happen.\n");
+	pending_checkpoint = _pending_checkpoint;
+	return 0;
+}
+
+/*
+ * Marks the pending checkpoint as validated: checks that _pending_checkpoint
+ * is the one recorded by set_pending_checkpoint_template(), clears the
+ * pending slot and releases checkpoint_pending_mutex. Always returns 0.
+ *
+ * NOTE(review): the mutex is locked in set_pending_checkpoint_template() and
+ * unlocked here; if these two functions run on different threads, confirm the
+ * mutex type allows unlocking from a thread that is not the owner.
+ */
+int valid_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint)
+{
+	STARPU_ASSERT_MSG(pending_checkpoint==_pending_checkpoint, "This checkpoint is not the one marked as pending. This should not happen.\n");
+	/* Clear the slot before releasing the mutex so the next submitter finds it empty. */
+	pending_checkpoint = NULL;
+	starpu_pthread_mutex_unlock(&checkpoint_pending_mutex);
+	return 0;
+}
+
 int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id)
 {
 	*cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
@@ -64,26 +94,30 @@ int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_t
 int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t cp_template, int arg_type, va_list varg_list)
 {
 	void*        ptr;
-	int          count;
-	int          my_backup;
-	int          backup_of;
-	int          data_rank;
+	int              count;
+	int              backupped_by;
+	int              backup_of;
+	int              data_rank;
 	starpu_mpi_tag_t tag;
-	backup_of_fn _backup_of;
+	backup_of_fn     _backup_of;
 
 	STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
 
 	switch(arg_type)
 	{
 		case STARPU_R:
-			ptr       = va_arg(varg_list, void*);
-			count     = 1;
-			my_backup = va_arg(varg_list, int);
-			backup_of = -1;
-			data_rank = starpu_mpi_data_get_rank(*(starpu_data_handle_t*)ptr);
-			if (my_rank==data_rank || my_rank==my_backup)
+			ptr          = va_arg(varg_list, void*);
+			count        = 1;
+			backupped_by = va_arg(varg_list, int);
+			backup_of    = -1;
+			data_rank    = starpu_mpi_data_get_rank(*(starpu_data_handle_t*)ptr);
+			if (my_rank==data_rank)
+			{
+				return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, backupped_by, -1, -1);
+			}
+			else if(my_rank == backupped_by)
 			{
-				return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, my_backup, backup_of, -1);
+				return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, data_rank, -1);
 			}
 			else
 			{
@@ -118,7 +152,7 @@ int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t c
 //			case STARPU_DATA_ARRAY:
 //				ptr         = va_arg(varg_list, void*);
 //				count       = va_arg(varg_list, int);
-//				my_backup = va_arg(varg_list, int);
+//				backupped_by = va_arg(varg_list, int);
 //				backup_of   = -1;
 //				break;
 		default:
@@ -127,6 +161,136 @@ int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t c
 	}
 }
 
+/*
+ * Searches the template registry for the template whose cp_template_id equals
+ * checkpoint_id. The registry mutex is held for the whole scan and each
+ * template's own mutex is held while its id is read.
+ * Returns the first matching template, or NULL when none matches.
+ */
+static starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
+{
+	starpu_mpi_checkpoint_template_t found = NULL;
+	int idx = 0;
+
+	starpu_pthread_mutex_lock(&cp_template_mutex);
+	while (found == NULL && idx < cp_template_number)
+	{
+		starpu_mpi_checkpoint_template_t candidate = cp_template_array[idx];
+
+		starpu_pthread_mutex_lock(&candidate->mutex);
+		if (candidate->cp_template_id == checkpoint_id)
+		{
+			found = candidate;
+		}
+		starpu_pthread_mutex_unlock(&candidate->mutex);
+		idx++;
+	}
+	starpu_pthread_mutex_unlock(&cp_template_mutex);
+	return found;
+}
+
+/*
+ * Callback attached to the DISCARD receptions posted by
+ * _starpu_mpi_checkpoint_post_cp_discard_recv().
+ *
+ * _args is the struct _starpu_mpi_cp_discard_arg_cb that was allocated with
+ * starpu_malloc() when the reception was posted; its msg member is the
+ * receive buffer. The received checkpoint id/instance are logged and the
+ * argument block is released here, since nothing else owns it.
+ */
+void checkpoint_discard(void* _args)
+{
+	// TODO: flag data as "CP ready", since the CP has succeeded
+	/* The callback argument is the whole arg block, not the bare message:
+	 * the message lives in arg->msg (the first member of the block is rank). */
+	struct _starpu_mpi_cp_discard_arg_cb* arg = (struct _starpu_mpi_cp_discard_arg_cb*) _args;
+	fprintf(stderr, "DISCARDING OLD CHECKPOINT DATA - new one is CPID:%d - CPINST:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	starpu_free(arg);
+}
+
+/*
+ * Posts the receptions of the DISCARD messages expected for the checkpoint
+ * being submitted.
+ *
+ * One reception is posted per rank in cp_template->backup_of_array. If a
+ * previous checkpoint was validated with a different template, one more
+ * reception is posted for each rank of that template's backup_of_array that
+ * is not already covered, so that each peer gets exactly one reception.
+ *
+ * Returns 0 on the normal path, and -1 when no checkpoint has been validated
+ * yet or when the last validated template can no longer be found (in both
+ * cases the receptions for cp_template itself have already been posted).
+ */
+int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template)
+{
+	/* A new CP is submitted. We must post the matching recv for the message announcing that the upcoming
+	 * checkpoint is complete (so I can tag the data as CP validated, and discard old data from the deprecated
+	 * checkpoint).
+	 * I will receive a msg if I have old CP data, or if I am the backup for a node in the upcoming checkpoint.
+	 * Here the union of the two lists is processed so that each reception is posted only once.
+	 * TODO: For the message logging discard, I will receive message from the people I exchanged with since the last checkpoint.
+	 * */
+	struct _starpu_mpi_cp_discard_arg_cb* arg;
+	int i, j, flag;
+	starpu_mpi_checkpoint_template_t old_template;
+	for (i=0 ; i<cp_template->backup_of_array_used_size ; i++)
+	{
+		starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
+		arg->rank = cp_template->backup_of_array[i];
+		fprintf(stderr, "Posting DISCARD msg reception from %d\n", arg->rank);
+		_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
+	}
+	if (last_valid_checkpoint.checkpoint_id == -1)
+	{
+		return -1;
+	}
+	else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_template_id)
+	{
+		old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
+		if (old_template == NULL)
+		{
+			/* The lookup returns NULL for an unknown id: nothing more can be posted. */
+			fprintf(stderr, "Last validated checkpoint template (id %d) not found, no extra DISCARD reception posted\n", last_valid_checkpoint.checkpoint_id);
+			return -1;
+		}
+		for (i=0 ; i<old_template->backup_of_array_used_size ; i++)
+		{
+			/* Skip the ranks already handled by the first loop. */
+			flag=0;
+			for(j=0 ; j<cp_template->backup_of_array_used_size ; j++)
+			{
+				if (cp_template->backup_of_array[j] == old_template->backup_of_array[i])
+				{
+					flag = 1;
+					break;
+				}
+			}
+			if (flag==0)
+			{
+				starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
+				arg->rank = old_template->backup_of_array[i];
+				fprintf(stderr, "Posting DISCARD msg reception from %d - LAST VALIDATED CP\n", arg->rank);
+				_ft_service_msg_irecv_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, checkpoint_discard, (void*)arg);
+			}
+		}
+	}
+	return 0;
+}
+
+/* Callback that releases its argument with starpu_free(); it is passed as the
+ * callback of the DISCARD sends, whose argument block is allocated with
+ * starpu_malloc(). */
+void free_arg(void* _args)
+{
+	starpu_free(_args);
+}
+
+/*
+ * Sends the DISCARD message announcing that checkpoint (cp_id, cp_instance)
+ * is complete.
+ *
+ * One message is sent per rank in cp_template->backupped_by_array. If a
+ * previous checkpoint was validated with a different template, one more
+ * message is sent to each rank of that template's backupped_by_array that is
+ * not already covered, so that each peer gets exactly one message. Each
+ * message buffer is released by the free_arg callback.
+ *
+ * Returns 0 on the normal path, and -1 when no checkpoint has been validated
+ * yet or when the last validated template can no longer be found (in both
+ * cases the messages for cp_template itself have already been sent).
+ */
+int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_instance)
+{
+	/* The CP data replication has succeeded. I must send the message announcing that the checkpoint is complete
+	 * (so the receivers can tag the data as CP validated, and discard old data from the deprecated checkpoint).
+	 * I send to a rank if it holds old CP data from me, or if it is my backup for data in the checkpoint that
+	 * just succeeded.
+	 * Here the union of the two lists is processed so that each message is sent only once.
+	 * TODO: For the message logging discard, I will send message to the people I exchanged with since the last checkpoint.
+	 * */
+	struct _starpu_mpi_cp_discard_arg_cb* arg;
+	int i, j, flag;
+	starpu_mpi_checkpoint_template_t old_template;
+	fprintf(stderr, "backupped_by_array_used_size: %d\n", cp_template->backupped_by_array_used_size);
+	for (i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
+	{
+		starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
+		arg->rank = cp_template->backupped_by_array[i];
+		fprintf(stderr, "Sending DISCARD msg reception to %d\n", arg->rank);
+		arg->msg.checkpoint_id = cp_id;
+		arg->msg.checkpoint_instance = cp_instance;
+		_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, free_arg, (void*)arg);
+	}
+	if (last_valid_checkpoint.checkpoint_id == -1)
+	{
+		return -1;
+	}
+	else if (last_valid_checkpoint.checkpoint_id!=cp_template->cp_template_id)
+	{
+		old_template = _starpu_mpi_get_checkpoint_template_by_id(last_valid_checkpoint.checkpoint_id);
+		if (old_template == NULL)
+		{
+			/* The lookup returns NULL for an unknown id: nothing more can be sent. */
+			fprintf(stderr, "Last validated checkpoint template (id %d) not found, no extra DISCARD msg sent\n", last_valid_checkpoint.checkpoint_id);
+			return -1;
+		}
+		for (i=0 ; i<old_template->backupped_by_array_used_size ; i++)
+		{
+			/* Skip the ranks already handled by the first loop. */
+			flag=0;
+			for(j=0 ; j<cp_template->backupped_by_array_used_size ; j++)
+			{
+				if (cp_template->backupped_by_array[j] == old_template->backupped_by_array[i])
+				{
+					flag = 1;
+					break;
+				}
+			}
+			if (flag==0)
+			{
+				starpu_malloc((void**)&arg, sizeof(struct _starpu_mpi_cp_discard_arg_cb));
+				arg->rank = old_template->backupped_by_array[i];
+				fprintf(stderr, "Sending DISCARD msg to %d - OLD CP\n", arg->rank);
+				arg->msg.checkpoint_id = cp_id;
+				arg->msg.checkpoint_instance = cp_instance;
+				_ft_service_msg_isend_cb(&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_DISCARD, MPI_COMM_WORLD, free_arg, (void*)arg);
+			}
+		}
+	}
+	return 0;
+}
 
 int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
 {
@@ -221,28 +385,35 @@ int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* c
 	return ret;
 }
 
+
+
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
+	int old_cp_id;
+	starpu_mpi_checkpoint_template_t old_cp_template;
+	starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
 	starpu_pthread_mutex_lock(&cp_template_mutex);
 	fprintf(stderr, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);
-	for (int i=0 ; i<cp_template_number ; i++)
+
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+	if (cp_template->cp_template_current_instance == checkpoint_instance)
 	{
-		starpu_pthread_mutex_lock(&cp_template_array[i]->mutex);
-		if (cp_template_array[i]->cp_template_id == checkpoint_id && cp_template_array[i]->cp_template_current_instance == checkpoint_instance)
+		fprintf(stderr, "Inst found, remaining ack msg awaited:%d\n", cp_template->remaining_ack_awaited);
+		cp_template->remaining_ack_awaited--;
+		if (cp_template->remaining_ack_awaited == 0)
 		{
-			fprintf(stderr, "Inst found, remaining ack msg awaited:%d\n", cp_template_array[i]->remaining_ack_awaited);
-			cp_template_array[i]->remaining_ack_awaited--;
-			if (cp_template_array[i]->remaining_ack_awaited == 0)
-			{
-				// TODO: share info about cp integrity
-				fprintf(stderr, "All cp material for cpid:%d, cpinst:%d - have been sent and acknowledged.\n", checkpoint_id, checkpoint_instance);
-				cp_template_array[i]->pending=0;
-			}
-			starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
-			starpu_pthread_mutex_unlock(&cp_template_mutex);
-			return 0;
+			fprintf(stderr, "All cp material for cpid:%d, cpinst:%d - have been sent and acknowledged.\n", checkpoint_id, checkpoint_instance);
+			_starpu_mpi_checkpoint_post_cp_discard_send(cp_template, checkpoint_id, checkpoint_instance);
+			valid_pending_checkpoint_template(cp_template);
+			cp_template->pending=0;
+			last_valid_checkpoint.checkpoint_id = checkpoint_id;
+			last_valid_checkpoint.checkpoint_instance = checkpoint_instance;
+			fprintf(stderr, "Digested\n");
 		}
-		starpu_pthread_mutex_unlock(&cp_template_array[i]->mutex);
+		starpu_pthread_mutex_unlock(&cp_template->mutex);
+		starpu_pthread_mutex_unlock(&cp_template_mutex);
+		return 0;
 	}
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	starpu_pthread_mutex_unlock(&cp_template_mutex);
 	return -1;
 }
@@ -259,12 +430,12 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_te
 		fprintf(stderr,"Item %2d: ", i);
 		if (item->type == STARPU_VALUE)
 		{
-			fprintf(stderr, "STARPU_VALUE - Value=%d - backupof:%d - backupedby:%d\n", (*(int *)(item->ptr)), item->backup_of, item->backup_rank);
+			fprintf(stderr, "STARPU_VALUE - Value=%d - backupof:%d - backupedby:%d\n", (*(int *)(item->ptr)), item->backup_of, item->backupped_by);
 		}
 		else if (item->type == STARPU_R)
 		{
 			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
-			fprintf(stderr, "STARPU_R - Value=%d - backupof:%d - backupedby:%d\n", val, item->backup_of, item->backup_rank);
+			fprintf(stderr, "STARPU_R - Value=%d - backupof:%d - backupedby:%d\n", val, item->backup_of, item->backupped_by);
 		}
 		else if (item->type == STARPU_DATA_ARRAY)
 		{

+ 86 - 13
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -27,17 +27,21 @@ extern "C"
 {
 #endif
 
+#define _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE 16
+
 void checkpoint_template_lib_init(void);
 void checkpoint_template_lib_quit(void);
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance);
-
+int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template);
 int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list);
+int set_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint);
+int valid_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_checkpoint);
 
 LIST_TYPE(_starpu_mpi_checkpoint_template_item,
 int type;
 void* ptr;
 int count;
-int backup_rank;
+int backupped_by;
 int backup_of;
 starpu_mpi_tag_t tag;
 );
@@ -52,18 +56,42 @@ struct _starpu_mpi_checkpoint_template{
 	int                                              pending;
 	int                                              frozen;
 	starpu_pthread_mutex_t                           mutex;
+	int* backup_of_array;
+	int backup_of_array_max_size;
+	int backup_of_array_used_size;
+	int* backupped_by_array;
+	int backupped_by_array_max_size;
+	int backupped_by_array_used_size;
 };
 
-static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank, int backup_of, starpu_mpi_tag_t tag)
+/*
+ * Grows a rank array by growth_factor.
+ *
+ * array         in/out: address of the array pointer; on return it points to
+ *               the new, larger buffer (the old one is released).
+ * max_size      in/out: capacity of the array, counted in ints.
+ * growth_factor multiplier applied to the capacity; must be greater than 1.
+ *
+ * The existing entries are copied and the first new slot is set to -1.
+ * Returns the new capacity.
+ *
+ * The buffer is regrown with starpu_malloc()/starpu_free() rather than
+ * realloc(), because the arrays are created with starpu_malloc().
+ * Assumes the current buffer really holds *max_size ints.
+ */
+static inline int checkpoint_template_array_realloc(int** array, int* max_size, int growth_factor)
+{
+	int  old_max_size = *max_size;
+	int  new_max_size = growth_factor*old_max_size;
+	int* new_array    = NULL;
+	int  ret;
+	int  i;
+
+	STARPU_ASSERT_MSG(growth_factor > 1 && old_max_size > 0, "Invalid array growth request (size %d, factor %d).\n", old_max_size, growth_factor);
+
+	/* Sizes are element counts: convert to bytes for the allocator. */
+	ret = starpu_malloc((void**)&new_array, new_max_size*sizeof(int));
+	STARPU_ASSERT_MSG(ret == 0 && new_array != NULL, "Unable to grow checkpoint template rank array to %d entries.\n", new_max_size);
+
+	for (i=0 ; i<old_max_size ; i++)
+	{
+		new_array[i] = (*array)[i];
+	}
+	new_array[old_max_size] = -1;
+
+	starpu_free(*array);
+	*array    = new_array;
+	*max_size = new_max_size;
+	return *max_size;
+}
+
+/* Doubles the capacity of checkpoint_template->backup_of_array (growth factor
+ * 2) and returns the value returned by checkpoint_template_array_realloc(). */
+static inline int checkpoint_template_backup_of_array_realloc_double(struct _starpu_mpi_checkpoint_template* checkpoint_template)
+{
+	return checkpoint_template_array_realloc(&checkpoint_template->backup_of_array, &checkpoint_template->backup_of_array_max_size, 2);
+}
+
+/* Doubles the capacity of checkpoint_template->backupped_by_array (growth
+ * factor 2) and returns the value returned by checkpoint_template_array_realloc(). */
+static inline int checkpoint_template_backupped_by_array_realloc_double(struct _starpu_mpi_checkpoint_template* checkpoint_template)
+{
+	return checkpoint_template_array_realloc(&checkpoint_template->backupped_by_array, &checkpoint_template->backupped_by_array_max_size, 2);
+}
+
+/* Allocates a zero-initialised template item with _STARPU_MPI_CALLOC, fills
+ * it with the given type, pointer, count, backupped_by rank, backup_of rank
+ * and tag, and returns it. */
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
 {
 	struct _starpu_mpi_checkpoint_template_item* item;
 	_STARPU_MPI_CALLOC(item, 1, sizeof(struct _starpu_mpi_checkpoint_template_item));
-	item->type = type;
-	item->ptr = ptr;
-	item->count = count;
-	item->backup_rank = backup_rank;
-	item->backup_of = backup_of;
-	item->tag = tag;
+	item->type         = type;
+	item->ptr          = ptr;
+	item->count        = count;
+	item->backupped_by = backupped_by;
+	item->backup_of    = backup_of;
+	item->tag          = tag;
 
 	return item;
 }
@@ -72,19 +100,65 @@ static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_n
 {
+	/* Allocates a zero-initialised template, records its id, starts at
+	 * instance 0, and creates the two rank arrays (backup_of / backupped_by)
+	 * empty: capacity _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE
+	 * ints, used size 0, first slot set to -1. Returns the new template. */
 	starpu_mpi_checkpoint_template_t _cp_template;
 	_STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template));
-	_cp_template->cp_template_id = cp_id;
+	_cp_template->cp_template_id               = cp_id;
 	_cp_template->cp_template_current_instance = 0;
+	_cp_template->backup_of_array_max_size     = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE;
+	/* The capacity is a number of ints: the allocation size must be in bytes. */
+	starpu_malloc((void**)&_cp_template->backup_of_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE*sizeof(int));
+	_cp_template->backup_of_array[0] = -1;
+	_cp_template->backup_of_array_used_size = 0;
+	_cp_template->backupped_by_array_max_size     = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE;
+	starpu_malloc((void**)&_cp_template->backupped_by_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE*sizeof(int));
+	_cp_template->backupped_by_array[0] = -1;
+	_cp_template->backupped_by_array_used_size = 0;
 	starpu_pthread_mutex_init(&_cp_template->mutex, NULL);
 	return _cp_template;
 }
 
-static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backup_rank, int backup_of, starpu_mpi_tag_t tag)
+/*
+ * Records a rank in one of the template's duplicate-free rank arrays.
+ *
+ * backup_of == -1    : backupped_by is added to backupped_by_array.
+ * backupped_by == -1 : backup_of is added to backup_of_array.
+ *
+ * Each array is kept terminated by a -1 entry and is doubled when only the
+ * terminator slot would remain.
+ *
+ * Returns the rank that was added, 0 when the rank was already present, and
+ * -1 when nothing was recorded (both ranks are -1, or both are set).
+ * NOTE(review): a return of 0 is ambiguous with "rank 0 added"; the only
+ * caller visible here ignores the return value.
+ */
+static inline int _checkpoint_template_add_to_backup_arrays(starpu_mpi_checkpoint_template_t cp_template, int backupped_by, int backup_of)
+{
+	if (backup_of == -1 && backupped_by == -1)
+	{
+		/* No rank given: -1 is the array terminator and must not be stored as an entry. */
+		return -1;
+	}
+	if (backup_of == -1) {
+		for (int i = 0; i < cp_template->backupped_by_array_used_size; i++)
+		{
+			if (backupped_by == cp_template->backupped_by_array[i]) {
+				return 0;
+			}
+		}
+		/* Keep one free slot for the -1 terminator written below. */
+		if (cp_template->backupped_by_array_used_size + 1 == cp_template->backupped_by_array_max_size)
+		{
+			checkpoint_template_backupped_by_array_realloc_double(cp_template);
+		}
+		cp_template->backupped_by_array[cp_template->backupped_by_array_used_size] = backupped_by;
+		cp_template->backupped_by_array_used_size++;
+		cp_template->backupped_by_array[cp_template->backupped_by_array_used_size] = -1;
+		return backupped_by;
+	}
+	else if (backupped_by == -1)
+	{
+		for (int i = 0; i < cp_template->backup_of_array_used_size; i++)
+		{
+			if (backup_of == cp_template->backup_of_array[i]) {
+				return 0;
+			}
+		}
+		/* Keep one free slot for the -1 terminator written below. */
+		if (cp_template->backup_of_array_used_size + 1 == cp_template->backup_of_array_max_size)
+		{
+			checkpoint_template_backup_of_array_realloc_double(cp_template);
+		}
+		cp_template->backup_of_array[cp_template->backup_of_array_used_size] = backup_of;
+		cp_template->backup_of_array_used_size++;
+		cp_template->backup_of_array[cp_template->backup_of_array_used_size] = -1;
+		return backup_of;
+	}
+	/* Both ranks set: nothing is recorded, as before, but the function now
+	 * returns a defined value instead of falling off its end. */
+	return -1;
+}
+
+/*
+ * Appends a new item to a template that is not frozen yet and records its
+ * backupped_by / backup_of rank in the template's rank arrays, all under the
+ * template mutex. Always returns 0; modifying a frozen template trips
+ * STARPU_ASSERT_MSG.
+ *
+ * Kept `static inline` like the other helpers defined in this header.
+ */
+static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
 {
 	starpu_pthread_mutex_lock(&cp_template->mutex);
 	STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
 	struct _starpu_mpi_checkpoint_template_item* item;
-	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backup_rank, backup_of, tag);
+	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
 	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
+	_checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
 	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	return 0;
 }
@@ -104,7 +178,6 @@ static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoin
 	return NULL;
 }
 
-
 static inline int _starpu_checkpoint_template_free(starpu_mpi_checkpoint_template_t cp_template)
 {
 	struct _starpu_mpi_checkpoint_template_item* item;

+ 5 - 4
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_service_comms.c

@@ -38,7 +38,7 @@ int _ft_service_msg_recv_send_common(void* ptr, int count, int rank, int tag, in
 	struct _starpu_mpi_req* req;
 
 	/* Check if the tag is a service message */
-	STARPU_ASSERT_MSG(tag==_STARPU_MPI_TAG_CP_ACK, "Only _STARPU_MPI_TAG_CP_ACK are service msgs.");
+	STARPU_ASSERT_MSG(tag==_STARPU_MPI_TAG_CP_ACK || tag==_STARPU_MPI_TAG_CP_DISCARD, "Only _STARPU_MPI_TAG_CP_ACK or _STARPU_MPI_TAG_CP_DISCARD are service msgs.");
 
 	/* Initialize the request structure */
 	_starpu_mpi_request_init(&req);
@@ -65,7 +65,7 @@ int _ft_service_msg_recv_send_common(void* ptr, int count, int rank, int tag, in
 
 	STARPU_PTHREAD_MUTEX_LOCK(&detached_ft_service_requests_mutex);
 	if (req_type==SEND_REQ) {
-		fprintf(stderr, "data:%d/%d", *(int*)(req->ptr), *(int*)(req->ptr+4));
+		fprintf(stderr, "data:%d/%d\n", *(int*)(req->ptr), *(int*)(req->ptr+4));
 		MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.node.rank, req->node_tag.data_tag,
 		          req->node_tag.node.comm, &req->backend->data_request);
 	}
@@ -77,7 +77,7 @@ int _ft_service_msg_recv_send_common(void* ptr, int count, int rank, int tag, in
 		STARPU_ASSERT_MSG(1, "Unrecognized req type: Only RECV_REQ and SEND_REQ are accepeted\n");
 	}
 	_starpu_mpi_req_list_push_back(&detached_ft_service_requests, req);
-	fprintf(stderr, "pushed service req: %p in list %p - prev: %p - next: %p - dest:%d - tag:%d\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag);
+	fprintf(stderr, "pushed service req: %p in list %p - prev: %p - next: %p - dest:%d - tag:%d - type:%s\n", req, &detached_ft_service_requests, _starpu_mpi_req_list_prev(req), _starpu_mpi_req_list_next(req), req->node_tag.node.rank, (int)req->node_tag.data_tag, req_type ? "recv" : "send");
 	if (req_type==SEND_REQ) {
 		detached_send_n_ft_service_requests++;
 	}
@@ -103,7 +103,7 @@ int _ft_service_msg_recv_send_common(void* ptr, int count, int rank, int tag, in
 static void _starpu_mpi_handle_ft_request_termination(struct _starpu_mpi_req *req)
 {
 	_STARPU_MPI_LOG_IN();
-
+	fprintf(stderr, "Handle termination begin \n");
 	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
 			req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
 			req->datatype_name, (int)req->count, req->registered_datatype, req->backend->internal_req);
@@ -161,6 +161,7 @@ static void _starpu_mpi_handle_ft_request_termination(struct _starpu_mpi_req *re
 	STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 	_STARPU_MPI_LOG_OUT();
+	fprintf(stderr, "Handle termination end\n");
 }
 
 void starpu_mpi_test_ft_detached_service_requests(void)

+ 32 - 2
mpi/tests/checkpoints.c

@@ -131,11 +131,14 @@ int test_checkpoint_submit(int argc, char* argv[])
 	starpu_variable_data_register(&handle1, STARPU_MAIN_RAM, (uintptr_t)&val1, sizeof(int));
 	starpu_mpi_data_register(handle1, 200, 1);
 
-
+	FPRINTF_MPI(stderr, "Registering\n");
 	starpu_mpi_checkpoint_template_register(&cp_template, 321,
 			STARPU_R, &handle0, 1,
 			STARPU_R, &handle1, 0,
 			0);
+	FPRINTF_MPI(stderr, "Registered\n");
+
+	_starpu_mpi_checkpoint_template_print(cp_template);
 
 	switch (me)
 	{
@@ -153,7 +156,34 @@ int test_checkpoint_submit(int argc, char* argv[])
 	starpu_mpi_submit_checkpoint_template(cp_template);
 
 	FPRINTF_MPI(stderr, "Submitted\n");
-	sleep(10);
+
+	sleep(1);
+	fprintf(stderr, "\n\n");
+	sleep(1);
+
+	if (me==0)
+	{
+		starpu_data_acquire(handle0, STARPU_RW);
+		val0*=2;
+		starpu_data_release(handle0);
+	}
+
+	if (me==1)
+	{
+		starpu_data_acquire(handle1, STARPU_RW);
+		val1*=2;
+		starpu_data_release(handle1);
+	}
+
+	FPRINTF_MPI(stderr, "Submitting\n");
+	starpu_mpi_submit_checkpoint_template(cp_template);
+
+	FPRINTF_MPI(stderr, "Submitted\n");
+
+	sleep(1);
+	fprintf(stderr, "\n\n");
+	sleep(1);
+
 	FPRINTF_MPI(stderr, "Bye!\n");
 	starpu_shutdown();