Przeglądaj źródła

Basic checkpoint exchanges and test

Romain LION 5 lat temu
rodzic
commit
14b8d5ca40

+ 5 - 3
mpi/include/starpu_mpi_ft.h

@@ -18,7 +18,7 @@
 #define FT_STARPU_STARPU_MPI_FT_H
 
 struct _starpu_mpi_checkpoint_template;
-typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template;
+typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template_t;
 
 
 /**
@@ -33,10 +33,12 @@ typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template;
  * <li> ::STARPU_DATA_ARRAY followed by an array of data handles,
  * its number of elements and a backup rank;
  * <li> ::STARPU_VALUE followed by a pointer to the unregistered value,
- * its size in bytes and a backup rank.
+ * its size in bytes, the rank of the back up, and the rank backuped by the calling node.
  * <li> The argument list must be ended by the value 0.
  * </ul>
  */
-int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_template, int cp_id, ...);
+int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...);
+int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template);
+int starpu_mpi_checkpoint_turn_on(void);
 
 #endif //FT_STARPU_STARPU_MPI_FT_H

+ 80 - 16
mpi/src/starpu_mpi_checkpoint.c

@@ -21,21 +21,30 @@
 
 #include <starpu_mpi_checkpoint.h>
 #include <sys/param.h>
+#include <starpu_mpi_private.h>
 
 #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
 
-starpu_pthread_mutex_t cp_template_mutex;
-starpu_mpi_checkpoint_template cp_template_array[MAX_CP_TEMPLATE_NUMBER];
+starpu_pthread_mutex_t           cp_template_mutex;
+starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
+int                              my_rank;
 int cp_template_number = 0;
 
-int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_template, int cp_id, va_list varg_list)
+extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
+extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
+
+
+int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
 {
 	int arg_type;
 	void* ptr;
 	int count;
 	int backup_rank;
+	int backup_of;
+//	int (*_backup_of)(int);
+//	int (*_backuped_by)(int);
 
-	starpu_mpi_checkpoint_template _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+	starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
 
 	va_list varg_list_copy;
 	va_copy(varg_list_copy, varg_list);
@@ -50,18 +59,25 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_
 				ptr         = va_arg(varg_list_copy, void*);
 				count       = 1;
 				backup_rank = va_arg(varg_list_copy, int);
+				backup_of   = -1;
 				break;
 			case STARPU_VALUE:
+				ptr         = va_arg(varg_list_copy, void*);
+				count       = va_arg(varg_list_copy, int);
+				backup_rank = va_arg(varg_list_copy, int);
+				backup_of   = va_arg(varg_list_copy, int);
+				break;
 			case STARPU_DATA_ARRAY:
 				ptr         = va_arg(varg_list_copy, void*);
 				count       = va_arg(varg_list_copy, int);
 				backup_rank = va_arg(varg_list_copy, int);
+				backup_of   = -1;
 				break;
 			default:
 				STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
 				break;
 		}
-		_starpu_mpi_checkpoint_template_add_data(_cp_template, arg_type, ptr, count, backup_rank);
+		_starpu_mpi_checkpoint_template_add_data(_cp_template, arg_type, ptr, count, backup_rank, backup_of);
 	};
 	va_end(varg_list_copy);
 
@@ -82,7 +98,7 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_
 	return 0;
 }
 
-int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_template, int cp_id, ...)
+int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
 {
 	va_list varg_list;
 	va_start(varg_list, cp_id);
@@ -91,8 +107,60 @@ int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_t
 	return ret;
 }
 
