Romain LION 5 éve
szülő
commit
132133247c

+ 9 - 0
configure.ac

@@ -580,17 +580,25 @@ AM_CONDITIONAL(STARPU_USE_MPI, test x$build_nmad_lib = xyes -o x$build_mpi_lib =
 AC_ARG_ENABLE(mpi-ft, AC_HELP_STRING([--enable-mpi-ft], [Enable failure tolerance mechanisms provided by StarPU]),
 	      [enable_mpi_ft=$enableval], [enable_mpi_ft=$default_enable_mpi_ft])
 
+default_enable_mpi_ft_stats=no
+use_mpi_ft_stats=no
+AC_ARG_ENABLE(mpi-ft-stats, AC_HELP_STRING([--enable-mpi-ft-stats], [Enable stats for failure tolerance mechanisms]),
+	      [enable_mpi_ft_stats=$enableval], [enable_mpi_ft_stats=$default_enable_mpi_ft_stats])
+
 # TODO: Check MPI version to be ULFM
 if test x$enable_mpi_ft = xyes ; then
     if test x$build_mpi_lib != xyes ; then
         AC_MSG_ERROR([Failure tolerance mechanisms only work with a particular MPI implementation: ULFM (OpenMPI based).])
     else
         AC_DEFINE(STARPU_USE_MPI_FT, [1], [whether the StarPU MPI failure tolerance mechanisms are requested])
+        AC_DEFINE(STARPU_USE_MPI_FT_STATS, [1], [whether the StarPU MPI failure tolerance mechanisms stats are watched])
         use_mpi_ft=yes;
+        use_mpi_ft_stats=$enable_mpi_ft_stats;
     fi
 fi
 
 AM_CONDITIONAL(STARPU_USE_MPI_FT, [test x$use_mpi_ft = xyes])
+AM_CONDITIONAL(STARPU_USE_MPI_FT_STATS, [test x$use_mpi_ft_stats = xyes])
 
 ###### End of failure tolerance material ######
 
@@ -3707,6 +3715,7 @@ AC_MSG_NOTICE([
 	StarPU Extensions:
 	       StarPU MPI enabled:                            $build_mpi_lib
 	       StarPU MPI failure tolerance:                  $enable_mpi_ft
+	       StarPU MPI failure tolerance stats:            $use_mpi_ft_stats
 	       StarPU MPI(nmad) enabled:                      $build_nmad_lib
 	       MPI test suite:                                $running_mpi_check
 	       Master-Slave MPI enabled:                      $use_mpi_master_slave

+ 1 - 0
include/starpu_config.h.in

@@ -119,6 +119,7 @@
 #undef STARPU_USE_MPI_MPI
 #undef STARPU_USE_MPI_NMAD
 #undef STARPU_USE_MPI_FT
+#undef STARPU_USE_MPI_FT_STATS
 
 #undef STARPU_ATLAS
 #undef STARPU_GOTO

+ 4 - 2
mpi/src/Makefile.am

@@ -90,7 +90,8 @@ noinst_HEADERS +=       \
 	mpi_failure_tolerance/starpu_mpi_checkpoint_template.h \
 	mpi_failure_tolerance/starpu_mpi_ft_service_comms.h  \
 	mpi_failure_tolerance/starpu_mpi_checkpoint_package.h \
-	mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.h
+	mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.h \
+	mpi_failure_tolerance/starpu_mpi_ft_stats.h
 endif STARPU_USE_MPI_FT
 
 libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES =	\
@@ -131,5 +132,6 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES +=    \
 	mpi_failure_tolerance/starpu_mpi_checkpoint_template.c   \
 	mpi_failure_tolerance/starpu_mpi_ft_service_comms.c \
  	mpi_failure_tolerance/starpu_mpi_checkpoint_package.c  \
- 	mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.c
+ 	mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.c  \
+ 	mpi_failure_tolerance/starpu_mpi_ft_stats.c
 endif STARPU_USE_MPI_FT

+ 65 - 39
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -21,6 +21,7 @@
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
 #include <starpu_mpi_private.h>
 #include <mpi/starpu_mpi_mpi_backend.h> // Should be deduced at preprocessing (Nmad vs MPI)
 #include "starpu_mpi_cache.h"
@@ -36,6 +37,13 @@ extern struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_
 
 
 
+void _arg_free(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	_STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
+	free(arg);
+}
+
 void _starpu_mpi_treat_ack_receipt_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
@@ -51,60 +59,76 @@ void _starpu_mpi_treat_ack_receipt_cb(void* _args)
 	}
 }
 
-void _arg_free(void* _args)
+void _starpu_mpi_store_data_and_send_ack_cb(struct _starpu_mpi_cp_ack_arg_cb* arg)
+{
+	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
+	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
+	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, arg);
+	_STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
+}
+
+void _starpu_mpi_push_cp_ack_recv_cb(struct _starpu_mpi_cp_ack_arg_cb* arg)
+{
+	_STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
+	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, arg);
+	_STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(sizeof(struct _starpu_mpi_cp_ack_msg));
+}
+
+void _recv_internal_dup_ro_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	_STARPU_MPI_DEBUG(3, "Ack send succeeded cpid:%d, cpinst:%d, dest:%d\n", arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank);
-	free(arg);
+	starpu_data_release(arg->copy_handle);
+	_starpu_mpi_store_data_and_send_ack_cb(arg);
 }
 
