Browse Source

Implements CP domain.

Romain LION 5 years ago
parent
commit
937f095fed

+ 1 - 1
mpi/examples/matrix_decomposition/mpi_cholesky_codelets.c

@@ -377,7 +377,7 @@ void dw_cholesky(float ***matA, unsigned ld, int rank, int nodes, double *timing
 
 	_nodes = nodes;
 	starpu_malloc((void**)&checkpoint_p, sizeof(starpu_mpi_checkpoint_template_t));
-	starpu_mpi_checkpoint_template_create(checkpoint_p, 13);
+	starpu_mpi_checkpoint_template_create(checkpoint_p, 13, 0);
 
 	data_handles = malloc(nblocks*sizeof(starpu_data_handle_t *));
 	for(m=0 ; m<nblocks ; m++) data_handles[m] = malloc(nblocks*sizeof(starpu_data_handle_t));

+ 2 - 2
mpi/include/starpu_mpi_ft.h

@@ -42,8 +42,8 @@ typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template_t
  * <li> The argument list must be ended by the value 0.
  * </ul>
  */
-int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...);
-int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id);
+int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, ...);
+int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain);
 int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* cp_template, ...);
 int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_template);
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template);

+ 4 - 2
mpi/src/Makefile.am

@@ -89,7 +89,8 @@ noinst_HEADERS +=       \
 	mpi_failure_tolerance/starpu_mpi_checkpoint.h    \
 	mpi_failure_tolerance/starpu_mpi_checkpoint_template.h \
 	mpi_failure_tolerance/starpu_mpi_ft_service_comms.h  \
-	mpi_failure_tolerance/starpu_mpi_checkpoint_package.h
+	mpi_failure_tolerance/starpu_mpi_checkpoint_package.h \
+	mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.h
 endif STARPU_USE_MPI_FT
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
@@ -129,5 +130,6 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES +=    \
 	mpi_failure_tolerance/starpu_mpi_checkpoint.c    \
 	mpi_failure_tolerance/starpu_mpi_checkpoint_template.c   \
 	mpi_failure_tolerance/starpu_mpi_ft_service_comms.c \
- 	mpi_failure_tolerance/starpu_mpi_checkpoint_package.c
+ 	mpi_failure_tolerance/starpu_mpi_checkpoint_package.c  \
+ 	mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.c
 endif STARPU_USE_MPI_FT

+ 2 - 2
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -110,15 +110,15 @@ void _starpu_data_release_cb(void* _arg)
 
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
 {
-	// TODO: For now checkpoint are not taken asynchronously. It will be later, and then we will have to acquire READ permissions to StarPU in order to not have the data potentially corrupted.
 	starpu_data_handle_t* handle;
+	struct _starpu_mpi_checkpoint_tracker* tracker;
 	struct _starpu_mpi_cp_ack_arg_cb* arg;
 	void* cpy_ptr;
 	struct _starpu_mpi_checkpoint_template_item* item;
 	int current_instance;
 
 	current_instance = increment_current_instance();
-	_starpu_mpi_checkpoint_template_create_instance_tracker(cp_template, cp_template->cp_id, current_instance);
+//	_starpu_mpi_checkpoint_template_create_instance_tracker(cp_template, cp_template->cp_id, cp_template->checkpoint_domain, current_instance);
 	_starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
 
 	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);

+ 23 - 37
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -35,9 +35,6 @@ int                              my_rank;
 int                              comm_size;
 int                              current_instance;
 
-struct _starpu_mpi_checkpoint_template_tracking_inst*     last_valid_tracking_inst;
-struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
-struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
 
 
 typedef int (*backup_of_fn)(int);
@@ -66,10 +63,6 @@ void checkpoint_template_lib_init(void) {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
 	starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
-	last_valid_tracking_inst = _starpu_mpi_checkpoint_template_tracking_inst_new();
-	_starpu_mpi_checkpoint_template_tracking_inst_init(last_valid_tracking_inst);
-	_starpu_mpi_checkpoint_template_tracking_inst_list_init(&future_tracking_list);
-	_starpu_mpi_checkpoint_template_tracking_inst_list_init(&pending_tracking_list);
 	current_instance = 0;
 	_starpu_mpi_set_debug_level_max(1000);
 }
