瀏覽代碼

Change instance numbering.
The instance number is now absolute, so it is easier to check the sequential order between two checkpoints.

Romain LION 5 年之前
父節點
當前提交
7e188bf7cc

+ 6 - 6
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -106,9 +106,10 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 	struct _starpu_mpi_cp_ack_arg_cb* arg;
 	void* cpy_ptr;
 	struct _starpu_mpi_checkpoint_template_item* item;
+	int current_instance;
 
-	starpu_pthread_mutex_lock(&cp_template->mutex); // Need to lock to ensure cp_template->cp_template_current_instance stay constant
-	checkpoint_template_increment_instance(cp_template);
+	current_instance = increment_current_instance();
+	_starpu_mpi_checkpoint_template_create_instance_tracker(cp_template, cp_template->cp_template_id, current_instance);
 	_starpu_mpi_checkpoint_post_cp_discard_recv(cp_template);
 
 	item = _starpu_mpi_checkpoint_template_get_first_data(cp_template);
@@ -124,7 +125,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 				arg->type = STARPU_VALUE;
 				arg->count = item->count;
 				arg->msg.checkpoint_id = cp_template->cp_template_id;
-				arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
+				arg->msg.checkpoint_instance = current_instance;
 				if (item->backupped_by != -1)
 				{
 					cpy_ptr = malloc(item->count);
@@ -159,7 +160,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->type = STARPU_R;
 					arg->count = item->count;
 					arg->msg.checkpoint_id = cp_template->cp_template_id;
-					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
+					arg->msg.checkpoint_instance = current_instance;
 					_starpu_mpi_isend_cache_aware(*handle, item->backupped_by, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0,
 					                              &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_cached_push_cp_ack_recv_cb, (void*)arg, 1);
 					// the callbacks need to post ack recv. The cache one needs to release the handle.
@@ -175,7 +176,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->type = STARPU_R;
 					arg->count = item->count;
 					arg->msg.checkpoint_id = cp_template->cp_template_id;
-					arg->msg.checkpoint_instance = cp_template->cp_template_current_instance;
+					arg->msg.checkpoint_instance = current_instance;
 					_starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0,
 					                              NULL, NULL, &_starpu_data_release_cb, (void*)arg->handle, 1, 0, 1);
 					// The callback needs to do nothing. The cached one must release the handle.
@@ -189,7 +190,6 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 		item = _starpu_mpi_checkpoint_template_get_next_data(cp_template, item);
 	}
 
-	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	return 0;
 }
 

+ 22 - 1
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -28,10 +28,12 @@
 
 
 starpu_pthread_mutex_t           cp_template_mutex;
+starpu_pthread_mutex_t           current_instance_mutex;
 starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER];
 int                              cp_template_array_size = 0;
 int                              my_rank;
 int                              comm_size;
+int                              current_instance;
 
 struct _starpu_mpi_checkpoint_template_tracking_inst*     last_valid_cp;
 struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
@@ -40,8 +42,27 @@ struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
 
 typedef int (*backup_of_fn)(int);
 