-void _starpu_mpi_store_data_and_send_ack_cb(void* _args)
+void _recv_cp_external_data_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	if (STARPU_VALUE == arg->type) {
-		// an handle has specifically been created, Let's get the value back, and unregister the handle
-		arg->copy_handle = starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM);
-		starpu_data_unregister_submit(arg->handle);
-	}
-	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
-	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
-	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
+	_STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1);
+	// an handle has specifically been created, Let's get the value back, and unregister the handle
+	arg->copy_handle = starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM);
+	starpu_data_unregister_submit(arg->handle);
+	_starpu_mpi_store_data_and_send_ack_cb(arg);
+}
 
-}void _starpu_mpi_release_and_store_data_and_send_ack_cb(void* _args)
+void _recv_cp_internal_data_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	starpu_data_release(arg->copy_handle);
-	checkpoint_package_data_add(arg->msg.checkpoint_id, arg->msg.checkpoint_instance, arg->rank, arg->tag, arg->type, arg->copy_handle, arg->count);
-	_STARPU_MPI_DEBUG(3,"Send ack msg to %d: id=%d inst=%d\n", arg->rank, arg->msg.checkpoint_id, arg->msg.checkpoint_instance);
-	_ft_service_msg_isend_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _arg_free, _args);
+	_STARPU_MPI_FT_STATS_RECV_CP_DATA(
+			arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle) : -1);
 }
 
-void _starpu_mpi_push_cp_ack_recv_cb(void* _args)
+void _recv_cached_cp_internal_data_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	if (STARPU_VALUE == arg->type)
-	{
-		free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
-		starpu_data_unregister_submit(arg->handle);
-	}
-	_STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
-	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
+	_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1);
+	starpu_data_release(arg->handle);
 }
 
-void _starpu_mpi_cached_push_cp_ack_recv_cb(void* _args)
+void _send_cp_external_data_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
-	if (STARPU_R == arg->type)
-	{
-		starpu_data_release(arg->handle);
-	}
-	_STARPU_MPI_DEBUG(3, "Posting ack recv cb from %d\n", arg->rank);
-	_ft_service_msg_irecv_cb((void*)&arg->msg, sizeof(struct _starpu_mpi_cp_ack_msg), arg->rank, _STARPU_MPI_TAG_CP_ACK, MPI_COMM_WORLD, _starpu_mpi_treat_ack_receipt_cb, _args);
+	_STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type==STARPU_VALUE?arg->count:arg->type==STARPU_R?starpu_data_get_size(arg->handle):-1);
+	free(starpu_data_handle_to_pointer(arg->handle, STARPU_MAIN_RAM));
+	starpu_data_unregister_submit(arg->handle);
+	_starpu_mpi_push_cp_ack_recv_cb(arg);
 }
 