@@ -99,9 +92,9 @@ int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp
 	return 0;
 }
 
-int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id)
+int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain)
 {
-	*cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+	*cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
 	return 0;
 }
 
@@ -380,11 +373,11 @@ int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_t
 	return cp_template->size;
 }
 
-int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
+int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, va_list varg_list)
 {
 	int arg_type;
 
-	starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+	starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id, cp_domain);
 
 	va_list varg_list_copy;
 	va_copy(varg_list_copy, varg_list);
@@ -407,11 +400,11 @@ int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_t
 	return _starpu_mpi_checkpoint_template_freeze(*cp_template);
 }
 
-int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
+int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, int cp_domain, ...)
 {
 	va_list varg_list;
-	va_start(varg_list, cp_id);
-	int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, varg_list);
+	va_start(varg_list, cp_domain);
+	int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, cp_domain, varg_list);
 	va_end(varg_list);
 	return ret;
 }
@@ -431,16 +424,14 @@ int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* c
 
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
 	int remaining_ack_messages;
-	struct _starpu_mpi_checkpoint_template_tracking_inst* _last_valid_tracking_inst;
+	struct _starpu_mpi_checkpoint_tracker* tracker, *tracker1;
 	starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
-	starpu_mpi_checkpoint_template_t alt_cp_template;
 	starpu_pthread_mutex_lock(&cp_template_mutex);
 	_STARPU_MPI_DEBUG(20, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);
 
-	remaining_ack_messages = _starpu_mpi_checkpoint_template_track_inst_treat_ack(cp_template, checkpoint_id,
-	                                                                              checkpoint_instance);
+	tracker = _starpu_mpi_checkpoint_tracker_update(cp_template, checkpoint_id, cp_template->checkpoint_domain, checkpoint_instance);
+	remaining_ack_messages = _starpu_mpi_checkpoint_check_tracker(tracker);
 
