Przeglądaj źródła

Continue changing printf
Use starpu_data_dup_ro

Romain LION 4 lat temu
rodzic
commit
bbd7f5f9ed

+ 17 - 11
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -39,7 +39,7 @@ extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_
 void _starpu_mpi_treat_ack_receipt_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	_STARPU_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	_STARPU_MPI_DEBUG(3, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	if (_checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance) == 0) {
 		free(arg);
 	}
@@ -48,14 +48,14 @@ void _starpu_mpi_treat_ack_receipt_cb(void* _args)
 void _arg_free(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	_STARPU_DEBUG(3,stderr, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
-	free(_args);
+	_STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
+	free(arg);
 }
 
 void _starpu_mpi_push_cp_ack_send_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	_STARPU_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
 }
 
@@ -74,7 +74,7 @@ void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
 		free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
 		starpu_data_unregister(arg->handle);
 	}
-	_STARPU_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
+	_STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
 	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
 }
 
@@ -82,8 +82,9 @@ void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
 void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
-	starpu_data_register_same(&arg->copy_handle, arg->handle);
-	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
+	starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
+//	starpu_data_register_same(&arg->copy_handle, arg->handle);
+//	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
 	starpu_data_release(arg->handle);
 }
 
@@ -95,11 +96,16 @@ void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
 	{
 		// an handle as specificaly been created, no need to copy the data. Call directly the Callback
 		arg->copy_handle = arg->handle;
-		return _starpu_mpi_store_data_and_push_cp_ack_send_cb(_arg);
+		_starpu_mpi_store_data_and_push_cp_ack_send_cb(_arg);
+		return;
+	}
+	else if (STARPU_R == arg->type)
+	{
+		starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
+//		starpu_data_register_same(&arg->copy_handle, arg->handle);
+//		starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
+		return;
 	}
-
-	starpu_data_register_same(&arg->copy_handle, arg->handle);
-	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_store_data_and_push_cp_ack_send_cb, _arg);
 }
 
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)

+ 4 - 3
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_package.c

@@ -38,7 +38,7 @@ int checkpoint_package_data_add(int cp_id, int cp_inst, int rank, starpu_mpi_tag
 	checkpoint_data->ptr = ptr;
 	checkpoint_data->count = count;
 	_starpu_mpi_checkpoint_data_list_push_back(checkpoint_data_list, checkpoint_data);
-	_STARPU_DEBUG(8, "CP data added - cpid:%d - cpinst:%d - rank:%d - tag:%ld\n", checkpoint_data->cp_id, checkpoint_data->cp_inst, checkpoint_data->rank, checkpoint_data->tag);
+	_STARPU_MPI_DEBUG(8, "CP data added - cpid:%d - cpinst:%d - rank:%d - tag:%ld\n", checkpoint_data->cp_id, checkpoint_data->cp_inst, checkpoint_data->rank, checkpoint_data->tag);
 	return 0;
 }
 
@@ -50,7 +50,8 @@ int checkpoint_package_data_del(int cp_id, int cp_inst, int rank)
 	while (checkpoint_data != _starpu_mpi_checkpoint_data_list_end(checkpoint_data_list))
 	{
 		next_checkpoint_data = _starpu_mpi_checkpoint_data_list_next(checkpoint_data);
-		if (checkpoint_data->cp_id==cp_id && checkpoint_data->cp_inst==cp_inst
+		if (!(checkpoint_data->cp_id==cp_id && checkpoint_data->cp_inst==cp_inst)
+//		if ((checkpoint_data->cp_id==cp_id && checkpoint_data->cp_inst==cp_inst)
 			&& checkpoint_data->rank==rank)
 		{
 			if (checkpoint_data->type==STARPU_R)
@@ -67,7 +68,7 @@ int checkpoint_package_data_del(int cp_id, int cp_inst, int rank)
 		}
 		checkpoint_data = next_checkpoint_data;
 	}
-	_STARPU_DEBUG(2, "cleared %d data from checkpoint database.\n", done);
+	_STARPU_MPI_DEBUG(0, "cleared %d data from checkpoint database.\n", done);
 
 	return 0;
 }

+ 14 - 2
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -86,6 +86,18 @@ int valid_pending_checkpoint_template(starpu_mpi_checkpoint_template_t _pending_
 	return 0;
 }
 