-void _starpu_data_release_cb(void* _arg)
+void _send_cp_internal_data_cb(void* _args)
 {
-	starpu_data_release(_arg);
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	_STARPU_MPI_FT_STATS_SEND_CP_DATA(
+			arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle) : -1);
+	_starpu_mpi_push_cp_ack_recv_cb(arg);
+}
+
+void _send_cached_cp_internal_data_cb(void* _args)
+{
+	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
+	_STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(
+			arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle) : -1);
+	starpu_data_release(arg->handle);
+	_starpu_mpi_push_cp_ack_recv_cb(arg);
 }
 
 
@@ -142,7 +166,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->rank = item->backupped_by;
 					_STARPU_MPI_DEBUG(0, "Submit CP: sending external data:%d, tag:%ld, to :%d\n", (int)(*(int*)cpy_ptr), arg->tag, arg->rank);
 					starpu_mpi_isend_detached_prio(arg->handle, arg->rank, arg->tag, 0, MPI_COMM_WORLD,
-												   &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg);
+												   &_send_cp_external_data_cb, (void*)arg);
 					// The callback needs to free the handle specially created for the send, and post ack recv
 				}
 				else if (item->backup_of != -1)
@@ -152,7 +176,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->rank = item->backup_of;
 					_STARPU_MPI_DEBUG(0, "Submit CP: receiving external data tag:%ld, from :%d\n", arg->tag, arg->rank);
 					starpu_mpi_irecv_detached(arg->handle, arg->rank, arg->tag, MPI_COMM_WORLD,
-											  &_starpu_mpi_store_data_and_send_ack_cb, (void*)arg);
+											  &_recv_cp_external_data_cb, (void*)arg);
 					// The callback needs to store the received data and post ack send
 				}
 				break;
@@ -170,7 +194,8 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->msg.checkpoint_id = cp_template->cp_id;
 					arg->msg.checkpoint_instance = current_instance;
 					_starpu_mpi_isend_cache_aware(*handle, item->backupped_by, starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0, 0,
-					                              &_starpu_mpi_push_cp_ack_recv_cb, (void*)arg, &_starpu_mpi_cached_push_cp_ack_recv_cb, (void*)arg, 1);
+					                              &_send_cp_internal_data_cb, (void*)arg,
+					                              &_send_cached_cp_internal_data_cb, (void*)arg, 1);
 					// the callbacks need to post ack recv. The cache one needs to release the handle.
 
 				}
@@ -186,10 +211,11 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					arg->msg.checkpoint_id = cp_template->cp_id;
 					arg->msg.checkpoint_instance = current_instance;
 					_starpu_mpi_irecv_cache_aware(*handle, starpu_mpi_data_get_rank(*handle), starpu_mpi_data_get_tag(*handle), MPI_COMM_WORLD, 1, 0,
-					                              NULL, NULL, &_starpu_data_release_cb, (void*)arg->handle, 1, 0, 1);
+					                              &_recv_cp_internal_data_cb, (void*)arg,
+					                              &_recv_cached_cp_internal_data_cb, (void*)arg, 1, 0, 1);
 					// The callback needs to do nothing. The cached one must release the handle.
 					starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1);
-					starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _starpu_mpi_release_and_store_data_and_send_ack_cb, arg);
+					starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg);
 					// The callback need to store the data and post ack send.
 				}
 				break;

+ 16 - 7
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_package.c

@@ -15,6 +15,7 @@
  */
 
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
 
 struct _starpu_mpi_checkpoint_data_list* checkpoint_data_list;
 starpu_pthread_mutex_t package_package_mutex;