-//	starpu_pthread_mutex_lock(&cp_template->mutex);
 	if (remaining_ack_messages>0)
 	{
 		_STARPU_MPI_DEBUG(20, "The CP (id:%d - inst:%d) found, remaining ack msg awaited:%d.\n", checkpoint_id,
@@ -449,37 +440,32 @@ int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_
 	else if (remaining_ack_messages==0)
 	{
 		_STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been successfully saved and acknowledged.\n", checkpoint_id, checkpoint_instance);
-		_last_valid_tracking_inst = _starpu_mpi_checkpoint_template_check_validation_coherency(checkpoint_id, checkpoint_instance);
-		STARPU_MPI_ASSERT_MSG(_last_valid_tracking_inst != NULL, "I couldn't check validation coherency for CP (id:%d - inst:%d), certainly nothing refers to it in pending inst tracking list.\n", checkpoint_id, checkpoint_instance);
-		if (_last_valid_tracking_inst == last_valid_tracking_inst)
+		tracker = _starpu_mpi_checkpoint_tracker_validate_instance(tracker);
+		if (tracker==NULL)
 		{
-			_STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been fully acknowledged, while a more recent one (id:%d - inst:%d) is already validated.\n", checkpoint_id, checkpoint_instance, _last_valid_tracking_inst->cp_id, _last_valid_tracking_inst->cp_inst);
-			checkpoint_id = _last_valid_tracking_inst->cp_id;
-			checkpoint_instance = _last_valid_tracking_inst->cp_inst;
-			// I have to warn the backups of the just acknowledged CP that the CP is already out of date. I must send a them a discard directly msg
-			_starpu_mpi_checkpoint_post_cp_discard_send(cp_template, checkpoint_id, checkpoint_instance);
+			// TODO:should warn some people, because the msg loggin is not implemented(this precise nodes to contact)
+			_STARPU_MPI_DEBUG(0, "No previous checkpoint to discard\n");
 		}
 		else
 		{
-			alt_cp_template = last_valid_tracking_inst->cp_template;
-			_last_valid_tracking_inst->valid = 1;
-			last_valid_tracking_inst = _last_valid_tracking_inst;
-			if (alt_cp_template==NULL)
+			if (tracker->old)
 			{
-				// TODO:should warn some people, because the msg loggin is not implemented(this precise nodes to contact)
-				_STARPU_MPI_DEBUG(0, "No previous checkpoint to discard\n");
+				tracker1 = _starpu_mpi_checkpoint_tracker_get_last_valid_tracker(tracker->cp_domain);
+				_starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, tracker1->cp_id, tracker1->cp_inst);
 			}
 			else
 			{
-				// I have to send a discard msg to the old cp's backups, and a valid msg to the backups of the just acknowledged CP.
-				_starpu_mpi_checkpoint_post_cp_discard_send(alt_cp_template, checkpoint_id, checkpoint_instance);
+				_starpu_mpi_checkpoint_post_cp_discard_send(tracker->cp_template, checkpoint_id, checkpoint_instance);
 			}
 		}
-
+	}
+	else if (remaining_ack_messages==-1)
+	{
+		STARPU_ABORT_MSG("Inst (id:%d - inst:%d) is already valid. should not have received an ack msg.\n", checkpoint_id, checkpoint_instance);
 	}
 	else
 	{
-		_STARPU_MPI_DEBUG(20, "Future inst (id:%d - inst:%d) found, %d ack msg already received.\n", checkpoint_id, checkpoint_instance, -remaining_ack_messages);
+		STARPU_ABORT_MSG("Critical error, can not identify %d as remaining messages\n", remaining_ack_messages);
 	}
 
 	_STARPU_MPI_DEBUG(20, "Digested\n");

+ 7 - 144
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -21,6 +21,7 @@
 #include <common/list.h>
 #include <starpu_mpi_private.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.h>
 
 #ifdef __cplusplus
 extern "C"
@@ -34,7 +35,7 @@ extern "C"
 extern starpu_pthread_mutex_t           cp_template_mutex;
 extern int                              cp_template_array_size;
 extern starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
-struct _starpu_mpi_checkpoint_template_tracking_inst*     last_valid_tracking_inst;
+
 extern struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
 extern struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
 
@@ -49,7 +50,7 @@ int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_
 
 int _starpu_mpi_checkpoint_post_cp_discard_recv(starpu_mpi_checkpoint_template_t cp_template);
 
-int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t *cp_template, int cp_id, va_list varg_list);
+int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t *cp_template, int cp_id, int cp_domain, va_list varg_list);
 
 
 LIST_TYPE(_starpu_mpi_checkpoint_template_tracking_inst,
@@ -88,128 +89,6 @@ struct _starpu_mpi_checkpoint_template
 
 };
 
