Pārlūkot izejas kodu

Rearrange file structure

Romain LION 5 gadi atpakaļ
vecāks
revīzija
fa46a49d58

+ 48 - 59
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -15,41 +15,73 @@
  */
 
 
-#include <stdarg.h>
 #include <stdlib.h>
-#include <common/utils.h>
 
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
 #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
-#include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
-#include <sys/param.h>
 #include <starpu_mpi_private.h>
 #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
-#include <mpi/starpu_mpi_mpi.h>
 #include "starpu_mpi_cache.h"
-#include "starpu_mpi_ft_service_comms.h"
 
 #define MAX_CP_TEMPLATE_NUMBER 32 // Arbitrary limit
 
 starpu_pthread_mutex_t           cp_lib_mutex;
 int                              my_rank;
 
-void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg);
-void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg);
-void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg);
-void _starpu_checkpoint_data_send_copy_and_ack(void* _args);
 
-void _starpu_mpi_push_cp_ack_recv_cb(void* _args);
-void _starpu_mpi_push_cp_ack_send_cb(void* _args);
-void _starpu_mpi_treat_cache_ack_no_lock_cb(void* _args);
-
-//extern struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
-//extern struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg, int sequential_consistency);
 extern struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency);
 extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count);
 
 
+
+void _starpu_mpi_treat_ack_receipt_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	fprintf(stderr, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	if (_checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance) == 0) {
+		free(arg);
+	}
+}
+
+void _print_ack_sent_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	fprintf(stderr, "Sent succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
+	free(_args);
+}
+
+void _starpu_mpi_push_cp_ack_send_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	fprintf(stderr, "Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _print_ack_sent_cb, _args);
+
+}
+
+void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	fprintf(stderr, "Posting ack recv cb from %d\n", arg->rank);
+	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
+}
+
+
+void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
+	starpu_data_register_same(&arg->copy_handle, arg->handle);
+	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_send_cb, _arg);
+	starpu_data_release(arg->handle);
+}
+
+void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
+	starpu_data_register_same(&arg->copy_handle, arg->handle);
+	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_send_cb, _arg);
+}
+
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template)
 {
 	// TODO: For now, checkpoints are not taken asynchronously. They will be later, and then we will have to acquire READ permissions to StarPU so that the data are not potentially corrupted.
@@ -125,27 +157,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 //	starpu_pthread_mutex_unlock(&cp_template->mutex);
 //}
 
-void _print_ack_sent_cb(void* _args)
-{
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	fprintf(stderr, "Sent succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
-	free(_args);
-}
 
-void _starpu_checkpoint_cached_data_recv_copy_and_ack(void* _arg)
-{
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
-	starpu_data_register_same(&arg->copy_handle, arg->handle);
-	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_send_cb, _arg);
-	starpu_data_release(arg->handle);
-}
-
-void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
-{
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _arg;
-	starpu_data_register_same(&arg->copy_handle, arg->handle);
-	starpu_data_cpy(arg->copy_handle, arg->handle, 1, _starpu_mpi_push_cp_ack_send_cb, _arg);
-}
 //
 //void _starpu_checkpoint_cached_data_send_copy_and_ack(void* _arg)
 //{
@@ -168,26 +180,3 @@ void _starpu_checkpoint_data_recv_copy_and_ack(void* _arg)
 //	cp_template->remaining_ack_awaited--;
 //}
 
-void _starpu_mpi_treat_ack_receipt_cb(void* _args)
-{
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	fprintf(stderr, "ack msg recved id:%d inst:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
-	if (_checkpoint_template_digest_ack_reception(arg->msg.checkpoint_id, arg->msg.checkpoint_instance) == 0) {
-		free(arg);
-	}
-}
-
-void _starpu_mpi_push_cp_ack_send_cb(void* _args)
-{
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	fprintf(stderr, "Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
-	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _print_ack_sent_cb, _args);
-
-}
-
-void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
-{
-	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	fprintf(stderr, "Posting ack recv cb from %d\n", arg->rank);
-	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
-}

+ 0 - 1
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_service_comms.c

@@ -124,7 +124,6 @@ static void _starpu_mpi_handle_ft_request_termination(struct _starpu_mpi_req *re
 					// We need to make sure the communication for sending the size
 					// has completed. As MPI can re-order messages, let's call
 					// MPI_Wait to make sure the data have been sent.
-					int ret;
 					starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t)req->ptr, req->count, 0);
 					req->ptr = NULL;
 				}