@@ -47,20 +48,24 @@ int checkpoint_package_data_add(int cp_id, int cp_inst, int rank, starpu_mpi_tag
 	checkpoint_data->ptr = ptr;
 	checkpoint_data->count = count;
 	_starpu_mpi_checkpoint_data_list_push_back(checkpoint_data_list, checkpoint_data);
+	_STARPU_MPI_FT_STATS_STORE_CP_DATA(type==STARPU_VALUE?count:type==STARPU_R?starpu_data_get_size((starpu_data_handle_t) ptr):-1);
 	_STARPU_MPI_DEBUG(8, "CP data (%p) added - cpid:%d - cpinst:%d - rank:%d - tag:%ld\n", checkpoint_data->ptr, checkpoint_data->cp_id, checkpoint_data->cp_inst, checkpoint_data->rank, checkpoint_data->tag);
 	return 0;
 }
 
 int _checkpoint_package_data_delete(struct _starpu_mpi_checkpoint_data* checkpoint_data)
 {
+	size_t size;
 	if (checkpoint_data->type==STARPU_R)
 	{
 		starpu_data_handle_t handle = checkpoint_data->ptr;
+		size = starpu_data_get_size(handle);
 		_STARPU_MPI_DEBUG(8, "Clearing handle %p entry\n", handle);
 		starpu_data_unregister_submit(handle);
 	}
 	else if (checkpoint_data->type==STARPU_VALUE)
 	{
+		size = checkpoint_data->count;
 		_STARPU_MPI_DEBUG(8, "Clearing external data entry\n");
 		free(checkpoint_data->ptr);
 	}
@@ -70,12 +75,13 @@ int _checkpoint_package_data_delete(struct _starpu_mpi_checkpoint_data* checkpoi
 	}
 	_starpu_mpi_checkpoint_data_list_erase(checkpoint_data_list, checkpoint_data);
 	free(checkpoint_data);
-	return 0;
+	return size;
 }
 
 int checkpoint_package_data_del(int cp_id, int cp_inst, int rank)
 {
 	int done = 0;
+	size_t size = 0;
 	struct _starpu_mpi_checkpoint_data* next_checkpoint_data = NULL;
 	struct _starpu_mpi_checkpoint_data* checkpoint_data = _starpu_mpi_checkpoint_data_list_begin(checkpoint_data_list);
 	while (checkpoint_data != _starpu_mpi_checkpoint_data_list_end(checkpoint_data_list))
@@ -85,12 +91,13 @@ int checkpoint_package_data_del(int cp_id, int cp_inst, int rank)
 		// the rank that initiated the CP
 		if (checkpoint_data->cp_inst<cp_inst && checkpoint_data->rank==rank)
 		{
-			_checkpoint_package_data_delete(checkpoint_data);
+			size += _checkpoint_package_data_delete(checkpoint_data);
 			done++;
 		}
 		checkpoint_data = next_checkpoint_data;
 	}
-	_STARPU_MPI_DEBUG(0, "cleared %d data from checkpoint database.\n", done);
+	_STARPU_MPI_FT_STATS_DISCARD_CP_DATA(size);
+	_STARPU_MPI_DEBUG(0, "cleared %d data from checkpoint database (%ld bytes).\n", done, size);
 
 	return done;
 }
@@ -98,18 +105,20 @@ int checkpoint_package_data_del(int cp_id, int cp_inst, int rank)
 int _checkpoint_package_data_delete_all()
 {
 	int done = 0;
+	size_t size = 0;
 	struct _starpu_mpi_checkpoint_data* next_checkpoint_data = NULL;
 	struct _starpu_mpi_checkpoint_data* checkpoint_data = _starpu_mpi_checkpoint_data_list_begin(checkpoint_data_list);
 	while (checkpoint_data != _starpu_mpi_checkpoint_data_list_end(checkpoint_data_list))
 	{
 		next_checkpoint_data = _starpu_mpi_checkpoint_data_list_next(checkpoint_data);
-		// I delete all the old data (i.e. the cp inst is strictly lower than the one of the just validated CP) only for
-		// the rank that initiated the CP
-		_checkpoint_package_data_delete(checkpoint_data);
+		// I delete all the data
+		size += _checkpoint_package_data_delete(checkpoint_data);
 		done++;
 		checkpoint_data = next_checkpoint_data;
 	}
-	_STARPU_MPI_DEBUG(0, "cleared %d data from checkpoint database.\n", done);
+
+	_STARPU_MPI_FT_STATS_DISCARD_CP_DATA(size);
+	_STARPU_MPI_DEBUG(0, "cleared %d data from checkpoint database (%ld bytes).\n", done, size);
 
 	return done;
 }

