Ver código fonte

Major modifications.
Fix the reallocation of the backup arrays in checkpoint (CP) templates.
Rework the callback logic for CP submission.
Change ack reception to handle acks that arrive for a future CP instance (one not yet submitted locally).
This is not sufficient yet: a new submission must not be blocked when the previous CP has not completed.

Romain LION 4 anos atrás
pai
commit
099fb611c2

+ 1 - 0
mpi/include/starpu_mpi_ft.h

@@ -49,6 +49,7 @@ int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_t
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template);
 int starpu_mpi_ft_turn_on(void);
 int starpu_mpi_ft_turn_off(void);
+int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template);
 
 #ifdef __cplusplus
 }

+ 36 - 42
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -39,10 +39,16 @@ extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_
 void _starpu_mpi_treat_ack_receipt_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	int ret;
 	_STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
-	if (_checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance) == 0) {
+	ret = _checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	if (ret == 0) {
 		free(arg);
 	}
+	else if (ret == -1)
+	{
+		STARPU_ABORT_MSG("Could not find CP template, cpid:%d - cpinst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	}
 }
 
 void _arg_free(void* _args)
@@ -52,20 +58,18 @@ void _arg_free(void* _args)
 	free(arg);
 }
 
-void _starpu_mpi_push_cp_ack_send_cb(void* _args)
+void _starpu_mpi_store_data_and_send_ack_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	if (STARPU_VALUE == arg->type) {
+		// an handle as specificaly been created, no need to copy the data. Call directly the Callback
+		arg->copy_handle = arg->handle;
+	}
+	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
 	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
 }
 
-void _starpu_mpi_store_data_and_push_cp_ack_send_cb(void* _args)
-{
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
-	_starpu_mpi_push_cp_ack_send_cb(_args);
-}
-
 void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
@@ -78,36 +82,23 @@ void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
 	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
 }
 
-
-void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg)
+void _starpu_mpi_cached_push_cp_ack_recv_cb(void* _args)
 {
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
-	starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
-//	starpu_data_register_same(&arg->copy_handle, arg->handle);
-//	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
-	starpu_data_release(arg->handle);
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	if (STARPU_R == arg->type)
+	{
+		starpu_data_release(arg->handle);
+	}
+	_STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
+	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
 }
 
-void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
+void _starpu_data_release_cb(void* _arg)
 {
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
-
-	if (STARPU_VALUE == arg->type)
-	{
-		// an handle as specificaly been created, no need to copy the data. Call directly the Callback
-		arg->copy_handle = arg->handle;
-		_starpu_mpi_store_data_and_push_cp_ack_send_cb(_arg);
-		return;
-	}
-	else if (STARPU_R == arg->type)
-	{
-		starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
-//		starpu_data_register_same(&arg->copy_handle, arg->handle);
-//		starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
-		return;
-	}
+	starpu_data_release(_arg);
 }
 
+
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
 {
 	// TODO: For now checkpoint are not taken asynchronously. It will be later, and then we will have to acquire READ permissions to StarPU in order to not have the data potentially corrupted.
@@ -116,14 +107,9 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 	void* cpy_ptr;
 	struct _starpu_mpi_checkpoint_template_item* item;
 	//MPI_Comm comm;
-	starpu_pthread_mutex_lock(&cp_template->mutex);
 	set_pending_checkpoint_template(cp_template);
-	STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
 
-	cp_template->pending               = 1;
-	cp_template->cp_template_current_instance++;
-	cp_template->remaining_ack_awaited = cp_template->sent_message_number;
-	starpu_pthread_mutex_unlock(&cp_template->mutex);
+	checkpoint_template_increment_instance(cp_template);
 
 	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
 
@@ -151,6 +137,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					_STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
 					starpu_mpi_isend_detached_prio(*handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
 												   &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg);
+					// The callback needs to free the handle specially created for the send, and post ack recv
 				}
 				else if (item->backup_of != -1)
 				{
@@ -159,7 +146,8 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->rank = item->backup_of;
 					_STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
 					starpu_mpi_irecv_detached(*handle, arg->rank, arg->tag, MPI_COMM_WORLD,
-											  &_starpu_checkpoint_data_recv_copy_and_ack, (void*)arg);
+											  &_starpu_mpi_store_data_and_send_ack_cb, (void*)arg);
+					// The callback needs to store the received data and post ack send
 				}
 				break;
 			case STARPU_R:
