Prechádzať zdrojové kódy

Change handle reg/unreg to make it consistent

Romain LION 5 rokov pred
rodič
commit
a05d7d821f

+ 17 - 9
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -62,12 +62,21 @@ void _starpu_mpi_store_data_and_send_ack_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
 	if (STARPU_VALUE == arg->type) {
-		// an handle as specificaly been created, no need to copy the data. Call directly the Callback
-		arg->copy_handle = arg->handle;
+		// an handle has specifically been created, Let's get the value back, and unregister the handle
+		arg->copy_handle = starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM);
+		starpu_data_unregister_submit(arg->handle);
 	}
 	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
 	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
+
+}void _starpu_mpi_release_and_store_data_and_send_ack_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	starpu_data_release(arg->copy_handle);
+	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
+	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
 }
 
 void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
@@ -76,7 +85,7 @@ void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
 	if (STARPU_VALUE == arg->type)
 	{
 		free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
-		starpu_data_unregister(arg->handle);
+		starpu_data_unregister_submit(arg->handle);
 	}
 	_STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
 	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
@@ -120,7 +129,6 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 			case STARPU_VALUE:
 				// TODO: Maybe do not pass via starpu handles for external data, and need to reimplement mpi comm layer for
 				arg = calloc(1, sizeof(struct _starpu_mpi_cp_ack_arg_cb));
-				handle = &arg->handle;
 				arg->tag = item->tag;
 				arg->type = STARPU_VALUE;
 				arg->count = item->count;
@@ -130,20 +138,20 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 				{
 					cpy_ptr = malloc(item->count);
 					memcpy(cpy_ptr, item->ptr, item->count);
-					starpu_variable_data_register(handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
+					starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
 					arg->rank = item->backupped_by;
 					_STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
-					starpu_mpi_isend_detached_prio(*handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
+					starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
 												   &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg);
 					// The callback needs to free the handle specially created for the send, and post ack recv
 				}
 				else if (item->backup_of != -1)
 				{
 					cpy_ptr = malloc(item->count);
-					starpu_variable_data_register(handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
+					starpu_variable_data_register(&arg->handle, STARPU_MAIN_RAM, (uintptr_t)cpy_ptr, item->count);
 					arg->rank = item->backup_of;
 					_STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
-					starpu_mpi_irecv_detached(*handle, arg->rank, arg->tag, MPI_COMM_WORLD,
+					starpu_mpi_irecv_detached(arg->handle, arg->rank, arg->tag, MPI_COMM_WORLD,
 											  &_starpu_mpi_store_data_and_send_ack_cb, (void*)arg);
 					// The callback needs to store the received data and post ack send
 				}
@@ -181,7 +189,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					                              NULL, NULL, &_starpu_data_release_cb, (void*)arg->handle, 1, 0, 1);
 					// The callback needs to do nothing. The cached one must release the handle.
 					starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1);
-					starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _starpu_mpi_store_data_and_send_ack_cb, arg);
+					starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _starpu_mpi_release_and_store_data_and_send_ack_cb, arg);
 					// The callback need to store the data and post ack send.
 				}
 				break;