瀏覽代碼

Correct MPI fault-tolerance stats for cached checkpoint receives

Romain LION 4 年之前
父節點
當前提交
b44ca50e2c

+ 1 - 1
mpi/include/starpu_mpi.h

@@ -430,7 +430,7 @@ int starpu_mpi_cached_receive(starpu_data_handle_t data_handle);
  * Return 0 if the communication cache is not enabled
  */
 int starpu_mpi_cached_receive_set(starpu_data_handle_t data);
-int starpu_mpi_cached_receive_set_alt(starpu_data_handle_t data_handle);
+int starpu_mpi_cached_cp_receive_set(starpu_data_handle_t data_handle);
 
 /**
  * Remove \p data from the reception cache

+ 1 - 19
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -129,24 +129,6 @@ void _send_internal_data_stats(struct _starpu_mpi_cp_ack_arg_cb* arg)
 	}
 }
 
-#ifdef STARPU_USE_MPI_FT_STATS
-void _recv_internal_data_stats(struct _starpu_mpi_cp_ack_arg_cb* arg)
-{
-	if (arg->cache_flag) {
-		_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA( starpu_data_get_size(arg->handle));
-	}
-	else
-	{
-		_STARPU_MPI_FT_STATS_RECV_CP_DATA(starpu_data_get_size(arg->handle));
-		starpu_mpi_cache_set_ft_induced_cache_receive(arg->handle);
-	}
-}
-#else
-void _recv_internal_data_stats(STARPU_ATTRIBUTE_UNUSED struct _starpu_mpi_cp_ack_arg_cb* arg)
-{
-	return;
-}
-#endif
 
 int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_template, int prio)
 {
@@ -243,7 +225,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 					_starpu_mpi_irecv_cache_aware(handle, starpu_mpi_data_get_rank(handle), starpu_mpi_data_get_tag(handle), MPI_COMM_WORLD, 1, 0,
 												  NULL, NULL, 1, 0, 1, &arg->cache_flag);
 					// The callback needs to do nothing. The cached one must release the handle.
-					_recv_internal_data_stats(arg);
+					//  _recv_internal_data_stats(arg);  // Now done in data_cache_set
 					starpu_data_dup_ro(&arg->copy_handle, arg->handle, 1);
 					starpu_data_acquire_cb(arg->copy_handle, STARPU_R, _recv_internal_dup_ro_cb, arg);
 					// The callback need to store the data and post ack send.

+ 2 - 0
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_stats.c

@@ -27,6 +27,8 @@ int cp_data_msgs_sent_cached_count;
 size_t cp_data_msgs_sent_cached_total_size;
 int cp_data_msgs_received_cached_count;
 size_t cp_data_msgs_received_cached_total_size;
+int cp_data_msgs_received_cp_cached_count;
+size_t cp_data_msgs_received_cp_cached_total_size;
 
 int ft_service_msgs_sent_count;
 size_t ft_service_msgs_sent_total_size;

+ 17 - 1
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_stats.h

@@ -36,6 +36,8 @@ extern int cp_data_msgs_sent_cached_count;
 extern size_t cp_data_msgs_sent_cached_total_size;
 extern int cp_data_msgs_received_cached_count;
 extern size_t cp_data_msgs_received_cached_total_size;
+extern int cp_data_msgs_received_cp_cached_count;
+extern size_t cp_data_msgs_received_cp_cached_total_size;
 
 extern int ft_service_msgs_sent_count;
 extern size_t ft_service_msgs_sent_total_size;
@@ -54,6 +56,7 @@ static inline void _starpu_ft_stats_send_data(size_t size);
 static inline void _starpu_ft_stats_send_data_cached(size_t size);;
 static inline void _starpu_ft_stats_recv_data(size_t size);
 static inline void _starpu_ft_stats_recv_data_cached(size_t size);
+static inline void _starpu_ft_stats_recv_data_cp_cached(size_t size);
 static inline void _starpu_ft_stats_service_msg_send(size_t size);
 static inline void _starpu_ft_stats_service_msg_recv(size_t size);
 static inline void _starpu_ft_stats_add_cp_data_in_memory(size_t size);
@@ -69,6 +72,7 @@ static inline void _starpu_ft_stats_free_cp_data_in_memory(size_t size);
 #define _STARPU_MPI_FT_STATS_RECV_CP_DATA(size) do{ _starpu_ft_stats_recv_data(size); }while(0)
 #define _STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(size) do{ _starpu_ft_stats_cancel_recv_data(size); }while(0)
 #define _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(size) do{ _starpu_ft_stats_recv_data_cached(size); }while(0)
+#define _STARPU_MPI_FT_STATS_RECV_CP_CACHED_CP_DATA(size) do{ _starpu_ft_stats_recv_data_cp_cached(size); }while(0)
 #define _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(size) do{ _starpu_ft_stats_service_msg_send(size); }while(0)
 #define _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(size) do{ _starpu_ft_stats_service_msg_recv(size); }while(0)
 #define _STARPU_MPI_FT_STATS_STORE_CP_DATA(size) do{ _starpu_ft_stats_add_cp_data_in_memory(size); }while(0)
@@ -84,6 +88,7 @@ static inline void _starpu_ft_stats_free_cp_data_in_memory(size_t size);
 #define _STARPU_MPI_FT_STATS_RECV_CP_DATA(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_RECV_CP_CACHED_CP_DATA(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_STORE_CP_DATA(size) do{}while(0)
@@ -109,6 +114,8 @@ static inline void stat_init()
 	cp_data_msgs_sent_cached_total_size = 0;
 	cp_data_msgs_received_cached_count = 0;
 	cp_data_msgs_received_cached_total_size = 0;
+	cp_data_msgs_received_cp_cached_count = 0;
+	cp_data_msgs_received_cp_cached_total_size = 0;
 
 	ft_service_msgs_sent_count = 0;
 	ft_service_msgs_sent_total_size = 0;
@@ -173,6 +180,15 @@ static inline void _starpu_ft_stats_recv_data_cached(size_t size)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_ft_stats_mutex);
 }
 
+static inline void _starpu_ft_stats_recv_data_cp_cached(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	STARPU_PTHREAD_MUTEX_LOCK(&_ft_stats_mutex);
+	cp_data_msgs_received_cp_cached_count++;
+	cp_data_msgs_received_cp_cached_total_size+=size;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_ft_stats_mutex);
+}
+
 static inline void _starpu_ft_stats_service_msg_send(size_t size)
 {
 	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
@@ -243,7 +259,7 @@ static inline void _starpu_ft_stats_write_to_fd(FILE* fd)
 	fprintf(fd, "SEND\t%d\t"                 "%ld\t"                    "%d\t"               "%ld\t"               "%d\t"                 "%ld\n",
 	        cp_data_msgs_sent_count, cp_data_msgs_sent_total_size, cp_data_msgs_sent_cached_count, cp_data_msgs_sent_cached_total_size, ft_service_msgs_sent_count, ft_service_msgs_sent_total_size);
 	fprintf(fd, "RECV\t%d\t"                 "%ld\t"                    "%d\t"               "%ld\t"               "%d\t"                 "%ld\n",
-	        cp_data_msgs_received_count, cp_data_msgs_received_total_size, cp_data_msgs_received_cached_count, cp_data_msgs_received_cached_total_size, ft_service_msgs_received_count, ft_service_msgs_received_total_size);
+	        cp_data_msgs_received_count, cp_data_msgs_received_total_size, cp_data_msgs_received_cached_count, cp_data_msgs_received_cached_total_size+cp_data_msgs_received_cp_cached_total_size, ft_service_msgs_received_count, ft_service_msgs_received_total_size);
 	fprintf(fd, "\n");
 	fprintf(fd, "IN_MEM_CP_DATA_TOTAL:%lu\n", cp_data_in_memory_size_total);
 	fprintf(fd, "\n");

+ 1 - 1
mpi/src/starpu_mpi.c

@@ -263,7 +263,7 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, starpu_mpi_tag
 struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int* cache_flag)
 {
 	struct _starpu_mpi_req* req = NULL;
-	int already_received = starpu_mpi_cached_receive_set_alt(data_handle);
+	int already_received = starpu_mpi_cached_cp_receive_set(data_handle);
 	if (already_received == 0)
 	{
 		if (data_tag == -1)

+ 12 - 10
mpi/src/starpu_mpi_cache.c

@@ -132,6 +132,7 @@ void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle)
 	STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
 	mpi_data->cache_received = 0;
 	mpi_data->ft_induced_cache_received = 0;
+	mpi_data->ft_induced_cache_received_count = 0;
 	_STARPU_MALLOC(mpi_data->cache_sent, _starpu_cache_comm_size*sizeof(mpi_data->cache_sent[0]));
 	for(i=0 ; i<_starpu_cache_comm_size ; i++)
 	{
@@ -194,6 +195,7 @@ void starpu_mpi_cached_receive_clear(starpu_data_handle_t data_handle)
 		_STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data_handle);
 		mpi_data->cache_received = 0;
 		mpi_data->ft_induced_cache_received = 0;
+		mpi_data->ft_induced_cache_received_count = 0;
 		starpu_data_invalidate_submit(data_handle);
 		_starpu_mpi_cache_data_remove_nolock(data_handle);
 		_starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
@@ -201,12 +203,6 @@ void starpu_mpi_cached_receive_clear(starpu_data_handle_t data_handle)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 }
 
-int starpu_mpi_cache_set_ft_induced_cache_receive(starpu_data_handle_t data_handle)
-{
-	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
-	mpi_data->ft_induced_cache_received = 1;
-	return 1;
-}
 
 int starpu_mpi_cached_receive_set(starpu_data_handle_t data_handle)
 {
@@ -231,11 +227,11 @@ int starpu_mpi_cached_receive_set(starpu_data_handle_t data_handle)
 	else
 	{
 #ifdef STARPU_USE_MPI_FT_STATS
-		if (mpi_data->ft_induced_cache_received == 1)
+		if (mpi_data->ft_induced_cache_received == 1 && mpi_data->ft_induced_cache_received_count == 0)
 		{
 			_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(starpu_data_get_size(data_handle));
 			_STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(starpu_data_get_size(data_handle));
-			mpi_data->ft_induced_cache_received = 0;
+			mpi_data->ft_induced_cache_received_count = 1;
 		}
 #endif //STARPU_USE_MPI_FT_STATS
 		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
@@ -244,8 +240,7 @@ int starpu_mpi_cached_receive_set(starpu_data_handle_t data_handle)
 	return already_received;
 }
 
-//TODO: rename
-int starpu_mpi_cached_receive_set_alt(starpu_data_handle_t data_handle)
+int starpu_mpi_cached_cp_receive_set(starpu_data_handle_t data_handle)
 {
 	int mpi_rank = starpu_mpi_data_get_rank(data_handle);
 	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
@@ -262,11 +257,17 @@ int starpu_mpi_cached_receive_set_alt(starpu_data_handle_t data_handle)
 	{
 		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been received by %d\n", data_handle, mpi_rank);
 		mpi_data->cache_received = 1;
+		mpi_data->ft_induced_cache_received = 1;
+		_STARPU_MPI_FT_STATS_RECV_CP_DATA(starpu_data_get_size(data_handle));
 		_starpu_mpi_cache_data_add_nolock(data_handle);
 		_starpu_mpi_cache_stats_inc(mpi_rank, data_handle);
 	}
 	else
 	{
+		if (mpi_data->ft_induced_cache_received == 1)
+			_STARPU_MPI_FT_STATS_RECV_CP_CACHED_CP_DATA(starpu_data_get_size(data_handle));
+		else
+			_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(starpu_data_get_size(data_handle));
 		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
@@ -378,6 +379,7 @@ static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle)
 		_STARPU_MPI_DEBUG(2, "Clearing received cache for data %p\n", data_handle);
 		mpi_data->cache_received = 0;
 		mpi_data->ft_induced_cache_received = 0;
+		mpi_data->ft_induced_cache_received_count = 0;
 		_starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
 	}
 }

+ 1 - 0
mpi/src/starpu_mpi_private.h

@@ -204,6 +204,7 @@ struct _starpu_mpi_data
 	char *cache_sent;
 	unsigned int cache_received;
 	unsigned int ft_induced_cache_received:1;
+	unsigned int ft_induced_cache_received_count:1;
 	unsigned int modified:1; // Whether the data has been modified since the registration.
 
 	/** Rendez-vous data for opportunistic cooperative sends */