Browse Source

Storing checkpoint data.

Romain LION 5 years ago
parent
commit
60d6dae659

+ 4 - 4
mpi/src/Makefile.am

@@ -88,8 +88,8 @@ noinst_HEADERS +=       \
 	mpi_failure_tolerance/starpu_mpi_ft.h   \
 	mpi_failure_tolerance/starpu_mpi_checkpoint.h    \
 	mpi_failure_tolerance/starpu_mpi_checkpoint_template.h \
-	mpi_failure_tolerance/starpu_mpi_ft_service_comms.h
-#	mpi_failure_tolerance/starpu_mpi_checkpoint_package.h
+	mpi_failure_tolerance/starpu_mpi_ft_service_comms.h  \
+	mpi_failure_tolerance/starpu_mpi_checkpoint_package.h
 endif STARPU_USE_MPI_FT
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
@@ -128,6 +128,6 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES +=    \
 	mpi_failure_tolerance/starpu_mpi_ft.c   \
 	mpi_failure_tolerance/starpu_mpi_checkpoint.c    \
 	mpi_failure_tolerance/starpu_mpi_checkpoint_template.c   \
-	mpi_failure_tolerance/starpu_mpi_ft_service_comms.c
- #	mpi_failure_tolerance/starpu_mpi_checkpoint_package.c
+	mpi_failure_tolerance/starpu_mpi_ft_service_comms.c \
+ 	mpi_failure_tolerance/starpu_mpi_checkpoint_package.c
 endif STARPU_USE_MPI_FT

+ 16 - 2
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -19,6 +19,7 @@
 
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
 #include <starpu_mpi_private.h>
 #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
@@ -59,6 +60,13 @@ void _starpu_mpi_push_cp_ack_send_cb(void* _args)
 
 }
 
+void _starpu_mpi_store_data_and_push_cp_ack_send_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
+	_starpu_mpi_push_cp_ack_send_cb(_args);
+}
+
 void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
@@ -71,7 +79,7 @@ void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
 	starpu_data_register_same(&arg->copy_handle, arg->handle);
-	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_send_cb, _arg);
+	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
 	starpu_data_release(arg->handle);
 }
 
@@ -79,7 +87,7 @@ void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
 	starpu_data_register_same(&arg->copy_handle, arg->handle);
-	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_send_cb, _arg);
+	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
 }
 
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
@@ -116,6 +124,9 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
 					arg->rank = item->backup_rank;
 					arg->handle = handle;
