Browse Source

CP templates are now structured as chained list

Romain LION 5 years ago
parent
commit
886e771cd4
3 changed files with 119 additions and 62 deletions
  1. 57 50
      mpi/src/starpu_mpi_checkpoint.c
  2. 59 11
      mpi/src/starpu_mpi_checkpoint.h
  3. 3 1
      mpi/tests/checkpoints.c

+ 57 - 50
mpi/src/starpu_mpi_checkpoint.c

@@ -22,7 +22,7 @@
 #include <starpu_mpi_checkpoint.h>
 #include <sys/param.h>
 
-#define MAX_CP_TEMPLATE_NUMBER 32
+#define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
 
 starpu_pthread_mutex_t cp_template_mutex;
 starpu_mpi_checkpoint_template cp_template_array[MAX_CP_TEMPLATE_NUMBER];
@@ -30,60 +30,49 @@ int cp_template_number = 0;
 
 int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_template, int cp_id, va_list varg_list)
 {
-	int i = 0;
 	int arg_type;
+	void* ptr;
+	int count;
+	int backup_rank;
 
-	starpu_mpi_checkpoint_template _cp_template = _starpu_mpi_checkpoint_create();
+	starpu_mpi_checkpoint_template _cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
+	fprintf(stderr, "cpid: %d\n", cp_id);
 
 	va_list varg_list_copy;
 	va_copy(varg_list_copy, varg_list);
 
 	while ((arg_type = va_arg(varg_list_copy, int)) != 0) {
 
-		if (i == CHECKPOINT_STRUCTURE_MAX_SIZE)
-		{
-			STARPU_ABORT_MSG("Unable to treat more data (CHECKPOINT_STRUCTURE_MAX_SIZE == %d.\n",
-			                 CHECKPOINT_STRUCTURE_MAX_SIZE);
-		}
+		fprintf(stderr, "argtype: %d\n", arg_type);
+		STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
 
-		if (arg_type & STARPU_COMMUTE)
-		{
-			STARPU_ABORT_MSG("Unable to checkpoint non sequential task flow.\n");
-		}
-		else if (arg_type==STARPU_R)
-		{
-			_cp_template->items[i].type        = STARPU_R;
-			_cp_template->items[i].ptr         = va_arg(varg_list_copy, void*);
-			_cp_template->items[i].backup_rank = va_arg(varg_list_copy, int);
-		}
-		else if (arg_type==STARPU_VALUE)
-		{
-			_cp_template->items[i].type        = STARPU_VALUE;
-			_cp_template->items[i].ptr         = va_arg(varg_list_copy,void*);
-			_cp_template->items[i].count       = va_arg(varg_list_copy, int);
-			_cp_template->items[i].backup_rank = va_arg(varg_list_copy, int);
-		}
-		else if (arg_type==STARPU_DATA_ARRAY)
-		{
-			_cp_template->items[i].type        = STARPU_DATA_ARRAY;
-			_cp_template->items[i].ptr         = va_arg(varg_list_copy,void*);
-			_cp_template->items[i].count       = va_arg(varg_list_copy, int);
-			_cp_template->items[i].backup_rank = va_arg(varg_list_copy, int);
-		}
-		else
+		switch(arg_type)
 		{
-			STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
+			case STARPU_R:
+				ptr         = va_arg(varg_list_copy, void*);
+				count       = 1;
+				backup_rank = va_arg(varg_list_copy, int);
+				break;
+			case STARPU_VALUE:
+			case STARPU_DATA_ARRAY:
+				ptr         = va_arg(varg_list_copy, void*);
+				count       = va_arg(varg_list_copy, int);
+				backup_rank = va_arg(varg_list_copy, int);
+				break;
+			default:
+				STARPU_ABORT_MSG("Unrecognized argument %d, did you perhaps forget to end arguments with 0?\n", arg_type);
+				break;
 		}
-
-		i ++;
+		_starpu_mpi_checkpoint_template_add_data(_cp_template, arg_type, ptr, count, backup_rank);
 	};
 	va_end(varg_list_copy);
 
-	_cp_template->size = i;
-	starpu_sem_init(&_cp_template->completion_sem, 0, _cp_template->size-1);
-	_cp_template->cp_template_id = cp_id;
-
+	_starpu_mpi_checkpoint_template_freeze(_cp_template);
 
+	starpu_pthread_mutex_lock(&cp_template_mutex);
+	cp_template_array[cp_template_number] = _cp_template;
+	cp_template_number++;
+	starpu_pthread_mutex_unlock(&cp_template_mutex);
 
 	*cp_template = _cp_template;
 
@@ -103,26 +92,30 @@ int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template* cp_t
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template cp_template)
 {
 	int val;
-	for (int i=0 ; i< cp_template->size ; i++)
+	int i = 0;
+	struct _starpu_mpi_checkpoint_template_item* item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
+
+	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
 	{
 		fprintf(stderr,"Item %2d: ", i);
-		if (cp_template->items[i].type == STARPU_VALUE)
+		if (item->type == STARPU_VALUE)
 		{
-			printf("STARPU_VALUE - Value=%d\n", (*(int *)(cp_template->items[i].ptr)));
+			fprintf(stderr, "STARPU_VALUE - ");
+			fprintf(stderr, "Value=%d\n", (*(int *)(item->ptr)));
 		}
-		else if (cp_template->items[i].type == STARPU_R)
+		else if (item->type == STARPU_R)
 		{
-			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(cp_template->items[i].ptr), 0);
-			printf("STARPU_R - Value=%d\n", val);
+			val = *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)(item->ptr), 0);
+			fprintf(stderr, "STARPU_R - Value=%d\n", val);
 
 		}