+ 1 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -25,6 +25,7 @@
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint.h>
 #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
 
 
 starpu_pthread_mutex_t           cp_template_mutex;

+ 4 - 1
mpi/src/mpi_failure_tolerance/starpu_mpi_ft.c

@@ -18,6 +18,7 @@
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_template.h>
 #include <mpi_failure_tolerance/starpu_mpi_checkpoint_package.h>
 #include <mpi_failure_tolerance/starpu_mpi_ft_service_comms.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
 
 
 starpu_pthread_mutex_t           ft_mutex;
@@ -31,6 +32,7 @@ int starpu_mpi_ft_turn_on(void)
 	checkpoint_template_lib_init();
 	_starpu_mpi_checkpoint_tracker_init();
 	checkpoint_package_init();
+	_STARPU_MPI_FT_STATS_INIT();
 	return 0;
 }
 
@@ -40,7 +42,8 @@ int starpu_mpi_ft_turn_off(void)
 	checkpoint_package_shutdown();
 	_starpu_mpi_checkpoint_tracker_shutdown();
 	starpu_pthread_mutex_destroy(&ft_mutex);
-
+	_STARPU_MPI_FT_STATS_WRITE_TO_FD(stderr);
+	_STARPU_MPI_FT_STATS_SHUTDOWN();
 	return 0;
 }
 

+ 38 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_stats.c

@@ -0,0 +1,38 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
+
+starpu_pthread_mutex_t _ft_stats_mutex;
+
+int cp_data_msgs_sent_count;
+size_t cp_data_msgs_sent_total_size;
+int cp_data_msgs_received_count;
+size_t cp_data_msgs_received_total_size;
+
+int cp_data_msgs_sent_cached_count;
+size_t cp_data_msgs_sent_cached_total_size;
+int cp_data_msgs_received_cached_count;
+size_t cp_data_msgs_received_cached_total_size;
+
+int ft_service_msgs_sent_count;
+size_t ft_service_msgs_sent_total_size;
+int ft_service_msgs_received_count;
+size_t ft_service_msgs_received_total_size;
+
+struct size_sample_list cp_data_in_memory_list; //over time
+size_t cp_data_in_memory_size_total;
+

+ 241 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_stats.h

