Browse Source

Make checkpoint definition by the user great

Romain LION 5 years ago
parent
commit
033c1da35c

+ 5 - 1
mpi/include/starpu_mpi_ft.h

@@ -37,11 +37,15 @@ typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template_t
  * <li> ::STARPU_DATA_ARRAY followed by an array of data handles,
  * its number of elements and a backup rank;
  * <li> ::STARPU_VALUE followed by a pointer to the unregistered value,
- * its size in bytes, the rank of the back up, and the rank backuped by the calling node.
+ * its size in bytes, a unique tag (as the ones given for data handle registering)
+ * and the function giving the back up rank of the rank argument : int(backup_of)(int) .
  * <li> The argument list must be ended by the value 0.
  * </ul>
  */
 int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...);
+int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id);
+int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* cp_template, ...);
+int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_template);
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template);
 int starpu_mpi_ft_turn_on(void);
 int starpu_mpi_ft_turn_off(void);

+ 131 - 34
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -28,10 +28,15 @@
 starpu_pthread_mutex_t           cp_template_mutex;
 starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
 int                              my_rank;
+int                              size;
 int cp_template_number = 0;
 
+typedef int (*backup_of_fn)(int);
+
 void checkpoint_template_lib_init(void) {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
 }
 
 void checkpoint_template_lib_quit(void) {
@@ -46,51 +51,125 @@ void checkpoint_template_lib_quit(void) {
 	}
 }
 
-int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
+int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id)
 {
-	int arg_type;
-	//void* useless;
-	void* ptr;
-	int count;
-	int backup_rank;
-	int backup_of;
-//	int (*_backup_of)(int);
-//	int (*_backuped_by)(int);
+	*cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+	return 0;
+}
 
-	starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+int _starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t cp_template, int arg_type, va_list varg_list)
+{
+	void*        ptr;
+	int          count;
+	int          my_backup;
+	int          backup_of;
+	int          data_rank;
+	starpu_mpi_tag_t tag;
+	backup_of_fn _backup_of;
 
-	va_list varg_list_copy;
-	va_copy(varg_list_copy, varg_list);
+	STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
 
-	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
+	switch(arg_type)
 	{
-		STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
+		case STARPU_R:
+			ptr       = va_arg(varg_list, void*);
+			count     = 1;
+			my_backup = va_arg(varg_list, int);
+			backup_of = -1;
+			data_rank = starpu_mpi_data_get_rank(*(starpu_data_handle_t*)ptr);
+			if (my_rank==data_rank || my_rank==my_backup)
+			{
+				return _starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, my_backup, backup_of, -1);
+			}
+			else
+			{
+				/* Since this data does not concern me (i.e. it is nor my data neither a data which I'm the back up)
+				 * it is considered unecessary to register in the CP */
+				return 0;
+			}
+		case STARPU_VALUE:
+			ptr       = va_arg(varg_list, void*);
+			count     = va_arg(varg_list, int);
+			tag       = va_arg(varg_list, starpu_mpi_tag_t);
+			_backup_of = va_arg(varg_list, backup_of_fn);
+			/* I register the backup that will save this data */
+			_starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, _backup_of(my_rank), -1, tag);
+			for (int i=0 ; i<my_rank ; i++)
+			{
+				if (_backup_of(i) == my_rank)
+				{
+					/* I'm the back up of someone else for this data, I have to remember it */
+					_starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
+				}
+			}
+			for (int i=my_rank+1 ; i<size ; i++)
+			{
+				if (_backup_of(i) == my_rank)
+				{
+					/* I'm the back up of someone else for this data, I have to remember it */
+					_starpu_mpi_checkpoint_template_add_data(cp_template, arg_type, ptr, count, -1, i, tag);
+				}
+			}
+			return 0;
+//			case STARPU_DATA_ARRAY:
+//				ptr         = va_arg(varg_list, void*);
+//				count       = va_arg(varg_list, int);
+//				my_backup = va_arg(varg_list, int);
+//				backup_of   = -1;
+//				break;
+		default:
+			STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
+			break;
+	}
+}
+
 
