Bläddra i källkod

Improve the last-valid tracking instance check, since the instance number is absolute and not relative to the checkpoint template

Romain LION 5 år sedan
förälder
incheckning
31b76d193b

+ 13 - 13
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -35,7 +35,7 @@ int                              my_rank;
 int                              comm_size;
 int                              current_instance;
 
-struct _starpu_mpi_checkpoint_template_tracking_inst*     last_valid_cp;
+struct _starpu_mpi_checkpoint_template_tracking_inst*     last_valid_tracking_inst;
 struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
 struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
 
@@ -66,8 +66,8 @@ void checkpoint_template_lib_init(void) {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
 	starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
-	last_valid_cp = _starpu_mpi_checkpoint_template_tracking_inst_new();
-	_starpu_mpi_checkpoint_template_tracking_inst_init(last_valid_cp);
+	last_valid_tracking_inst = _starpu_mpi_checkpoint_template_tracking_inst_new();
+	_starpu_mpi_checkpoint_template_tracking_inst_init(last_valid_tracking_inst);
 	_starpu_mpi_checkpoint_template_tracking_inst_list_init(&future_tracking_list);
 	_starpu_mpi_checkpoint_template_tracking_inst_list_init(&pending_tracking_list);
 	current_instance = 0;
@@ -431,7 +431,7 @@ int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* c
 
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
 	int remaining_ack_messages;
-	struct _starpu_mpi_checkpoint_template_tracking_inst* last_valid_tracking_inst;
+	struct _starpu_mpi_checkpoint_template_tracking_inst* _last_valid_tracking_inst;
 	starpu_mpi_checkpoint_template_t cp_template = _starpu_mpi_get_checkpoint_template_by_id(checkpoint_id);
 	starpu_mpi_checkpoint_template_t alt_cp_template;
 	starpu_pthread_mutex_lock(&cp_template_mutex);
@@ -449,21 +449,21 @@ int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_
 	else if (remaining_ack_messages==0)
 	{
 		_STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been successfully saved and acknowledged.\n", checkpoint_id, checkpoint_instance);
-		last_valid_tracking_inst = _starpu_mpi_checkpoint_template_check_validation_coherency(checkpoint_id, checkpoint_instance);
-		STARPU_MPI_ASSERT_MSG(last_valid_tracking_inst != NULL, "I couldn't check validation coherency for CP (id:%d - inst:%d), certainly nothing refers to it in pending inst tracking list.\n", checkpoint_id, checkpoint_instance);
-		if (!(last_valid_tracking_inst->cp_id==checkpoint_id && last_valid_tracking_inst->cp_inst==checkpoint_instance))
+		_last_valid_tracking_inst = _starpu_mpi_checkpoint_template_check_validation_coherency(checkpoint_id, checkpoint_instance);
+		STARPU_MPI_ASSERT_MSG(_last_valid_tracking_inst != NULL, "I couldn't check validation coherency for CP (id:%d - inst:%d), certainly nothing refers to it in pending inst tracking list.\n", checkpoint_id, checkpoint_instance);
+		if (_last_valid_tracking_inst == last_valid_tracking_inst)
 		{
-			_STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been fully acknowledged, while a more recent one (id:%d - inst:%d) is already validated.\n", checkpoint_id, checkpoint_instance, last_valid_tracking_inst->cp_id, last_valid_tracking_inst->cp_inst);
-			checkpoint_id = last_valid_tracking_inst->cp_id;
-			checkpoint_instance = last_valid_tracking_inst->cp_inst;
+			_STARPU_MPI_DEBUG(0, "The CP (id:%d - inst:%d) has been fully acknowledged, while a more recent one (id:%d - inst:%d) is already validated.\n", checkpoint_id, checkpoint_instance, _last_valid_tracking_inst->cp_id, _last_valid_tracking_inst->cp_inst);
+			checkpoint_id = _last_valid_tracking_inst->cp_id;
+			checkpoint_instance = _last_valid_tracking_inst->cp_inst;
 			// I have to warn the backups of the just-acknowledged CP that the CP is already out of date. I must send them a discard msg directly.
 			_starpu_mpi_checkpoint_post_cp_discard_send(cp_template, checkpoint_id, checkpoint_instance);
 		}
 		else
 		{
-			alt_cp_template = last_valid_cp->cp_template;
-			last_valid_tracking_inst->valid = 1;
-			last_valid_cp = last_valid_tracking_inst;
+			alt_cp_template = last_valid_tracking_inst->cp_template;
+			_last_valid_tracking_inst->valid = 1;
+			last_valid_tracking_inst = _last_valid_tracking_inst;
 			if (alt_cp_template==NULL)
 			{
 				// TODO: should warn some people, because the msg logging is not implemented (this specifies the nodes to contact)

+ 35 - 9
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -34,6 +34,7 @@ extern "C"
 extern starpu_pthread_mutex_t           cp_template_mutex;
 extern int                              cp_template_array_size;
 extern starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
+extern struct _starpu_mpi_checkpoint_template_tracking_inst*     last_valid_tracking_inst; /* declaration only: the single definition lives in starpu_mpi_checkpoint_template.c */
 extern struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
 extern struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
 
@@ -133,6 +134,33 @@ static struct _starpu_mpi_checkpoint_template_tracking_inst* _starpu_mpi_checkpo
 //	return item->count;
 //}
 
+//static struct _starpu_mpi_checkpoint_template_tracking_inst* _starpu_mpi_checkpoint_template_check_validation_coherency(int cp_id, int cp_inst)
+//{
+//	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
+//
+//	for (item =_starpu_mpi_checkpoint_template_tracking_inst_list_begin(&pending_tracking_list) ;
+//	     item!=_starpu_mpi_checkpoint_template_tracking_inst_list_end(&pending_tracking_list) ;
+//	     item =_starpu_mpi_checkpoint_template_tracking_inst_list_next(item))
+//	{
+//		if (item->valid)
+//		{
+//			if (!(item->cp_id == cp_id && item->cp_inst == cp_inst))
+//			{
+//				return item;
+//			}
+//			else
+//			{
+//				STARPU_ABORT_MSG("The checkpoint (id:%d - inst:%d) is already validated. This should not happen.\n", cp_id, cp_inst);
+//			}
+//		}
+//		else if (item->cp_id==cp_id && item->cp_inst==cp_inst)
+//		{
+//			return item;
+//		}
+//	}
+//	return NULL;
+//}
+
 static struct _starpu_mpi_checkpoint_template_tracking_inst* _starpu_mpi_checkpoint_template_check_validation_coherency(int cp_id, int cp_inst)
 {
 	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
@@ -141,19 +169,17 @@ static struct _starpu_mpi_checkpoint_template_tracking_inst* _starpu_mpi_checkpo
 	     item!=_starpu_mpi_checkpoint_template_tracking_inst_list_end(&pending_tracking_list) ;
 	     item =_starpu_mpi_checkpoint_template_tracking_inst_list_next(item))
 	{
-		if (item->valid)
+		if (last_valid_tracking_inst->cp_inst > cp_inst)
 		{
-			if (!(item->cp_id == cp_id && item->cp_inst == cp_inst))
-			{
-				return item;
-			}
-			else
-			{
-				STARPU_ABORT_MSG("The checkpoint (id:%d - inst:%d) is already validated. This should not happen.\n", cp_id, cp_inst);
-			}
+			return last_valid_tracking_inst;
 		}
 		else if (item->cp_id==cp_id && item->cp_inst==cp_inst)
 		{
+			if (item->valid)
+			{
+				STARPU_ABORT_MSG("The checkpoint (id:%d - inst:%d) is already validated. This should not happen.\n",
+				                 cp_id, cp_inst);
+			}
 			return item;
 		}
 	}