瀏覽代碼

Termination mechanisms

Romain LION 5 年之前
父節點
當前提交
418b7b7557
共有 4 個文件被更改,包括 66 次插入25 次删除
  1. 10 1
      mpi/include/starpu_mpi_ft.h
  2. 34 20
      mpi/src/starpu_mpi_checkpoint.c
  3. 17 3
      mpi/src/starpu_mpi_checkpoint.h
  4. 5 1
      mpi/src/starpu_mpi_init.c

+ 10 - 1
mpi/include/starpu_mpi_ft.h

@@ -17,10 +17,14 @@
 #ifndef FT_STARPU_STARPU_MPI_FT_H
 #define FT_STARPU_STARPU_MPI_FT_H
 
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
 struct _starpu_mpi_checkpoint_template;
 typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template_t;
 
-
 /**
  * Registers a checkpoint template \p cp_template with the given arguments.
  * It is then ready to use with ::starpu_mpi_checkpoint_template_submit during the program execution.
@@ -40,5 +44,10 @@ typedef struct _starpu_mpi_checkpoint_template* starpu_mpi_checkpoint_template_t
 int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...);
 int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template);
 int starpu_mpi_checkpoint_turn_on(void);
+int starpu_mpi_checkpoint_turn_off(void);
+
+#ifdef __cplusplus
+}
+#endif
 
 #endif //FT_STARPU_STARPU_MPI_FT_H

+ 34 - 20
mpi/src/starpu_mpi_checkpoint.c

@@ -34,9 +34,10 @@ extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t dat
 extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
 
 
-int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
+static int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, va_list varg_list)
 {
 	int arg_type;
+	void* useless;
 	void* ptr;
 	int count;
 	int backup_rank;
@@ -49,8 +50,8 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* c
 	va_list varg_list_copy;
 	va_copy(varg_list_copy, varg_list);
 
-	while ((arg_type = va_arg(varg_list_copy, int)) != 0) {
-
+	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
+	{
 		STARPU_ASSERT_MSG(!(arg_type & STARPU_COMMUTE), "Unable to checkpoint non sequential task flow.\n");
 
 		switch(arg_type)
@@ -98,15 +99,6 @@ int _starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* c
 	return 0;
 }
 
-int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
-{
-	va_list varg_list;
-	va_start(varg_list, cp_id);
-	int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, varg_list);
-	va_end(varg_list);
-	return ret;
-}
-
 void print_received_value(void* handle)
 {
 	fprintf(stderr, "Node %d - I received backup value:%d\n", my_rank, *(int*)starpu_data_handle_to_pointer(*(starpu_data_handle_t*)handle, STARPU_MAIN_RAM));
@@ -164,7 +156,7 @@ int _starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_t
  * @param args
  * @return
  */
-void* _starpu_mpi_checkpoint_ack_send_routine(void* args)
+void* _starpu_mpi_checkpoint_ack_send_cb(void* args)
 {
 	starpu_mpi_checkpoint_template_t cp_template = (starpu_mpi_checkpoint_template_t) args;
 	starpu_pthread_mutex_lock(&cp_template->mutex);
@@ -214,15 +206,37 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_te
 	return 0;
 }
 
-int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
-{
-	_starpu_mpi_checkpoint_template_submit(cp_template);
-	return 0;
-}
-
 int starpu_mpi_checkpoint_turn_on(void)
 {
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank); //TODO: check compatibility with several Comms behaviour
 	return 0;
-}
+}
+
+int starpu_mpi_checkpoint_turn_off(void)
+{
+	for (int i=0 ; i<MAX_CP_TEMPLATE_NUMBER ; i++)
+	{
+		if (cp_template_array[i] == NULL)
+		{
+			break;
+		}
+		_starpu_checkpoint_template_free(cp_template_array[i]);
+		cp_template_array[i] = NULL;
+	}
+	starpu_pthread_mutex_destroy(&cp_template_mutex);
+}
+
+int starpu_mpi_checkpoint_template_register(starpu_mpi_checkpoint_template_t* cp_template, int cp_id, ...)
+{
+	va_list varg_list;
+	va_start(varg_list, cp_id);
+	int ret = _starpu_mpi_checkpoint_template_register(cp_template, cp_id, varg_list);
+	va_end(varg_list);
+	return ret;
+}
+
+int starpu_mpi_checkpoint_template_submit(starpu_mpi_checkpoint_template_t cp_template)
+{
+	return _starpu_mpi_checkpoint_template_submit(cp_template);
+}

+ 17 - 3
mpi/src/starpu_mpi_checkpoint.h

@@ -127,9 +127,23 @@ static inline int _starpu_mpi_checkpoint_template_freeze(starpu_mpi_checkpoint_t
 	return cp_template->size;
 }
 
-
-
-int _starpu_mpi_checkpoint_turn_on(void);
+static inline int _starpu_checkpoint_template_free(starpu_mpi_checkpoint_template_t cp_template)
+{
+	struct _starpu_mpi_checkpoint_template_item* item;
+	struct _starpu_mpi_checkpoint_template_item* next_item;
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
+	while (item != _starpu_mpi_checkpoint_template_end(cp_template))
+	{
+		next_item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
+		starpu_free(item);
+		item = next_item;
+	}
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
+	starpu_pthread_mutex_destroy(&cp_template->mutex);
+	starpu_free(cp_template);
+	return 0;
+}
 
 // For test purpose
 int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_template);

+ 5 - 1
mpi/src/starpu_mpi_init.c

@@ -92,7 +92,7 @@ void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
 
 #ifdef STARPU_USE_MPI_FT
 	starpu_mpi_checkpoint_turn_on();
-#endif
+#endif // STARPU_USE_MPI_FT
 }
 
 static
@@ -232,6 +232,10 @@ int starpu_mpi_shutdown(void)
 	void *value;
 	int rank, world_size;
 
+#ifdef STARPU_USE_MPI_FT
+	starpu_mpi_checkpoint_turn_off();
+#endif // STARPU_USE_MPI_FT
+
 	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
 	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);