+int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
+{
+	starpu_pthread_mutex_lock(&cp_template->mutex);
+	STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
+	struct _starpu_mpi_checkpoint_template_item* item;
+	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
+	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
+	_checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
+	starpu_pthread_mutex_unlock(&cp_template->mutex);
+	return 0;
+}
+
 int starpu_mpi_checkpoint_template_create(starpu_mpi_checkpoint_template_t* cp_template, int cp_id)
 {
 	*cp_template = _starpu_mpi_checkpoint_template_new(cp_id);
@@ -182,7 +194,7 @@ void checkpoint_discard(void* _args)
 {
 	// TODO: flag data as "CP ready", since the CP has succeeded
 	struct _starpu_mpi_cp_discard_arg_cb* arg = (struct _starpu_mpi_cp_discard_arg_cb*) _args;
-	_STARPU_MPI_DEBUG(0, "DISCARDING OLD CHECKPOINT DATA - new one is CPID:%d - CPINST:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	_STARPU_MPI_DEBUG(0, "DISCARDING OLD CHECKPOINT DATA of rank %d - new one is CPID:%d - CPINST:%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
 	checkpoint_package_data_del(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
 }
 
@@ -388,7 +400,7 @@ int _checkpoint_template_digest_ack_reception(int checkpoint_id, int checkpoint_
 		_STARPU_MPI_DEBUG(20, "Inst found, remaining ack msg awaited:%d\n", cp_template->remaining_ack_awaited);
 		if (cp_template->remaining_ack_awaited == 0)
 		{
-			_STARPU_MPI_DEBUG(20, "All cp material for cpid:%d, cpinst:%d - have been sent and acknowledged.\n", checkpoint_id, checkpoint_instance);
+			_STARPU_MPI_DEBUG(0, "My CP has been successfully saved and acknowledged - cpid:%d, cpinst:%d\n", checkpoint_id, checkpoint_instance);
 			_starpu_mpi_checkpoint_post_cp_discard_send(cp_template, checkpoint_id, checkpoint_instance);
 			valid_pending_checkpoint_template(cp_template);
 			cp_template->pending=0;

+ 1 - 12
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.h

@@ -153,18 +153,7 @@ static inline int _checkpoint_template_add_to_backup_arrays(starpu_mpi_checkpoin
 	{
 		_STARPU_DISP("[warning] Checkpoint template item does not refer any backup information. This should not happen.\n");
 	}
-}
-
-static int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp_template, int type, void* ptr, int count, int backupped_by, int backup_of, starpu_mpi_tag_t tag)
-{
-	starpu_pthread_mutex_lock(&cp_template->mutex);
-	STARPU_ASSERT_MSG(!cp_template->frozen, "It is not possible to modify registered checkpoint template.\n");
-	struct _starpu_mpi_checkpoint_template_item* item;
-	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
-	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
-	_checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
-	starpu_pthread_mutex_unlock(&cp_template->mutex);
-	return 0;
+	return -1;
 }
 
 static inline struct _starpu_mpi_checkpoint_template_item* _starpu_mpi_checkpoint_template_get_first_data(starpu_mpi_checkpoint_template_t template)

+ 26 - 9
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_service_comms.c

@@ -33,6 +33,22 @@ static struct _starpu_mpi_req_list detached_ft_service_requests;
 static unsigned detached_send_n_ft_service_requests;
 static starpu_pthread_mutex_t detached_ft_service_requests_mutex;
 
+#ifdef STARPU_MPI_VERBOSE
+static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
+{
+	switch (request_type)
+	{
+		case SEND_REQ: return "SEND_REQ";
+		case RECV_REQ: return "RECV_REQ";
+		case WAIT_REQ: return "WAIT_REQ";
+		case TEST_REQ: return "TEST_REQ";
+		case BARRIER_REQ: return "BARRIER_REQ";
+		case UNKNOWN_REQ: return "UNSET_REQ";
+		default: return "unknown request type";
+	}
+}
+#endif
+
 int _ft_service_msg_recv_send_common(void* ptr, int count, int rank, int tag, int req_type, MPI_Comm comm, void (*callback)(void *), void* arg)
 {
 	struct _starpu_mpi_req* req;
@@ -89,15 +105,16 @@ int _ft_service_msg_recv_send_common(void* ptr, int count, int rank, int tag, in
 	return 0;
 }
 
-//inline int _ft_service_msg_isend_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
-//{
-//	return _ft_service_msg_recv_send_common(msg, count, rank, tag, SEND_REQ, comm, callback, arg);
-//}
-//
-//inline int _ft_service_msg_irecv_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
-//{
-//	return _ft_service_msg_recv_send_common(msg, count, rank, tag, SEND_REQ, comm, callback, arg);
-//}
+int _ft_service_msg_isend_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
+{
+	return _ft_service_msg_recv_send_common(msg, count, rank, tag, SEND_REQ, comm, callback, arg);
+}
+
+int _ft_service_msg_irecv_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
+{
+	return _ft_service_msg_recv_send_common(msg, count, rank, tag, RECV_REQ, comm, callback, arg);
+}
+
 
 static void _starpu_mpi_handle_ft_request_termination(struct _starpu_mpi_req *req)
 {

+ 2 - 10
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_service_comms.h

@@ -23,16 +23,8 @@ extern "C"
 #endif
 
 int _ft_service_msg_recv_send_common(void* ptr, int count, int rank, int tag, int req_type, MPI_Comm comm, void (*callback)(void *), void* arg);
-
-inline int _ft_service_msg_isend_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
-{
-	return _ft_service_msg_recv_send_common(msg, count, rank, tag, SEND_REQ, comm, callback, arg);
-}
-
-inline int _ft_service_msg_irecv_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg)
-{
-	return _ft_service_msg_recv_send_common(msg, count, rank, tag, RECV_REQ, comm, callback, arg);
-}
+int _ft_service_msg_isend_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg);
+int _ft_service_msg_irecv_cb(void* msg, int count, int rank, int tag, MPI_Comm comm, void (*callback)(void *), void* arg);
 
 void starpu_mpi_test_ft_detached_service_requests(void);
 int starpu_mpi_ft_service_lib_init();

+ 1 - 1
mpi/src/starpu_mpi.c

@@ -183,7 +183,7 @@ struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_
 	{
 		if (data_tag == -1)
 			_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
-		_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, mpi_rank);
+		_STARPU_MPI_DEBUG(1, "Send data %p to %d\n", data_handle, dest);
 		req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, _arg, sequential_consistency);
 	}
 	else