@@ -176,7 +164,9 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->msg.checkpoint_id = cp_template->cp_template_id;
 					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
 					_starpu_mpi_isend_cache_aware(*handle, item->backupped_by, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0,
-					                              &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, 1);
+					                              &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_cached_push_cp_ack_recv_cb, (void*)arg, 1);
+					// the callbacks need to post ack recv. The cache one needs to release the handle.
+
 				}
 				else if (item->backup_of == starpu_mpi_data_get_rank(*handle))
 				{
@@ -190,7 +180,11 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->msg.checkpoint_id = cp_template->cp_template_id;
 					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
 					_starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0,
-					                              &_starpu_checkpoint_data_recv_copy_and_ack, (void*)arg, &_starpu_checkpoint_cached_data_recv_copy_and_ack, (void*)arg, 1, 0, 1);
+					                              NULL, NULL, &_starpu_data_release_cb, (void*)arg->handle, 1, 0, 1);
+					// The callback needs to do nothing. The cached one must release the handle.
+					starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1, NULL, NULL);
+					starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _starpu_mpi_store_data_and_send_ack_cb, arg);
+					// The callback need to store the data and post ack send.
 				}
 				break;
 		}

+ 45 - 24
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -26,13 +26,14 @@
 #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 
+#include <starpu_mpi_private.h>
 
 #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
 
 starpu_pthread_mutex_t           cp_template_mutex;
 starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
 int                              my_rank;
-int                              size;
+int                              comm_size;
 int cp_template_number = 0;
 struct _starpu_mpi_cp_ack_msg last_valid_checkpoint;
 starpu_mpi_checkpoint_template_t pending_checkpoint;
@@ -45,10 +46,12 @@ void checkpoint_template_lib_init(void) {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
 	starpu_pthread_mutex_init(&checkpoint_pending_mutex, NULL);
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
 	last_valid_checkpoint.checkpoint_id = -1;
 	last_valid_checkpoint.checkpoint_instance = -1;
 	pending_checkpoint = NULL;
+
+	_starpu_mpi_set_debug_level_max(1000);
 }
 
 void checkpoint_template_lib_quit(void) {
@@ -94,6 +97,7 @@ int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp
 	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
 	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
 	_checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
+	_STARPU_MPI_DEBUG(5, "New checkpoint data entry %p has been added to cp_template with id:%d. (%s)\n", item, cp_template->cp_template_id, backupped_by==-1?"BACKUP_OF":"BACKUPPED_BY");
 	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	return 0;
 }
@@ -151,7 +155,7 @@ int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t c
 					_starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
 				}
 			}
-			for (int i=my_rank+1 ; i<size ; i++)
+			for (int i=my_rank+1 ; i<comm_size ; i++)
 			{
 				if (_backup_of(i) == my_rank)
 				{
@@ -305,8 +309,9 @@ int _starpu_mpi_checkpoint_post_cp_discard_send(starpu_mpi_checkpoint_template_t
 
 int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
 {
+	char str[256];
 	starpu_pthread_mutex_lock(&cp_template->mutex);
-
+	_STARPU_MPI_DEBUG(2, "Start freezing checkpoint id:%d\n", cp_template->cp_template_id);
 	cp_template->frozen              = 1;
 	cp_template->sent_message_number = 0;
 	cp_template->size                = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
@@ -321,9 +326,32 @@ int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_t
 		}
 		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
 	}
+	sprintf(str, "backupped by Array maxsize:%d - currentsize:%d - ", cp_template->backupped_by_array_max_size, cp_template->backupped_by_array_used_size);
+	for (int i=0 ; i<cp_template->backupped_by_array_used_size ; i++)
+	{
+		sprintf(str,"%s%d ", str, cp_template->backupped_by_array[i]);
+	}
+	fprintf(stderr, "%s\n", str);
+
+	sprintf(str,"backup of Array maxsize:%d - currentsize:%d - ", cp_template->backup_of_array_max_size, cp_template->backup_of_array_used_size);
+	for (int i=0 ; i<cp_template->backup_of_array_used_size ; i++)
+	{
+		sprintf(str,"%s%d ", str, cp_template->backup_of_array[i]);
+	}
+	fprintf(stderr, "%s\n", str);
 
 	starpu_pthread_mutex_unlock(&cp_template->mutex);
 
+	starpu_pthread_mutex_lock(&cp_template_mutex);
+	for (int i=0 ; i<cp_template_number ; i++)
+	{
+		STARPU_ASSERT_MSG(cp_template_array[i]->cp_template_id != cp_template->cp_template_id, "A checkpoint with id %d has already been registered.\n", cp_template->cp_template_id);
+	}
+	cp_template_array[cp_template_number] = cp_template;
+	cp_template_number++;
+	starpu_pthread_mutex_unlock(&cp_template_mutex);
+
+	_STARPU_MPI_DEBUG(2, "Checkpoint id:%d is frozen and registered.\n", cp_template->cp_template_id);
 	return cp_template->size;
 }
 
@@ -344,16 +372,6 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* c
 
 	_starpu_mpi_checkpoint_template_freeze(_cp_template);
 
-	starpu_pthread_mutex_lock(&cp_template_mutex);
-	for (int i=0 ; i<cp_template_number ; i++)
-	{
-		STARPU_ASSERT_MSG(cp_template_array[i]->cp_template_id != _cp_template->cp_template_id, "A checkpoint with id %d has already been registered.\n", _cp_template->cp_template_id);
-	}
-
-	cp_template_array[cp_template_number] = _cp_template;
-	cp_template_number++;
-	starpu_pthread_mutex_unlock(&cp_template_mutex);
-
 	*cp_template = _cp_template;
 
 	return 0;
@@ -387,13 +405,13 @@ int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* c
 }
 
 