-static inline void _starpu_mpi_checkpoint_template_tracking_inst_init(struct _starpu_mpi_checkpoint_template_tracking_inst *tracking_inst)
-{
-	tracking_inst->cp_template   = NULL;
-	tracking_inst->cp_id         = -1;
-	tracking_inst->cp_inst       = -1;
-	tracking_inst->ack_msg_count = 0;
-	tracking_inst->valid         = 0;
-}
-
-static struct _starpu_mpi_checkpoint_template_tracking_inst* _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(struct _starpu_mpi_checkpoint_template_tracking_inst_list tracking_list, int cp_id, int cp_inst)
-{
-	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
-
-	for (item =_starpu_mpi_checkpoint_template_tracking_inst_list_begin(&tracking_list) ;
-	     item!=_starpu_mpi_checkpoint_template_tracking_inst_list_end(&tracking_list) ;
-	     item =_starpu_mpi_checkpoint_template_tracking_inst_list_next(item))
-	{
-		if (item->cp_id==cp_id && item->cp_inst==cp_inst)
-		{
-			return item;
-		}
-	}
-	return NULL;
-}
-
-
-static struct _starpu_mpi_checkpoint_template_tracking_inst* _starpu_mpi_checkpoint_template_check_validation_coherency(int cp_id, int cp_inst)
-{
-	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
-
-	for (item =_starpu_mpi_checkpoint_template_tracking_inst_list_begin(&pending_tracking_list) ;
-	     item!=_starpu_mpi_checkpoint_template_tracking_inst_list_end(&pending_tracking_list) ;
-	     item =_starpu_mpi_checkpoint_template_tracking_inst_list_next(item))
-	{
-		if (last_valid_tracking_inst->cp_inst > cp_inst)
-		{
-			return last_valid_tracking_inst;
-		}
-		else if (item->cp_id==cp_id && item->cp_inst==cp_inst)
-		{
-			if (item->valid)
-			{
-				STARPU_ABORT_MSG("The checkpoint (id:%d - inst:%d) is already validated. This should not happen.\n",
-				                 cp_id, cp_inst);
-			}
-			return item;
-		}
-	}
-	return NULL;
-}
-
-static int _starpu_mpi_checkpoint_template_create_instance_tracker(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_inst)
-{
-	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
-	int ret=0;
-	item = _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(future_tracking_list, cp_id, cp_inst);
-	if (NULL != item)
-	{
-		_starpu_mpi_checkpoint_template_tracking_inst_list_erase(&future_tracking_list, item);
-		_starpu_mpi_checkpoint_template_tracking_inst_list_push_front(&pending_tracking_list, item);
-		ret = item->ack_msg_count;
-		item->ack_msg_count = item->cp_template->message_to_send_number-item->ack_msg_count;
-		if (item->ack_msg_count==0)
-		{
-			//TODO:Process discard send
-			STARPU_ABORT_MSG("Not yet implemented.\n");
-		}
-		else if (item->ack_msg_count<0)
-		{
-			STARPU_ABORT_MSG("Already received to many ack msgs(n:%d) for cp(id:%d - inst:%d). This should never happen.\n", ret, cp_id, cp_inst);
-		}
-	}
-	else
-	{
-		item = _starpu_mpi_checkpoint_template_tracking_inst_new();
-		_starpu_mpi_checkpoint_template_tracking_inst_init(item);
-		item->cp_id = cp_id;
-		item->cp_inst = cp_inst;
-		item->cp_template = cp_template;
-		item->ack_msg_count = cp_template->message_to_send_number;
-		_starpu_mpi_checkpoint_template_tracking_inst_list_push_front(&pending_tracking_list, item);
-	}
-	return ret;
-}
-
-static int _starpu_mpi_checkpoint_template_add_future_inst(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_inst)
-{
-	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
-	int current_instance = get_current_instance();
-	item = _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(future_tracking_list, cp_id, cp_inst);
-	_STARPU_MPI_DEBUG(10, "I received an ack msg for a checkpoint(id:%d) instance I did not initiated yet(received:%d - last:%d). Let's remember it's already acknowledged.\n", cp_id, cp_inst, current_instance);
-	if (item != NULL)
-	{
-		item->ack_msg_count++;
-		return item->ack_msg_count;
-	}
-	_STARPU_MPI_DEBUG(10, "This instance is not yet registered, let's create it.\n");
-	item = _starpu_mpi_checkpoint_template_tracking_inst_new();
-	_starpu_mpi_checkpoint_template_tracking_inst_init(item);
-	item->cp_id = cp_id;
-	item->cp_inst = cp_inst;
-	item->cp_template = cp_template;
-	item->ack_msg_count = 1;
-	_starpu_mpi_checkpoint_template_tracking_inst_list_push_front(&future_tracking_list, item);
-	return item->ack_msg_count;
-}
-
-static int _starpu_mpi_checkpoint_template_track_inst_treat_ack(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_inst)
-{
-	int ret;
-	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
-	item = _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(pending_tracking_list, cp_id, cp_inst);
-	if (item != NULL)
-	{
-		item->ack_msg_count--;
-		return item->ack_msg_count;
-	}
-	_STARPU_MPI_DEBUG(10, "The instance (id:%d - inst:%d) is not pending, let's ask the future instance instead.\n", cp_id, cp_inst);
-	ret = _starpu_mpi_checkpoint_template_add_future_inst(cp_template, cp_id, cp_inst);
-	return -ret;
-}
-
 static starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_id(int checkpoint_id)
 {
 	starpu_pthread_mutex_lock(&cp_template_mutex);
@@ -228,25 +107,8 @@ static starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_i
 	return NULL;
 }
 