-		switch(arg_type)
+int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
+{
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+
+	cp_template->frozen         = 1;
+	cp_template->message_number = 0;
+	cp_template->size           = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
+
+	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
+
+	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
+	{
+		switch (item->type)
 		{
-			case STARPU_R:
-				ptr         = va_arg(varg_list_copy, void*);
-				count       = 1;
-				backup_rank = va_arg(varg_list_copy, int);
-				backup_of   = -1;
-				break;
 			case STARPU_VALUE:
-				ptr         = va_arg(varg_list_copy, void*);
-				count       = va_arg(varg_list_copy, int);
-				backup_rank = va_arg(varg_list_copy, int);
-				backup_of   = va_arg(varg_list_copy, int);
+				cp_template->message_number++;
 				break;
-//			case STARPU_DATA_ARRAY:
-//				ptr         = va_arg(varg_list_copy, void*);
-//				count       = va_arg(varg_list_copy, int);
-//				backup_rank = va_arg(varg_list_copy, int);
-//				backup_of   = -1;
-//				break;
-			default:
-				STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
+			case STARPU_R:
+				if (starpu_mpi_data_get_rank((starpu_data_handle_t) item->ptr))
+				{
+					cp_template->message_number++;
+				}
+				break;
+			case STARPU_DATA_ARRAY:
 				break;
 		}
-		_starpu_mpi_checkpoint_template_add_data(_cp_template, arg_type, ptr, count, backup_rank, backup_of);
+		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
+	}
+
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
+
+	return cp_template->size;
+}
+
+int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
+{
+	int arg_type;
+
+	starpu_mpi_checkpoint_template_t _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+
+	va_list varg_list_copy;
+	va_copy(varg_list_copy, varg_list);
+
+	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
+	{
+		_starpu_mpi_checkpoint_template_add_entry(_cp_template, arg_type, varg_list_copy);
 	};
 	va_end(varg_list_copy);
 
@@ -111,6 +190,11 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* c
 	return 0;
 }
 
+int starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t* cp_template)
+{
+	return _starpu_mpi_checkpoint_template_freeze(*cp_template);
+}
+
 int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
 {
 	va_list varg_list;
@@ -120,6 +204,19 @@ int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp
 	return ret;
 }
 
+int starpu_mpi_checkpoint_template_add_entry(starpu_mpi_checkpoint_template_t* cp_template, ...)
+{
+	va_list varg_list;
+	int arg_type;
+	int ret;
+	va_start(varg_list, cp_template);
+	arg_type = va_arg(varg_list, int);
+	STARPU_ASSERT_MSG(arg_type!=STARPU_NONE, "Unhandled arg_type: STARPU_NONE(0).\n");
+	ret = _starpu_mpi_checkpoint_template_add_entry(*cp_template, arg_type, varg_list);
+	va_end(varg_list);
+	return ret;
+}
+
 int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_instance) {
 	starpu_pthread_mutex_lock(&cp_template_mutex);
 	fprintf(stderr, "Digesting ack recv: id=%d, inst=%d\n", checkpoint_id, checkpoint_instance);

+ 5 - 36
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -39,6 +39,7 @@ void* ptr;
 int count;
 int backup_rank;
 int backup_of;
+starpu_mpi_tag_t tag;
 );
 
 struct _starpu_mpi_checkpoint_template{
@@ -53,7 +54,7 @@ struct _starpu_mpi_checkpoint_template{
 	starpu_pthread_mutex_t                           mutex;
 };
 
-static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank, int backup_of)
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank, int backup_of, starpu_mpi_tag_t tag)
 {
 	struct _starpu_mpi_checkpoint_template_item* item;
 	_STARPU_MPI_CALLOC(item, 1, sizeof(struct _starpu_mpi_checkpoint_template_item));
@@ -62,6 +63,7 @@ static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoin
 	item->count = count;
 	item->backup_rank = backup_rank;
 	item->backup_of = backup_of;
+	item->tag = tag;
 
 	return item;
 }
@@ -76,12 +78,12 @@ static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_n
 	return _cp_template;
 }
 
-static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backup_rank, int backup_of)
+static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backup_rank, int backup_of, starpu_mpi_tag_t tag)
 {
 	starpu_pthread_mutex_lock(&cp_template->mutex);
 	STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
 	struct _starpu_mpi_checkpoint_template_item* item;
-	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backup_rank, backup_of);
+	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backup_rank, backup_of, tag);
 	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
 	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	return 0;