-		else if (cp_template->items[i].type == STARPU_DATA_ARRAY)
+		else if (item->type == STARPU_DATA_ARRAY)
 		{
-			fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)cp_template->items[i].ptr), 0));
+			fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)item->ptr), 0));
 
-			for (int j=1 ; j<MIN(cp_template->items[i].count, 5) ; j++)
+			for (int j=1 ; j<MIN(item->count, 5) ; j++)
 			{
-				fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)cp_template->items[i].ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
+				fprintf(stderr, ", %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t*)item->ptr)[j], 0)); //j*sizeof(starpu_data_handle_t)
 			}
 			fprintf(stderr, "...\n");
 		}
@@ -130,6 +123,20 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template cp_temp
 		{
 			printf("Unrecognized type.\n");
 		}
+
+		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
+		i++;
+	};
+	return 0;
+}
+
+int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template cp_template) {
+	STARPU_ASSERT_MSG(cp_template->pending==0, "A checkpoint submission has been requested while the previous "
+											"one has not ended.\n");
+
+	for (int i = 0; i < cp_template->size; ++i)
+	{
+		break;
 	}
 	return 0;
 }

+ 59 - 11
mpi/src/starpu_mpi_checkpoint.h

@@ -18,40 +18,88 @@
 #define FT_STARPU_STARPU_MPI_CHECKPOINT_H
 
 #include <starpu_mpi.h>
+#include <common/list.h>
+#include "starpu_mpi_private.h"
 
-#define CHECKPOINT_STRUCTURE_MAX_SIZE 32
-
+#ifdef __cplusplus
+extern "C"
+{
+#endif
 
-// TODO: make template as an unlimited chained list
-struct _starpu_mpi_checkpoint_template_item{
+LIST_TYPE(_starpu_mpi_checkpoint_template_item,
     int type;
     void* ptr;
     int count;
     int backup_rank;
-};
+);
 
 struct _starpu_mpi_checkpoint_template{
-    struct _starpu_mpi_checkpoint_template_item items[CHECKPOINT_STRUCTURE_MAX_SIZE];
+    struct _starpu_mpi_checkpoint_template_item_list list;
     int size;
     int cp_template_id;
     int pending;
+    int frozen;
     starpu_sem_t completion_sem;
 };
 
-starpu_mpi_checkpoint_template _starpu_mpi_checkpoint_create(void)
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_item_create(int type, void* ptr, int count, int backup_rank)
+{
+	struct _starpu_mpi_checkpoint_template_item* item;
+	_STARPU_MPI_CALLOC(item, 1, sizeof(struct _starpu_mpi_checkpoint_template_item));
+	item->type = type;
+	item->ptr = ptr;
+	item->count = count;
+	item->backup_rank = backup_rank;
+	return item;
+}
+
+static inline starpu_mpi_checkpoint_template _starpu_mpi_checkpoint_template_new(int cp_id)
 {
 	starpu_mpi_checkpoint_template _cp_template;
-	_STARPU_MALLOC(_cp_template, sizeof(struct _starpu_mpi_checkpoint_template));
-	assert(_cp_template!=NULL);
-	_cp_template->pending = 0;
-	//starpu_sem_init(&_cp_template->completion_sem, 0, 0);
+	_STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template));
 	return _cp_template;
 }
 
+static inline int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template cp_template, int type, void* ptr, int count, int backup_rank)
+{
+	STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
+	struct _starpu_mpi_checkpoint_template_item* item;
+	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backup_rank);
+	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
+	return 0;
+}
+
+static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_template _cp_template)
+{
+	_cp_template->frozen = 1;
+	_cp_template->size = _starpu_mpi_checkpoint_template_item_list_size(&_cp_template->list);
+	starpu_sem_init(&_cp_template->completion_sem, 0, _cp_template->size-1);
+	return _cp_template->size;
+}
+
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_first_data(starpu_mpi_checkpoint_template template)
+{
+	return _starpu_mpi_checkpoint_template_item_list_front(&template->list);
+}
+
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_next_data(starpu_mpi_checkpoint_template template STARPU_ATTRIBUTE_UNUSED, struct _starpu_mpi_checkpoint_template_item* ref_data)
+{
+	return _starpu_mpi_checkpoint_template_item_list_next(ref_data);
+}
+
+static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_end(starpu_mpi_checkpoint_template template STARPU_ATTRIBUTE_UNUSED)
+{
+	return NULL;
+}
+
 int _starpu_mpi_checkpoint_turn_on(void);
 
 // For test purpose
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template cp_template);
 
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif //FT_STARPU_STARPU_MPI_CHECKPOINT_H

+ 3 - 1
mpi/tests/checkpoints.c

@@ -68,11 +68,13 @@ int main(int argc, char* argv[])
     	starpu_variable_data_register(&h_array[i], STARPU_MAIN_RAM, (uintptr_t)&array[i], sizeof(int));
     }
 
-    starpu_mpi_checkpoint_template_register(&cp_template,
+    starpu_mpi_checkpoint_template_register(&cp_template, 123486,
            STARPU_VALUE, &val, sizeof(int), 1,
            STARPU_DATA_ARRAY, h_array, ARRAY_SIZE, 1,
            STARPU_R, &h, 1,
            0);
+
+    FPRINTF(stderr, "registered!\n");
     _starpu_mpi_checkpoint_template_print(cp_template);
     return 0;
 }