+void print_received_value(void* handle)
+{
+	fprintf(stderr, "Node %d - I received backup value:%d\n", my_rank, *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)handle, STARPU_MAIN_RAM));
+}
+
+int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
+{
+	starpu_data_handle_t* handle;
+	struct _starpu_mpi_checkpoint_template_item* item;
+	//MPI_Comm comm;
+
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+	STARPU_ASSERT_MSG(cp_template->pending==0, "Can not submit a checkpoint while previous instance has not succeeded.\n");
+
+	cp_template->pending = 1;
+
+	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
+	fprintf(stderr, "begin iter\n");
+
+	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
+	{
+		switch (item->type)
+		{
+			case STARPU_VALUE:
+//				starpu_data_handle_t send_handle;
+//				starpu_variable_data_register(&send_handle, STARPU_MAIN_RAM, (uintptr_t)item->ptr, item->count);
+//				starpu_mpi_data_register(send_handle, )
+//				starpu_mpi_send
+				break;
+			case STARPU_R:
+				handle = (starpu_data_handle_t*)item->ptr;
+				if (starpu_mpi_data_get_rank(*handle)==my_rank)
+				{
+					fprintf(stderr,"sending to %d (tag %d)\n", item->backup_rank, (int)starpu_mpi_data_get_tag(*handle));
+					_starpu_mpi_isend_common(*handle, item->backup_rank, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0, NULL, NULL, 1);
+				}
+				else if (item->backup_rank==my_rank)
+				{
+					fprintf(stderr,"recving from %d (tag %d)\n", starpu_mpi_data_get_rank(*handle), (int)starpu_mpi_data_get_tag(*handle));
+					_starpu_mpi_irecv_common(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, &print_received_value, (void*)handle, 1, 1, 1);
+				}
+				break;
+		}
+
+		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
+	};
+
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
+
+	return 0;
+}
+
 // For test purpose
-int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template cp_template)
+int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template)
 {
 	int val;
 	int i = 0;
@@ -133,19 +201,15 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template cp_temp
 	return 0;
 }
 
-int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template cp_template) {
-	STARPU_ASSERT_MSG(cp_template->pending==0, "A checkpoint submission has been requested while the previous "
-											"one has not ended.\n");
-
-	for (int i = 0; i < cp_template->size; ++i)
-	{
-		break;
-	}
+int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
+{
+	_starpu_mpi_checkpoint_template_submit(cp_template);
 	return 0;
 }
 
-int _starpu_mpi_checkpoint_turn_on(void)
+int starpu_mpi_checkpoint_turn_on(void)
 {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
 	return 0;
 }

+ 16 - 10
mpi/src/starpu_mpi_checkpoint.h

@@ -31,6 +31,7 @@ LIST_TYPE(_starpu_mpi_checkpoint_template_item,
     void* ptr;
     int count;
     int backup_rank;
+    int backup_of;
 );
 
 struct _starpu_mpi_checkpoint_template{
@@ -39,10 +40,11 @@ struct _starpu_mpi_checkpoint_template{
     int cp_template_id;
     int pending;
     int frozen;
+    starpu_pthread_mutex_t mutex;
     starpu_sem_t completion_sem;
 };
 
-static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank)
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank, int backup_of)
 {
 	struct _starpu_mpi_checkpoint_template_item* item;
 	_STARPU_MPI_CALLOC(item, 1, sizeof(struct _starpu_mpi_checkpoint_template_item));
@@ -50,26 +52,30 @@ static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoin
 	item->ptr = ptr;
 	item->count = count;
 	item->backup_rank = backup_rank;
+	item->backup_of = backup_of;
+
 	return item;
 }
 
-static inline starpu_mpi_checkpoint_template _starpu_mpi_checkpoint_template_new(int cp_id)
+static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_new(int cp_id)
 {
-	starpu_mpi_checkpoint_template _cp_template;
+	starpu_mpi_checkpoint_template_t _cp_template;
 	_STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template));
+	_cp_template->cp_template_id = cp_id;
+	starpu_pthread_mutex_init(&_cp_template->mutex, NULL);
 	return _cp_template;
 }
 
-static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template cp_template, int type, void* ptr, int count, int backup_rank)
+static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backup_rank, int backup_of)
 {
 	STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
 	struct _starpu_mpi_checkpoint_template_item* item;
-	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backup_rank);
+	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backup_rank, backup_of);
 	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
 	return 0;
 }
 
-static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template _cp_template)
+static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t _cp_template)
 {
 	_cp_template->frozen = 1;
 	_cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&_cp_template->list);
@@ -77,17 +83,17 @@ static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_t
 	return _cp_template->size;
 }
 
