Browse Source

Record stats for data that are not sent because they are still in their initial state

Romain LION 4 years ago
parent
commit
d45995cc92
1 changed file with 14 additions and 10 deletions
  1. 14 10
      mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

+ 14 - 10
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -202,12 +202,6 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 				mpi_data = _starpu_mpi_data_get(handle);
 				if (starpu_mpi_data_get_rank(handle)==_my_rank)
 				{
-					if (!mpi_data->modified)
-					{
-						_starpu_mpi_checkpoint_tracker_update(cp_template, cp_template->cp_id, cp_template->checkpoint_domain, current_instance);
-						//TODO: check if the data are all acknowledged
-						break; // We don't want to CP a data that is still at initial state.
-					}
 					_STARPU_MPI_DEBUG(0, "Submit CP: sending starPU data to %d (tag %d)\n", item->backupped_by, (int)starpu_mpi_data_get_tag(handle));
 					arg = malloc(sizeof(struct _starpu_mpi_cp_ack_arg_cb));
 					arg->rank = item->backupped_by;
@@ -216,6 +210,14 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->type = STARPU_R;
 					arg->count = item->count;
 					arg->checkpoint_instance_hint = current_instance;
+					if (!mpi_data->modified)
+					{
+						_starpu_mpi_checkpoint_tracker_update(cp_template, cp_template->cp_id, cp_template->checkpoint_domain, current_instance);
+						//TODO: check if the data are all acknowledged
+						_send_internal_data_stats(arg);
+						free(arg);
+						break; // We don't want to CP a data that is still at initial state.
+					}
 					_starpu_mpi_isend_cache_aware(handle, item->backupped_by, starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0, prio,
 					                              &_send_cp_internal_data_cb, (void*)arg, 1, &arg->cache_flag);
 					// the callbacks need to post ack recv. The cache one needs to release the handle.
@@ -223,10 +225,6 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 				}
 				else if (item->backup_of == starpu_mpi_data_get_rank(handle))
 				{
-					if (!mpi_data->modified)
-					{
-						break; // We don't want to CP a data that is still at initial state.
-					}
 					_STARPU_MPI_DEBUG(0, "Submit CP: receiving starPU data from %d (tag %d)\n", starpu_mpi_data_get_rank(handle), (int)starpu_mpi_data_get_tag(handle));
 					arg = malloc(sizeof(struct _starpu_mpi_cp_ack_arg_cb));
 					arg->rank = item->backup_of;
@@ -236,6 +234,12 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->count = item->count;
 					arg->msg.checkpoint_id = cp_template->cp_id;
 					arg->msg.checkpoint_instance = current_instance;
+					if (!mpi_data->modified)
+					{
+						starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg);
+						free(arg);
+						break; // We don't want to CP a data that is still at initial state.
+					}
 					_starpu_mpi_irecv_cache_aware(handle, starpu_mpi_data_get_rank(handle), starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0,
 												  NULL, NULL, 1, 0, 1, &arg->cache_flag);
 					// The callback needs to do nothing. The cached one must release the handle.