@@ -102,39 +104,6 @@ static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoin
 	return NULL;
 }
 
-static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template_t cp_template)
-{
-	starpu_pthread_mutex_lock(&cp_template->mutex);
-
-	cp_template->frozen         = 1;
-	cp_template->message_number = 0;
-	cp_template->size           = _starpu_mpi_checkpoint_template_item_list_size(&cp_template->list);
-
-	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
-
-	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
-	{
-		switch (item->type)
-		{
-			case STARPU_VALUE:
-				cp_template->message_number++;
-				break;
-			case STARPU_R:
-				if (starpu_mpi_data_get_rank((starpu_data_handle_t) item->ptr))
-				{
-					cp_template->message_number++;
-				}
-				break;
-			case STARPU_DATA_ARRAY:
-				break;
-		}
-		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
-	}
-
-	starpu_pthread_mutex_unlock(&cp_template->mutex);
-
-	return cp_template->size;
-}
 
 static inline int _starpu_checkpoint_template_free(starpu_mpi_checkpoint_template_t cp_template)
 {

+ 18 - 7
mpi/tests/checkpoints.c

@@ -60,7 +60,7 @@ int pseudotest_checkpoint_template_register(int argc, char* argv[])
 {
 	starpu_data_handle_t             h;
 	starpu_data_handle_t             h_array[ARRAY_SIZE];
-	starpu_mpi_checkpoint_template_t cp_template;
+	starpu_mpi_checkpoint_template_t cp_template1, cp_template2;
 	int                              val = 42;
 	int                              val2 = 1234;
 
@@ -77,9 +77,12 @@ int pseudotest_checkpoint_template_register(int argc, char* argv[])
 	{
 		h_array[i] = NULL;
 	}
+	FPRINTF(stderr, "Go\n");
 
 	STARPU_MPI_INIT();
 
+	FPRINTF_MPI(stderr, "Init ok - my rnk %d - size %d\n", me, nb_nodes);
+
 	starpu_variable_data_register(&h, STARPU_MAIN_RAM, (uintptr_t)&val2, sizeof(int));
 
 	starpu_vector_data_register(h_array, STARPU_MAIN_RAM, (uintptr_t)array, ARRAY_SIZE, sizeof(int));
@@ -88,14 +91,22 @@ int pseudotest_checkpoint_template_register(int argc, char* argv[])
 		starpu_variable_data_register(&h_array[i], STARPU_MAIN_RAM, (uintptr_t)&array[i], sizeof(int));
 	}
 
-	starpu_mpi_checkpoint_template_register(&cp_template, 123486,
-	                                        STARPU_VALUE, &val, sizeof(int), backup_of(me), backuped_by(me),
-	                                        STARPU_DATA_ARRAY, h_array, ARRAY_SIZE, 1,
+	starpu_mpi_checkpoint_template_register(&cp_template1, 123486,
+	                                        STARPU_VALUE, &val, sizeof(int), 84, backup_of,
 	                                        STARPU_R, &h, 1,
 	                                        0);
 
 	FPRINTF(stderr, "registered!\n");
-	_starpu_mpi_checkpoint_template_print(cp_template);
+	_starpu_mpi_checkpoint_template_print(cp_template1);
+
+	starpu_mpi_checkpoint_template_create(&cp_template2, 98765);
+	starpu_mpi_checkpoint_template_add_entry(&cp_template2, STARPU_R, &h, 1);
+	starpu_mpi_checkpoint_template_add_entry(&cp_template2, STARPU_VALUE, &val, sizeof(int), 84, backup_of);
+	starpu_mpi_checkpoint_template_freeze(&cp_template2);
+
+	FPRINTF(stderr, "registered 2!\n");
+	_starpu_mpi_checkpoint_template_print(cp_template1);
+
 	starpu_shutdown();
 	return 0;
 }
@@ -149,8 +160,8 @@ int test_checkpoint_submit(int argc, char* argv[])
 
 int main(int argc, char* argv[])
 {
-	//pseudotest_checkpoint_template_register(argc, argv);
-	test_checkpoint_submit(argc, argv);
+	pseudotest_checkpoint_template_register(argc, argv);
+	//test_checkpoint_submit(argc, argv);
 	return 0;
 }