-//static int checkpoint_template_count_ack_inst(int cp_id, int cp_inst)
-//{
-//	int ret;
-//	struct _starpu_mpi_checkpoint_template_instance* item;
-//	starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(cp_id);
-//	for (item=_starpu_mpi_checkpoint_template_instance_list_begin(&cp_template->pending_inst_list) ;
-//	     item!=_starpu_mpi_checkpoint_template_instance_list_end(&cp_template->pending_inst_list) ;
-//	     item=_starpu_mpi_checkpoint_template_instance_list_next(item))
-//	{
-//		if (item->instance == cp_inst)
-//		{
-//			item->count--;
-//			return item->count;
-//		}
-//	}
-//	_STARPU_MPI_DEBUG(10, "This instance is not pending, let's see with the future instance instead.\n");
-//	ret = checkpoint_template_add_future_inst(cp_template, cp_inst);
-//	return -ret;
-//}
+
+
 
 
 static inline int checkpoint_template_array_realloc(int** array, int* max_size, int growth_factor)
@@ -284,11 +146,12 @@ static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoin
 	return item;
 }
 
-static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_new(int cp_id)
+static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_new(int cp_id, int cp_domain)
 {
 	starpu_mpi_checkpoint_template_t _cp_template;
 	_STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template));
 	_cp_template->cp_id                    = cp_id;
+	_cp_template->checkpoint_domain        = cp_domain;
 	_cp_template->backup_of_array_max_size = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE;
 	starpu_malloc((void**)&_cp_template->backup_of_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE);
 	_cp_template->backup_of_array[0] = -1;

+ 235 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.c

