소스 검색

Shutdown mechanisms, to delete all cp data while shuting down

Romain LION 5 년 전
부모
커밋
f41c9c4e66

+ 54 - 14
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_package.c

@@ -19,6 +19,8 @@
 struct _starpu_mpi_checkpoint_data_list* checkpoint_data_list;
 starpu_pthread_mutex_t package_package_mutex;
 
+int _checkpoint_package_data_delete_all();
+
 int checkpoint_package_init()
 {
 	starpu_pthread_mutex_init(&package_package_mutex, NULL);
@@ -27,6 +29,13 @@ int checkpoint_package_init()
 	return 0;
 }
 
+int checkpoint_package_shutdown()
+{
+	starpu_pthread_mutex_destroy(&package_package_mutex);
+	_checkpoint_package_data_delete_all();
+	return 0;
+}
+
 int checkpoint_package_data_add(int cp_id, int cp_inst, int rank, starpu_mpi_tag_t tag, int type, void* ptr, int count)
 {
 	struct _starpu_mpi_checkpoint_data* checkpoint_data = _starpu_mpi_checkpoint_data_new();
@@ -38,7 +47,29 @@ int checkpoint_package_data_add(int cp_id, int cp_inst, int rank, starpu_mpi_tag
 	checkpoint_data->ptr = ptr;
 	checkpoint_data->count = count;
 	_starpu_mpi_checkpoint_data_list_push_back(checkpoint_data_list, checkpoint_data);
-	_STARPU_MPI_DEBUG(8, "CP data added - cpid:%d - cpinst:%d - rank:%d - tag:%ld\n", checkpoint_data->cp_id, checkpoint_data->cp_inst, checkpoint_data->rank, checkpoint_data->tag);
+	_STARPU_MPI_DEBUG(8, "CP data (%p) added - cpid:%d - cpinst:%d - rank:%d - tag:%ld\n", checkpoint_data->ptr, checkpoint_data->cp_id, checkpoint_data->cp_inst, checkpoint_data->rank, checkpoint_data->tag);
+	return 0;
+}
+
+int _checkpoint_package_data_delete(struct _starpu_mpi_checkpoint_data* checkpoint_data)
+{
+	if (checkpoint_data->type==STARPU_R)
+	{
+		starpu_data_handle_t handle = checkpoint_data->ptr;
+		_STARPU_MPI_DEBUG(8, "Clearing handle %p entry\n", handle);
+		starpu_data_unregister_submit(handle);
+	}
+	else if (checkpoint_data->type==STARPU_VALUE)
+	{
+		_STARPU_MPI_DEBUG(8, "Clearing external data entry\n");
+		free(checkpoint_data->ptr);
+	}
+	else
+	{
+		STARPU_ABORT_MSG("Unrecognized data type: %d\n", checkpoint_data->type);
+	}
+	_starpu_mpi_checkpoint_data_list_erase(checkpoint_data_list, checkpoint_data);
+	free(checkpoint_data);
 	return 0;
 }
 
@@ -54,22 +85,31 @@ int checkpoint_package_data_del(int cp_id, int cp_inst, int rank)
 		// the rank that initiated the CP
 		if (checkpoint_data->cp_inst<cp_inst && checkpoint_data->rank==rank)
 		{
-			if (checkpoint_data->type==STARPU_R)
-			{
-				starpu_data_handle_t handle = checkpoint_data->ptr;
-				starpu_data_unregister_submit(handle);
-			}
-			if (checkpoint_data->type==STARPU_VALUE)
-			{
-				free(checkpoint_data->ptr);
-			}
-			_starpu_mpi_checkpoint_data_list_erase(checkpoint_data_list, checkpoint_data);
-			free(checkpoint_data);
+			_checkpoint_package_data_delete(checkpoint_data);
 			done++;
 		}
 		checkpoint_data = next_checkpoint_data;
 	}
 	_STARPU_MPI_DEBUG(0, "cleared %d data from checkpoint database.\n", done);
 
-	return 0;
-}
+	return done;
+}
+
+int _checkpoint_package_data_delete_all()
+{
+	int done = 0;
+	struct _starpu_mpi_checkpoint_data* next_checkpoint_data = NULL;
+	struct _starpu_mpi_checkpoint_data* checkpoint_data = _starpu_mpi_checkpoint_data_list_begin(checkpoint_data_list);
+	while (checkpoint_data != _starpu_mpi_checkpoint_data_list_end(checkpoint_data_list))
+	{
+		next_checkpoint_data = _starpu_mpi_checkpoint_data_list_next(checkpoint_data);
+		// I delete all the old data (i.e. the cp inst is strictly lower than the one of the just validated CP) only for
+		// the rank that initiated the CP
+		_checkpoint_package_data_delete(checkpoint_data);
+		done++;
+		checkpoint_data = next_checkpoint_data;
+	}
+	_STARPU_MPI_DEBUG(0, "cleared %d data from checkpoint database.\n", done);
+
+	return done;
+}

+ 1 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_package.h

@@ -40,6 +40,7 @@ LIST_TYPE(_starpu_mpi_checkpoint_data,
 );
 
 int checkpoint_package_init();
+int checkpoint_package_shutdown();
 int checkpoint_package_data_add(int cp_id, int cp_inst, int rank, starpu_mpi_tag_t tag, int type, void* ptr, int count);
 int checkpoint_package_data_del(int cp_id, int cp_inst, int rank);
 

+ 1 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft.c

@@ -36,6 +36,7 @@ int starpu_mpi_ft_turn_on(void)
 int starpu_mpi_ft_turn_off(void)
 {
 	checkpoint_template_lib_quit();
+	checkpoint_package_shutdown();
 	starpu_pthread_mutex_destroy(&ft_mutex);
 
 	return 0;