-
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
 	starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
 	starpu_pthread_mutex_lock(&cp_template_mutex);
 	_STARPU_MPI_DEBUG(20, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);
 
 	starpu_pthread_mutex_lock(&cp_template->mutex);
+	_STARPU_MPI_DEBUG(20, "Mutex taken\n");
 	if (cp_template->cp_template_current_instance == checkpoint_instance)
 	{
 		cp_template->remaining_ack_awaited--;
@@ -406,21 +424,22 @@ int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_
 			cp_template->pending=0;
 			last_valid_checkpoint.checkpoint_id = checkpoint_id;
 			last_valid_checkpoint.checkpoint_instance = checkpoint_instance;
-			_STARPU_MPI_DEBUG(20, "Digested\n");
 		}
-		starpu_pthread_mutex_unlock(&cp_template->mutex);
-		starpu_pthread_mutex_unlock(&cp_template_mutex);
-		return 0;
 	}
+	else
+	{
+		checkpoint_template_add_future_inst(cp_template, checkpoint_instance);
+	}
+	_STARPU_MPI_DEBUG(20, "Digested\n");
 	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	starpu_pthread_mutex_unlock(&cp_template_mutex);
-	return -1;
+	return 0;
 }
 
 // For test purpose
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
 {
-	int val;
+//	int val;
 	int i = 0;
 	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
 
@@ -429,12 +448,14 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_te
 		fprintf(stderr,"Item %2d: ", i);
 		if (item->type == STARPU_VALUE)
 		{
-			fprintf(stderr, "STARPU_VALUE - Value=%d - backupof:%d - backupedby:%d\n", (*(int *)(item->ptr)), item->backup_of, item->backupped_by);
+//			fprintf(stderr, "STARPU_VALUE - Value=%d - backupof:%d - backupedby:%d\n", (*(int *)(item->ptr)), item->backup_of, item->backupped_by);
+			fprintf(stderr, "STARPU_VALUE - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
 		}
 		else if (item->type == STARPU_R)
 		{
-			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
-			fprintf(stderr, "STARPU_R - Value=%d - backupof:%d - backupedby:%d\n", val, item->backup_of, item->backupped_by);
+//			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
+//			fprintf(stderr, "STARPU_R - Value=%d - backupof:%d - backupedby:%d\n", val, item->backup_of, item->backupped_by);
+			fprintf(stderr, "STARPU_R - pointer:%p - backupof:%d - backupedby:%d\n", item->ptr, item->backup_of, item->backupped_by);
 		}
 		else if (item->type == STARPU_DATA_ARRAY)
 		{

+ 69 - 3
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -27,7 +27,7 @@ extern "C"
 {
 #endif
 
-#define _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE 16
+#define _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE 2
 
 void checkpoint_template_lib_init(void);
 void checkpoint_template_lib_quit(void);
@@ -46,6 +46,11 @@ int backup_of;
 starpu_mpi_tag_t tag;
 );
 
+LIST_TYPE(_starpu_mpi_checkpoint_future_instance,
+int instance;
+int count;
+);
+
 struct _starpu_mpi_checkpoint_template{
 	struct _starpu_mpi_checkpoint_template_item_list list;
 	int                                              size;
@@ -62,16 +67,77 @@ struct _starpu_mpi_checkpoint_template{
 	int* backupped_by_array;
 	int backupped_by_array_max_size;
 	int backupped_by_array_used_size;
+	struct _starpu_mpi_checkpoint_future_instance_list future_inst_list;
 };
 
 static inline int checkpoint_template_array_realloc(int** array, int* max_size, int growth_factor)
 {
-	*array = (int*)realloc(array, growth_factor*(*max_size));
-	*array[*max_size] = -1;
+	fprintf(stderr, "old array %p - first elem %d\n", *array, *array[0]);
+	fprintf(stderr, "Newsize=%d\n", growth_factor*(*max_size));
+	*array = (int*)realloc(*array, growth_factor*(*max_size)*sizeof(int));
+	fprintf(stderr, "Newarray=%p\n", *array);
+	STARPU_ASSERT_MSG(array!=NULL, "Realloc could not allocate %ld bytes\n", growth_factor*(*max_size)*sizeof(int));
 	*max_size = growth_factor*(*max_size);
 	return *max_size;
 }
 
+static int checkpoint_template_add_future_inst(starpu_mpi_checkpoint_template_t cp_template, int instance)
+{
+	struct _starpu_mpi_checkpoint_future_instance* item;
+	_STARPU_MPI_DEBUG(10, "I received an ack msg for a checkpoint instance I did not initiated yet(received:%d - current:%d). Let's remember it's already acknowledged.\n", instance, cp_template->cp_template_current_instance);
+	for (item=_starpu_mpi_checkpoint_future_instance_list_begin(&cp_template->future_inst_list) ;
+			item!=_starpu_mpi_checkpoint_future_instance_list_end(&cp_template->future_inst_list) ;
+			item=_starpu_mpi_checkpoint_future_instance_list_next(item))
+	{
+		if (item->instance == instance)
+		{
+			item->count++;
+			return 0;
+		}
+	}
+	_STARPU_MPI_DEBUG(10, "This instance is not yet registered, let's create it.\n");
+	item = _starpu_mpi_checkpoint_future_instance_new();
+	item->count = 1;
+	item->instance = instance;
+	_starpu_mpi_checkpoint_future_instance_list_push_back(&cp_template->future_inst_list, item);
+	return 0;
+}
+
+static int checkpoint_template_pop_future_inst(starpu_mpi_checkpoint_template_t cp_template, int instance)
+{
+	struct _starpu_mpi_checkpoint_future_instance *item;
+	int count;
+	for (item = _starpu_mpi_checkpoint_future_instance_list_begin(&cp_template->future_inst_list);
+			item != _starpu_mpi_checkpoint_future_instance_list_end(&cp_template->future_inst_list);
+			item = _starpu_mpi_checkpoint_future_instance_list_next(item))
+	{
+		if (item->instance == instance)
+		{
+			_STARPU_MPI_DEBUG(10, "The new cp (id:%d) instance is %d. %d ack msg have been received, let's take it into account.\n",
+			                  cp_template->cp_template_id, instance, item->count);
+			count = item->count;
+			_starpu_mpi_checkpoint_future_instance_list_erase(&cp_template->future_inst_list, item);
+			free(item);
+			return count;
+		}
+	}
+	return 0;
+}
+
+static inline int checkpoint_template_increment_instance(starpu_mpi_checkpoint_template_t cp_template)
+{
+	int already_received_ack;
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+	STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
+	cp_template->pending               = 1;
+	cp_template->cp_template_current_instance++;
+	cp_template->remaining_ack_awaited = cp_template->sent_message_number;
+	already_received_ack = checkpoint_template_pop_future_inst(cp_template, cp_template->cp_template_current_instance);
+	cp_template->remaining_ack_awaited -= already_received_ack;
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
+	return already_received_ack;
+}
+
 static inline int checkpoint_template_backup_of_array_realloc_double(struct _starpu_mpi_checkpoint_template* checkpoint_template)
 {
 	return checkpoint_template_array_realloc(&checkpoint_template->backup_of_array, &checkpoint_template->backup_of_array_max_size, 2);