-static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_first_data(starpu_mpi_checkpoint_template template)
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_first_data(starpu_mpi_checkpoint_template_t template)
 {
 	return _starpu_mpi_checkpoint_template_item_list_front(&template->list);
 }
 
-static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_next_data(starpu_mpi_checkpoint_template template STARPU_ATTRIBUTE_UNUSED, struct _starpu_mpi_checkpoint_template_item* ref_data)
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_next_data(starpu_mpi_checkpoint_template_t template STARPU_ATTRIBUTE_UNUSED, struct _starpu_mpi_checkpoint_template_item* ref_data)
 {
 	return _starpu_mpi_checkpoint_template_item_list_next(ref_data);
 }
 
-static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_end(starpu_mpi_checkpoint_template template STARPU_ATTRIBUTE_UNUSED)
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_end(starpu_mpi_checkpoint_template_t template STARPU_ATTRIBUTE_UNUSED)
 {
 	return NULL;
 }
@@ -95,7 +101,7 @@ static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoin
 int _starpu_mpi_checkpoint_turn_on(void);
 
 // For test purpose
-int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template cp_template);
+int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template);
 
 
 #ifdef __cplusplus

+ 4 - 0
mpi/src/starpu_mpi_init.c

@@ -89,6 +89,10 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 	_mpi_world_size = argc_argv->world_size;
 	_mpi_world_rank = argc_argv->rank;
 #endif
+
+#ifdef STARPU_USE_MPI_FT
+	starpu_mpi_checkpoint_turn_on();
+#endif
 }
 
 static

+ 124 - 49
mpi/tests/checkpoints.c

@@ -21,61 +21,136 @@
 
 #define ARRAY_SIZE 12
 