+					arg->tag = starpu_mpi_data_get_tag(handle);
+					arg->type = STARPU_R;
+					arg->count = item->count;
 					arg->msg.checkpoint_id = cp_template->cp_template_id;
 					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
 					_starpu_mpi_isend_cache_aware(handle, item->backup_rank, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, 0,
@@ -127,6 +138,9 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					struct _starpu_mpi_cp_ack_arg_cb* arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
 					arg->rank = starpu_mpi_data_get_rank(handle);
 					arg->handle = handle;
+					arg->tag = starpu_mpi_data_get_tag(handle);
+					arg->type = STARPU_R;
+					arg->count = item->count;
 					arg->msg.checkpoint_id = cp_template->cp_template_id;
 					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
 					_starpu_mpi_irecv_cache_aware(handle, starpu_mpi_data_get_rank(handle), starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0,

+ 3 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.h

@@ -39,6 +39,9 @@ struct _starpu_mpi_cp_ack_arg_cb
 	int                           rank;
 	starpu_data_handle_t          handle;
 	starpu_data_handle_t          copy_handle;
+	int type;
+	int count;
+	starpu_mpi_tag_t              tag;
 	struct _starpu_mpi_cp_ack_msg msg;
 };
 

+ 61 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_package.c

@@ -0,0 +1,61 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
+
+struct _starpu_mpi_checkpoint_data_list* checkpoint_data_list;
+starpu_pthread_mutex_t package_package_mutex;
+
+int checkpoint_package_init()
+{
+	starpu_pthread_mutex_init(&package_package_mutex, NULL);
+	checkpoint_data_list = _starpu_mpi_checkpoint_data_list_new();
+	_starpu_mpi_checkpoint_data_list_init(checkpoint_data_list);
+	return 0;
+}
+
+int checkpoint_package_data_add(int cp_id, int cp_inst, int rank, starpu_mpi_tag_t tag, int type, void* ptr, int count)
+{
+	struct _starpu_mpi_checkpoint_data* checkpoint_data = _starpu_mpi_checkpoint_data_new();
+	checkpoint_data->cp_id = cp_id;
+	checkpoint_data->cp_inst = cp_inst;
+	checkpoint_data->rank = rank;
+	checkpoint_data->tag = tag;
+	checkpoint_data->type = type;
+	checkpoint_data->ptr = ptr;
+	checkpoint_data->count = count;
+	_starpu_mpi_checkpoint_data_list_push_back(checkpoint_data_list, checkpoint_data);
+	fprintf(stderr, "CP data added - cpid:%d - cpinst:%d - rank:%d - tag:%ld\n", checkpoint_data->cp_id, checkpoint_data->cp_inst, checkpoint_data->rank, checkpoint_data->tag);
+	return 0;
+}
+
+int checkpoint_package_data_del(int cp_id, int cp_inst, int rank)
+{
+	int done = 0;
+	struct _starpu_mpi_checkpoint_data* checkpoint_data = _starpu_mpi_checkpoint_data_list_begin(checkpoint_data_list);
+	while (checkpoint_data != _starpu_mpi_checkpoint_data_list_end(checkpoint_data_list))
+	{
+		if (checkpoint_data->cp_id==cp_id && checkpoint_data->cp_inst==cp_inst
+			&& checkpoint_data->rank==rank)
+		{
+			_starpu_mpi_checkpoint_data_list_erase(checkpoint_data_list, checkpoint_data);
+			done++;
+		}
+	}
+	fprintf(stderr, "cleared %d data from checkpoint database.\n", done);
+
+	return 0;
+}

+ 20 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_package.h

@@ -17,11 +17,31 @@
 #ifndef _STARPU_MPI_CHECKPOINT_PACKAGE_H
 #define _STARPU_MPI_CHECKPOINT_PACKAGE_H
 
+#include <starpu_mpi.h>
+#include <common/list.h>
+#include <starpu_mpi_private.h>
+
 #ifdef __cplusplus
 extern "C"
 {
 #endif
 
+	/*TODO: This structure should be a hashtable accessible with these keys:
+	 *  CPid > CPinstance > Rank > tag */
+
+LIST_TYPE(_starpu_mpi_checkpoint_data,
+	int cp_id;
+	int cp_inst;
+	int rank;
+	starpu_mpi_tag_t tag;
+	int type;
+	void* ptr;
+	int count;
+);
+
+int checkpoint_package_init();
+int checkpoint_package_data_add(int cp_id, int cp_inst, int rank, starpu_mpi_tag_t tag, int type, void* ptr, int count);
+int checkpoint_package_data_del(int cp_id, int cp_inst, int rank);
 
 #ifdef __cplusplus
 }

+ 2 - 1
mpi/src/mpi_failure_tolerance/starpu_mpi_ft.c

@@ -16,7 +16,7 @@
 
 #include <starpu_mpi_private.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
-#include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
+#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
 
 
@@ -29,6 +29,7 @@ int starpu_mpi_ft_turn_on(void)
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
 	starpu_mpi_ft_service_lib_init();
 	checkpoint_template_lib_init();
+	checkpoint_package_init();
 	return 0;
 }
 

+ 2 - 2
mpi/tests/checkpoints.c

@@ -162,8 +162,8 @@ int test_checkpoint_submit(int argc, char* argv[])
 
 int main(int argc, char* argv[])
 {
-	pseudotest_checkpoint_template_register(argc, argv);
-	//test_checkpoint_submit(argc, argv);
+	//pseudotest_checkpoint_template_register(argc, argv);
+	test_checkpoint_submit(argc, argv);
 	return 0;
 }