@@ -0,0 +1,235 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <common/uthash.h>
+#include <common/list.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.h>
+#include "starpu_mpi_checkpoint_template.h"
+
+
+//int                              cp_id;
+//	int                              cp_inst;
+//	int                              cp_domain;
+//	starpu_mpi_checkpoint_template_t cp_template;
+//	int                              ack_msg_count;
+
+struct _starpu_mpi_checkpoint_domain_tracker_index_list* domain_tracker_list;
+
+struct _starpu_mpi_checkpoint_domain_tracker_entry
+{
+	UT_hash_handle                        hh;
+	int                                   instance;
+	struct _starpu_mpi_checkpoint_tracker tracker;
+};
+
+LIST_TYPE(_starpu_mpi_checkpoint_domain_tracker_index,
+int domain;
+struct _starpu_mpi_checkpoint_tracker* last_valid_instance;
+struct _starpu_mpi_checkpoint_domain_tracker_entry* tracked_inst_hash_table;
+)
+
+static inline void _starpu_mpi_checkpoint_domain_tracker_index_init(struct _starpu_mpi_checkpoint_domain_tracker_index* index)
+{
+	index->domain = -1;
+	index->tracked_inst_hash_table = NULL;
+	index->last_valid_instance = NULL;
+}
+
+static inline void _starpu_mpi_checkpoint_domain_tracker_entry_init(struct _starpu_mpi_checkpoint_domain_tracker_entry* entry)
+{
+	entry->instance = -1;
+	entry->tracker.cp_id = -1;
+	entry->tracker.cp_inst = -1;
+	entry->tracker.cp_domain = -1;
+	entry->tracker.cp_template = NULL;
+	entry->tracker.ack_msg_count = 0;
+	entry->tracker.valid = 0;
+	entry->tracker.old = 0;
+}
+
+static inline struct _starpu_mpi_checkpoint_domain_tracker_index* get_domain_tracker_index(int domain)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_index* index;
+	for (index = _starpu_mpi_checkpoint_domain_tracker_index_list_begin(domain_tracker_list) ;
+		index != _starpu_mpi_checkpoint_domain_tracker_index_list_end(domain_tracker_list) ;
+		index = _starpu_mpi_checkpoint_domain_tracker_index_list_next(index))
+	{
+		if (index->domain == domain)
+		{
+			return index;
+		}
+	}
+	return NULL;
+}
+
+static inline struct _starpu_mpi_checkpoint_domain_tracker_index* add_domain_tracker_index(int domain)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_index* index;
+	starpu_malloc((void**)&index, sizeof(struct _starpu_mpi_checkpoint_domain_tracker_index));
+	_starpu_mpi_checkpoint_domain_tracker_index_init(index);
+	index->domain = domain;
+	_starpu_mpi_checkpoint_domain_tracker_index_list_push_back(domain_tracker_list, index);
+	return index;
+}
+
+static inline struct _starpu_mpi_checkpoint_domain_tracker_entry* get_tracker_entry(struct _starpu_mpi_checkpoint_domain_tracker_index* index, int instance)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_entry* entry = NULL;
+	if (index->tracked_inst_hash_table)
+	{
+		HASH_FIND_INT(index->tracked_inst_hash_table, &instance, entry);
+	}
+	return entry;
+}
+
+static inline struct _starpu_mpi_checkpoint_domain_tracker_entry* add_tracker_entry(struct _starpu_mpi_checkpoint_domain_tracker_index* index, int cp_id, int cp_inst, int cp_domain, starpu_mpi_checkpoint_template_t cp_template)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_entry* entry;
+	starpu_malloc((void**)&entry, sizeof(struct _starpu_mpi_checkpoint_domain_tracker_entry));
+	_starpu_mpi_checkpoint_domain_tracker_entry_init(entry);
+	entry->instance = cp_inst;
+	entry->tracker.cp_id = cp_id;
+	entry->tracker.cp_inst = cp_inst;
+	entry->tracker.cp_domain = cp_domain;
+	entry->tracker.cp_template = cp_template;
+	entry->tracker.ack_msg_count = cp_template->message_to_send_number;
+	HASH_ADD_INT(index->tracked_inst_hash_table, instance, entry);
+	return entry;
+}
+
+static inline int _clear_domain_tracker_index(struct _starpu_mpi_checkpoint_domain_tracker_index* index)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_entry* entry, *tmp;
+	HASH_ITER(hh, index->tracked_inst_hash_table, entry, tmp)
+	{
+		HASH_DEL(index->tracked_inst_hash_table, entry);
+		free(entry);
+	}
+}
+
+static inline int _domain_tracker_delete_all()
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_index* temp_index;
+	struct _starpu_mpi_checkpoint_domain_tracker_index* index = index =_starpu_mpi_checkpoint_domain_tracker_index_list_begin(domain_tracker_list) ;
+	while (index != _starpu_mpi_checkpoint_domain_tracker_index_list_end(domain_tracker_list))
+	{
+		temp_index = _starpu_mpi_checkpoint_domain_tracker_index_list_next(index);
+		_clear_domain_tracker_index(index);
+		_starpu_mpi_checkpoint_domain_tracker_index_list_erase(domain_tracker_list, index);
+		free(index);
+		index = temp_index;
+	}
+	return 0;
+}
+
+
+
+int _starpu_mpi_checkpoint_tracker_init()
+{
+	domain_tracker_list = _starpu_mpi_checkpoint_domain_tracker_index_list_new();
+}
+
+int _starpu_mpi_checkpoint_tracker_shutdown()
+{
+	_domain_tracker_delete_all();
+	free(domain_tracker_list);
+}
+
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(int cp_domain, int cp_inst)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_index* index = get_domain_tracker_index(cp_domain);
+	if (NULL == index)
+		return NULL;
+	struct _starpu_mpi_checkpoint_domain_tracker_entry* entry = get_tracker_entry(index, cp_inst);
+	if (NULL == entry)
+		return NULL;
+	return &entry->tracker;
+}
+
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_template_create_instance_tracker(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_domain, int cp_inst)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_entry *entry;
+	struct _starpu_mpi_checkpoint_domain_tracker_index *index = get_domain_tracker_index(cp_domain);
+	if (NULL == index)
+		index = add_domain_tracker_index(cp_domain);
+	entry     = get_tracker_entry(index, cp_inst);
+	if (NULL == entry)
+		entry = add_tracker_entry(index, cp_id, cp_inst, cp_domain, cp_template);
+	return &entry->tracker;
+}
+
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_tracker_update(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_domain, int cp_instance)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_entry* entry;
+	struct _starpu_mpi_checkpoint_domain_tracker_index* index = get_domain_tracker_index(cp_domain);
+	if (NULL == index)
+		index = add_domain_tracker_index(cp_domain);
+	entry = get_tracker_entry(index, cp_instance);
+	if (NULL == entry)
+	{
+		STARPU_ASSERT_MSG(cp_template!=NULL, "Couldn't find a CP template with the cpid:%d\n", cp_id);
+		entry = add_tracker_entry(index, cp_id, cp_instance, cp_domain, cp_template);
+	}
+	STARPU_ASSERT_MSG(entry->tracker.ack_msg_count>0, "Error. Trying to count ack message while all have already been received. id:%d, inst:%d, remaining_ack_messages:%d\n", entry->tracker.cp_id, entry->instance, entry->tracker.ack_msg_count);
+	entry->tracker.ack_msg_count--;
+	return &entry->tracker;
+}
+
+int _starpu_mpi_checkpoint_check_tracker(struct _starpu_mpi_checkpoint_tracker* tracker)
+{
+	if (tracker->valid==1)
+	{
+		return -1;
+	}
+	return tracker->ack_msg_count;
+}
+
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_tracker_validate_instance(struct _starpu_mpi_checkpoint_tracker* tracker)
+{
+	// Here we validate a checkpoint and return the old cp info that must be discarded
+	struct _starpu_mpi_checkpoint_tracker* temp_tracker;
+	struct _starpu_mpi_checkpoint_domain_tracker_index* index = get_domain_tracker_index(tracker->cp_domain);
+	if (NULL == index->last_valid_instance || tracker->cp_inst > index->last_valid_instance->cp_inst)
+	{
+		_STARPU_MPI_DEBUG(0, "The CP (id:%d - dom:%d - inst:%d) has been fully acknowledged, and is now the latest valid CP for the domain.\n", tracker->cp_id, tracker->cp_domain, tracker->cp_inst);
+		// The checkpoint to validate is the newest of the domain. Update the latest CP and return the old "latest"
+		temp_tracker = index->last_valid_instance;
+		index->last_valid_instance = tracker;
+		tracker->valid = 1;
+		if (STARPU_LIKELY(temp_tracker!=NULL))
+		{
+			temp_tracker->old = 1;
+		}
+		return temp_tracker;
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(0, "The CP (id:%d - dom:%d - inst:%d) has been fully acknowledged, while a more recent one (id:%d - dom:%d - inst:%d) is already validated.\n",
+				tracker->cp_id, tracker->cp_domain, tracker->cp_inst,
+				index->last_valid_instance->cp_id, index->last_valid_instance->cp_domain, index->last_valid_instance->cp_inst);
+		// The checkpoint to validate is older than the latest validated, just return it to discard it
+		tracker->valid = 1;
+		tracker->old =1;
+		return tracker;
+	}
+}
+
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_tracker_get_last_valid_tracker(int domain)
+{
+	struct _starpu_mpi_checkpoint_domain_tracker_index* index = get_domain_tracker_index(domain);
+	return index->last_valid_instance;
+}