+int increment_current_instance() // Bumps the file-scope current_instance counter by one and returns the new value.
+{
+	int _inst; // local snapshot so the value can be returned after the mutex is released
+	starpu_pthread_mutex_lock(&current_instance_mutex); // the counter is only touched while holding current_instance_mutex
+	_inst = ++current_instance; // pre-increment: the caller receives the freshly incremented number
+	starpu_pthread_mutex_unlock(&current_instance_mutex);
+	return _inst;
+}
+
+int get_current_instance() // Returns the present value of the file-scope current_instance counter without modifying it.
+{
+	int _inst; // local snapshot so the value can be returned after the mutex is released
+	starpu_pthread_mutex_lock(&current_instance_mutex); // read under the same mutex that increment_current_instance() holds while writing
+	_inst = current_instance;
+	starpu_pthread_mutex_unlock(&current_instance_mutex);
+	return _inst; // NOTE(review): the counter may already have advanced by the time the caller uses this value
+}
+
 
 void checkpoint_template_lib_init(void) {
+	starpu_pthread_mutex_init(&current_instance_mutex, NULL);
 	starpu_pthread_mutex_init(&cp_template_mutex, NULL);
 	starpu_mpi_comm_rank(MPI_COMM_WORLD, &my_rank);
 	starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
@@ -49,7 +70,7 @@ void checkpoint_template_lib_init(void) {
 	_starpu_mpi_checkpoint_template_tracking_inst_init(last_valid_cp);
 	_starpu_mpi_checkpoint_template_tracking_inst_list_init(&future_tracking_list);
 	_starpu_mpi_checkpoint_template_tracking_inst_list_init(&pending_tracking_list);
-
+	current_instance = 0;
 	_starpu_mpi_set_debug_level_max(1000);
 }
 

+ 5 - 7
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -37,6 +37,9 @@ extern starpu_mpi_checkpoint_template_t cp_template_array[MAX_CP_TEMPLATE_NUMBER
 extern struct _starpu_mpi_checkpoint_template_tracking_inst_list future_tracking_list;
 extern struct _starpu_mpi_checkpoint_template_tracking_inst_list pending_tracking_list;
 
+int increment_current_instance();
+int get_current_instance();
+
 void checkpoint_template_lib_init(void);
 
 void checkpoint_template_lib_quit(void);
@@ -194,8 +197,9 @@ static int _starpu_mpi_checkpoint_template_create_instance_tracker(starpu_mpi_ch
 static int _starpu_mpi_checkpoint_template_add_future_inst(starpu_mpi_checkpoint_template_t cp_template, int cp_id, int cp_inst)
 {
 	struct _starpu_mpi_checkpoint_template_tracking_inst* item;
+	int current_instance = get_current_instance();
 	item = _starpu_mpi_checkpoint_template_get_tracking_inst_by_id_inst(future_tracking_list, cp_id, cp_inst);
-	_STARPU_MPI_DEBUG(10, "I received an ack msg for a checkpoint(id:%d) instance I did not initiated yet(received:%d - last:%d). Let's remember it's already acknowledged.\n", cp_id, cp_inst, cp_template->cp_template_current_instance);
+	_STARPU_MPI_DEBUG(10, "I received an ack msg for a checkpoint(id:%d) instance I did not initiated yet(received:%d - last:%d). Let's remember it's already acknowledged.\n", cp_id, cp_inst, current_instance);
 	if (item != NULL)
 	{
 		item->ack_msg_count++;
@@ -265,11 +269,6 @@ static starpu_mpi_checkpoint_template_t _starpu_mpi_get_checkpoint_template_by_i
 //	return -ret;
 //}
 
-static inline int checkpoint_template_increment_instance(starpu_mpi_checkpoint_template_t cp_template)
-{
-	cp_template->cp_template_current_instance++;
-	return _starpu_mpi_checkpoint_template_create_instance_tracker(cp_template, cp_template->cp_template_id, cp_template->cp_template_current_instance);
-}
 
 static inline int checkpoint_template_array_realloc(int** array, int* max_size, int growth_factor)
 {
@@ -311,7 +310,6 @@ static inline starpu_mpi_checkpoint_template_t _starpu_mpi_checkpoint_template_n
 	starpu_mpi_checkpoint_template_t _cp_template;
 	_STARPU_MPI_CALLOC(_cp_template, 1, sizeof(struct _starpu_mpi_checkpoint_template));
 	_cp_template->cp_template_id               = cp_id;
-	_cp_template->cp_template_current_instance = 0;
 	_cp_template->backup_of_array_max_size     = _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE;
 	starpu_malloc((void**)&_cp_template->backup_of_array, _CHECKPOINT_TEMPLATE_BACKUPED_RANK_ARRAY_DEFAULT_SIZE);
 	_cp_template->backup_of_array[0] = -1;