-int main(int argc, char* argv[])
+#define STARPU_MPI_INIT(void) do{struct starpu_conf conf; int ret; \
+starpu_conf_init(&conf); \
+conf.nmic = 0; \
+conf.nmpi_ms = 0; \
+ret = starpu_init(NULL); \
+if (STARPU_UNLIKELY(ret == -ENODEV)) \
+{ \
+return 77; \
+} \
+STARPU_CHECK_RETURN_VALUE(ret, "starpu_init"); \
+if (starpu_cpu_worker_get_count() < 1) \
+{ \
+FPRINTF(stderr, "This application requires at least 1 cpu worker\n"); \
+starpu_shutdown(); \
+return 77; \
+} \
+starpu_mpi_init(&argc, &argv, 1); \
+starpu_mpi_comm_size(MPI_COMM_WORLD, &nb_nodes); \
+starpu_mpi_comm_rank(MPI_COMM_WORLD, &me); \
+}while(0)
+
+
+int nb_nodes;
+int me;
+
+int backup_of(int _me)
 {
-	starpu_data_handle_t h;
-	starpu_data_handle_t h_array[ARRAY_SIZE];
-    starpu_mpi_checkpoint_template cp_template;
-    int val = 42;
-    int val2 = 1234;
-    int array[ARRAY_SIZE];
-    int ret;
-    struct starpu_conf conf;
-
-    //init array
-    for (int i=0 ; i<ARRAY_SIZE ; i++)
-    {
-    	array[i] = i*1111+42;
-    }
+	return (_me+1)%nb_nodes;
+}
+
+int backuped_by(int _me)
+{
+	return (_me-1)%nb_nodes;
+}
+
+int pseudotest_checkpoint_template_register(int argc, char* argv[])
+{
+	starpu_data_handle_t             h;
+	starpu_data_handle_t             h_array[ARRAY_SIZE];
+	starpu_mpi_checkpoint_template_t cp_template;
+	int                              val = 42;
+	int                              val2 = 1234;
+
+	int array[ARRAY_SIZE];
+	int ret;
+
+	//init array
+	for (int i=0 ; i<ARRAY_SIZE ; i++)
+	{
+		array[i] = i*1111+42;
+	}
 
 	for (int i=0 ; i<ARRAY_SIZE ; i++)
 	{
 		h_array[i] = NULL;
 	}
 
-    starpu_conf_init(&conf);
-    conf.nmic = 0;
-    conf.nmpi_ms = 0;
-
-    ret = starpu_init(&conf);
-    if (STARPU_UNLIKELY(ret == -ENODEV))
-    {
-        return 77;
-    }
-    STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
-
-    if (starpu_cpu_worker_get_count() < 1)
-    {
-        FPRINTF(stderr, "This application requires at least 1 cpu worker\n");
-        starpu_shutdown();
-        return 77;
-    }
-
-    starpu_variable_data_register(&h, STARPU_MAIN_RAM, (uintptr_t)&val2, sizeof(int));
-
-    starpu_vector_data_register(h_array, STARPU_MAIN_RAM, (uintptr_t)array, ARRAY_SIZE, sizeof(int));
-    for (int i=0 ; i<ARRAY_SIZE ; i++) {
-    	starpu_variable_data_register(&h_array[i], STARPU_MAIN_RAM, (uintptr_t)&array[i], sizeof(int));
-    }
-
-    starpu_mpi_checkpoint_template_register(&cp_template, 123486,
-           STARPU_VALUE, &val, sizeof(int), 1,
-           STARPU_DATA_ARRAY, h_array, ARRAY_SIZE, 1,
-           STARPU_R, &h, 1,
-           0);
-
-    FPRINTF(stderr, "registered!\n");
-    _starpu_mpi_checkpoint_template_print(cp_template);
-    return 0;
+	STARPU_MPI_INIT();
+
+	starpu_variable_data_register(&h, STARPU_MAIN_RAM, (uintptr_t)&val2, sizeof(int));
+
+	starpu_vector_data_register(h_array, STARPU_MAIN_RAM, (uintptr_t)array, ARRAY_SIZE, sizeof(int));
+	for (int i=0 ; i<ARRAY_SIZE ; i++)
+	{
+		starpu_variable_data_register(&h_array[i], STARPU_MAIN_RAM, (uintptr_t)&array[i], sizeof(int));
+	}
+
+	starpu_mpi_checkpoint_template_register(&cp_template, 123486,
+	                                        STARPU_VALUE, &val, sizeof(int), backup_of(me), backuped_by(me),
+	                                        STARPU_DATA_ARRAY, h_array, ARRAY_SIZE, 1,
+	                                        STARPU_R, &h, 1,
+	                                        0);
+
+	FPRINTF(stderr, "registered!\n");
+	_starpu_mpi_checkpoint_template_print(cp_template);
+	starpu_shutdown();
+	return 0;
+}
+
+int test_checkpoint_submit(int argc, char* argv[])
+{
+	starpu_data_handle_t handle0, handle1;
+	starpu_mpi_checkpoint_template_t cp_template;
+	int val0 = 0;
+	int val1 = 0;
+
+	FPRINTF(stderr, "Go\n");
+
+	STARPU_MPI_INIT();
+
+	FPRINTF_MPI(stderr, "Init ok - my rnk %d - size %d\n", me, nb_nodes);
+
+	starpu_variable_data_register(&handle0, STARPU_MAIN_RAM, (uintptr_t)&val0, sizeof(int));
+	starpu_variable_data_register(&handle1, STARPU_MAIN_RAM, (uintptr_t)&val1, sizeof(int));
+
+	starpu_mpi_data_register(handle0, 100, 0);
+	starpu_mpi_data_register(handle1, 200, 1);
+
+	starpu_mpi_checkpoint_template_register(&cp_template, 321,
+			STARPU_R, &handle0, 1,
+			STARPU_R, &handle1, 0,
+			0);
+
+	switch (me)
+	{
+		case 0:
+			val0 = 42;
+			break;
+		case 1:
+			val1 = 1000;
+			break;
+		default:
+			STARPU_ABORT_MSG("Test code with only two node communicator.\n");
+			break;
+	}
+	FPRINTF_MPI(stderr, "Submitting\n");
+	starpu_mpi_checkpoint_template_submit(cp_template);
+
+	FPRINTF_MPI(stderr, "Submitted\n");
+	sleep(10);
+
+	starpu_shutdown();
+
+	return 0;
+}
+
+int main(int argc, char* argv[])
+{
+	//pseudotest_checkpoint_template_register(argc, argv);
+	test_checkpoint_submit(argc, argv);
+	return 0;
 }