+ 53 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.h

@@ -0,0 +1,53 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef FT_STARPU_STARPU_MPI_CHECKPOINT_TRACKER_H
+#define FT_STARPU_STARPU_MPI_CHECKPOINT_TRACKER_H
+
+
+//#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+struct _starpu_mpi_checkpoint_tracker
+{
+	int                              cp_id;
+	int                              cp_inst;
+	int                              cp_domain;
+	starpu_mpi_checkpoint_template_t cp_template;
+	int                              ack_msg_count;
+	int                              old:1;
+	int                              valid: 1;
+};
+
+int _starpu_mpi_checkpoint_tracker_init();
+int _starpu_mpi_checkpoint_tracker_shutdown();
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(int cp_domain, int cp_inst);
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_template_create_instance_tracker(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_domain, int cp_inst);
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_tracker_update(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_domain, int cp_instance);
+int _starpu_mpi_checkpoint_check_tracker(struct _starpu_mpi_checkpoint_tracker* tracker);
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_tracker_validate_instance(struct _starpu_mpi_checkpoint_tracker* tracker);
+struct _starpu_mpi_checkpoint_tracker* _starpu_mpi_checkpoint_tracker_get_last_valid_tracker(int domain);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //FT_STARPU_STARPU_MPI_CHECKPOINT_TRACKER_H