@@ -0,0 +1,241 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#ifndef FT_STARPU_STARPU_MPI_FT_STATS_H
+#define FT_STARPU_STARPU_MPI_FT_STATS_H
+
+#include <common/list.h>
+#include <common/utils.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+extern starpu_pthread_mutex_t _ft_stats_mutex;
+
+extern int cp_data_msgs_sent_count;
+extern size_t cp_data_msgs_sent_total_size;
+extern int cp_data_msgs_received_count;
+extern size_t cp_data_msgs_received_total_size;
+
+extern int cp_data_msgs_sent_cached_count;
+extern size_t cp_data_msgs_sent_cached_total_size;
+extern int cp_data_msgs_received_cached_count;
+extern size_t cp_data_msgs_received_cached_total_size;
+
+extern int ft_service_msgs_sent_count;
+extern size_t ft_service_msgs_sent_total_size;
+extern int ft_service_msgs_received_count;
+extern size_t ft_service_msgs_received_total_size;
+
+extern struct size_sample_list cp_data_in_memory_list; //over time
+extern size_t cp_data_in_memory_size_total;
+
+
+static inline void stat_init();
+static inline void _starpu_ft_stats_shutdown();
+static inline void _starpu_ft_stats_write_to_fd();
+static inline void _starpu_ft_stats_send_data(size_t size);
+static inline void _starpu_ft_stats_send_data_cached(size_t size);;
+static inline void _starpu_ft_stats_recv_data(size_t size);
+static inline void _starpu_ft_stats_recv_data_cached(size_t size);
+static inline void _starpu_ft_stats_service_msg_send(size_t size);
+static inline void _starpu_ft_stats_service_msg_recv(size_t size);
+static inline void _starpu_ft_stats_add_cp_data_in_memory(size_t size);
+static inline void _starpu_ft_stats_free_cp_data_in_memory(size_t size);
+
+#ifdef STARPU_USE_MPI_FT_STATS
+#define _STARPU_MPI_FT_STATS_INIT() do{ stat_init(); }while(0)
+#define _STARPU_MPI_FT_STATS_SHUTDOWN() do{ _starpu_ft_stats_shutdown(); }while(0)
+#define _STARPU_MPI_FT_STATS_WRITE_TO_FD(fd) do{ _starpu_ft_stats_write_to_fd(fd); }while(0)
+#define _STARPU_MPI_FT_STATS_SEND_CP_DATA(size) do{ _starpu_ft_stats_send_data(size); }while(0)
+#define _STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(size) do{ _starpu_ft_stats_send_data_cached(size); }while(0)
+#define _STARPU_MPI_FT_STATS_RECV_CP_DATA(size) do{ _starpu_ft_stats_recv_data(size); }while(0)
+#define _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(size) do{ _starpu_ft_stats_recv_data_cached(size); }while(0)
+#define _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(size) do{ _starpu_ft_stats_service_msg_send(size); }while(0)
+#define _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(size) do{ _starpu_ft_stats_service_msg_recv(size); }while(0)
+#define _STARPU_MPI_FT_STATS_STORE_CP_DATA(size) do{ _starpu_ft_stats_add_cp_data_in_memory(size); }while(0)
+#define _STARPU_MPI_FT_STATS_DISCARD_CP_DATA(size) do{ _starpu_ft_stats_free_cp_data_in_memory(size); }while(0)
+
+#else //_STARPU_MPI_FT_STATS
+#define _STARPU_MPI_FT_STATS_INIT() do{}while(0)
+#define _STARPU_MPI_FT_STATS_SHUTDOWN() do{}while(0)
+#define _STARPU_MPI_FT_STATS_WRITE_TO_FD(fd) do{}while(0)
+#define _STARPU_MPI_FT_STATS_SEND_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_RECV_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_STORE_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_DISCARD_CP_DATA(size) do{}while(0)
+
+#endif //_STARPU_MPI_FT_STATS
+
+LIST_TYPE(size_sample, \
+size_t size;
+)
+
+
+static inline void stat_init()
+{
+	starpu_pthread_mutex_init(&_ft_stats_mutex, NULL);
+	size_sample_list_init(&cp_data_in_memory_list);
+	cp_data_msgs_sent_count = 0;
+	cp_data_msgs_sent_total_size = 0;
+	cp_data_msgs_received_count = 0;
+	cp_data_msgs_received_total_size = 0;
+
+	cp_data_msgs_sent_cached_count = 0;
+	cp_data_msgs_sent_cached_total_size = 0;
+	cp_data_msgs_received_cached_count = 0;
+	cp_data_msgs_received_cached_total_size = 0;
+
+	ft_service_msgs_sent_count = 0;
+	ft_service_msgs_sent_total_size = 0;
+	ft_service_msgs_received_count = 0;
+	ft_service_msgs_received_total_size = 0;
+
+	cp_data_in_memory_size_total = 0;
+}
+
+static inline void _starpu_ft_stats_send_data(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	cp_data_msgs_sent_count++;
+	cp_data_msgs_sent_total_size+=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _starpu_ft_stats_send_data_cached(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	cp_data_msgs_sent_cached_count++;
+	cp_data_msgs_sent_cached_total_size+=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _starpu_ft_stats_recv_data(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	cp_data_msgs_received_count++;
+	cp_data_msgs_received_total_size+=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _starpu_ft_stats_recv_data_cached(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	cp_data_msgs_received_cached_count++;
+	cp_data_msgs_received_cached_total_size+=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _starpu_ft_stats_service_msg_send(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	ft_service_msgs_sent_count++;
+	ft_service_msgs_sent_total_size+=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _starpu_ft_stats_service_msg_recv(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	ft_service_msgs_received_count++;
+	ft_service_msgs_received_total_size+=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _starpu_ft_stats_add_cp_data_in_memory(size_t size)
+{
+	size_t tmp;
+	struct size_sample *tmp_sample, *sample = malloc(sizeof(struct size_sample));
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	cp_data_in_memory_size_total+=size;
+	tmp_sample = size_sample_list_back(&cp_data_in_memory_list);
+	tmp = (NULL==tmp_sample?0:tmp_sample->size);
+	tmp+=size;
+	sample->size = tmp;
+	size_sample_list_push_back(&cp_data_in_memory_list, sample);
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _starpu_ft_stats_free_cp_data_in_memory(size_t size)
+{
+	size_t tmp;
+	struct size_sample* sample = malloc(sizeof(struct size_sample));
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	tmp = size_sample_list_back(&cp_data_in_memory_list)->size;
+	tmp-=size;
+	sample->size = tmp;
+	size_sample_list_push_back(&cp_data_in_memory_list, sample);
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
+static inline void _ft_stats_free_cp_data_in_memory_list()
+{
+	struct size_sample *next, *sample = size_sample_list_begin(&cp_data_in_memory_list);
+	while (sample != size_sample_list_end(&cp_data_in_memory_list))
+	{
+		next = size_sample_list_next(sample);
+		size_sample_list_erase(&cp_data_in_memory_list, sample);
+		free(sample);
+		sample = next;
+	}
+}
+
+static inline void _starpu_ft_stats_write_to_fd(FILE* fd)
+{
+	// HEADER
+	fprintf(fd, "TYPE\tCP_DATA_NORMAL_COUNT\tCP_DATA_NORMAL_TOTAL_SIZE\tCP_DATA_CACHED_COUNT\tCP_DATA_CACHED_SIZE\tFT_SERVICE_MSGS_COUNT\tFT_SERVICE_MSGS_TOTAL_SIZE\n");
+	// DATA
+	fprintf(fd, "SEND\t%d\t"                 "%ld\t"                    "%d\t"               "%ld\t"               "%d\t"                 "%ld\n",
+	        cp_data_msgs_sent_count, cp_data_msgs_sent_total_size, cp_data_msgs_sent_cached_count, cp_data_msgs_sent_cached_total_size, ft_service_msgs_sent_count, ft_service_msgs_sent_total_size);
+	fprintf(fd, "RECV\t%d\t"                 "%ld\t"                    "%d\t"               "%ld\t"               "%d\t"                 "%ld\n",
+	        cp_data_msgs_received_count, cp_data_msgs_received_total_size, cp_data_msgs_received_cached_count, cp_data_msgs_received_cached_total_size, ft_service_msgs_received_count, ft_service_msgs_received_total_size);
+	fprintf(fd, "\n");
+	fprintf(fd, "IN_MEM_CP_DATA_TOTAL:%ld\n", cp_data_in_memory_size_total);
+	fprintf(fd, "\n");
+	fprintf(fd, "IN_MEM_CP_DATA_TRACKING\n");
+	struct size_sample *sample = size_sample_list_begin(&cp_data_in_memory_list);
+	while (sample != size_sample_list_end(&cp_data_in_memory_list))
+	{
+		fprintf(fd, "%ld\n", sample->size);
+		sample = size_sample_list_next(sample);
+	}
+	fprintf(fd, "\n");
+}
+
+static inline void _starpu_ft_stats_shutdown()
+{
+	_ft_stats_free_cp_data_in_memory_list();
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //FT_STARPU_STARPU_MPI_FT_STATS_H

+ 2 - 2
mpi/src/starpu_mpi.c

@@ -178,7 +178,7 @@ int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, starp
 struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency)
 {
 	struct _starpu_mpi_req* req = NULL;
-	int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, dest);
+	int already_sent = starpu_mpi_cached_send_set(data_handle, dest);
 	if (already_sent == 0)
 	{
 		if (data_tag == -1)
@@ -260,7 +260,7 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, starpu_mpi_tag
 struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, void (*alt_callback)(void *), void *_alt_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
 	struct _starpu_mpi_req* req = NULL;
-	int already_received = _starpu_mpi_cache_received_data_set(data_handle);
+	int already_received = starpu_mpi_cached_receive_set(data_handle);
 	if (already_received == 0)
 	{
 		if (data_tag == -1)