+ 2 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft.c

@@ -29,6 +29,7 @@ int starpu_mpi_ft_turn_on(void)
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
 	starpu_mpi_ft_service_lib_init();
 	checkpoint_template_lib_init();
+	_starpu_mpi_checkpoint_tracker_init();
 	checkpoint_package_init();
 	return 0;
 }
@@ -37,6 +38,7 @@ int starpu_mpi_ft_turn_off(void)
 {
 	checkpoint_template_lib_quit();
 	checkpoint_package_shutdown();
+	_starpu_mpi_checkpoint_tracker_shutdown();
 	starpu_pthread_mutex_destroy(&ft_mutex);
 
 	return 0;

+ 3 - 3
mpi/tests/checkpoints.c

@@ -92,7 +92,7 @@ int pseudotest_checkpoint_template_register(int argc, char* argv[])
 		starpu_mpi_data_register(h_array[i], 42+i, 1); //42 to 54
 	}
 
-	starpu_mpi_checkpoint_template_register(&cp_template1, 123486,
+	starpu_mpi_checkpoint_template_register(&cp_template1, 123486, 0,
 	                                        STARPU_VALUE, &val, sizeof(int), 84, backup_of,
 	                                        STARPU_R, &h, 1,
 	                                        0);
@@ -100,7 +100,7 @@ int pseudotest_checkpoint_template_register(int argc, char* argv[])
 	FPRINTF(stderr, "registered!\n");
 	_starpu_mpi_checkpoint_template_print(cp_template1);
 
-	starpu_mpi_checkpoint_template_create(&cp_template2, 98765);
+	starpu_mpi_checkpoint_template_create(&cp_template2, 98765, 0);
 	starpu_mpi_checkpoint_template_add_entry(&cp_template2, STARPU_R, &h, 1);
 	starpu_mpi_checkpoint_template_add_entry(&cp_template2, STARPU_VALUE, &val, sizeof(int), 84, backup_of);
 	starpu_mpi_checkpoint_template_freeze(&cp_template2);
@@ -140,7 +140,7 @@ int test_checkpoint_submit(int argc, char* argv[])
 	starpu_mpi_data_register(handle1, 200, 1);
 
 	FPRINTF_MPI(stderr, "Registering\n");
-	starpu_mpi_checkpoint_template_register(&cp_template, 321,
+	starpu_mpi_checkpoint_template_register(&cp_template, 321, 0,
 			STARPU_R, &handle0, 1,
 			STARPU_R, &handle1, 0,
             STARPU_VALUE, &stage, sizeof(int), 300, &backup_of,