Преглед изворни кода

upgrade cache mechanisms to detect when CP communications benefits to computation

Romain LION пре 5 година
родитељ
комит
3b78e2cd0e

+ 7 - 2
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint.c

@@ -80,11 +80,13 @@ void _recv_internal_dup_ro_cb(void* _args)
 {
 	struct _starpu_mpi_cp_ack_arg_cb* arg = (struct _starpu_mpi_cp_ack_arg_cb*) _args;
 	if (arg->cache_flag) {
-		_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
+		_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->copy_handle): -1);
 	}
 	else
 	{
-		_STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
+		struct _starpu_mpi_data* mpi_data = _starpu_mpi_data_get(arg->copy_handle);
+		mpi_data->cache_received.ft_induced_cache = 1;
+		_STARPU_MPI_FT_STATS_RECV_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->copy_handle): -1);
 	}
 	starpu_data_release(arg->copy_handle);
 	_starpu_mpi_store_data_and_send_ack_cb(arg);
@@ -116,6 +118,8 @@ void _send_cp_internal_data_cb(void* _args) {
 	}
 	else
 	{
+		struct _starpu_mpi_data* mpi_data = _starpu_mpi_data_get(arg->handle);
+		mpi_data->cache_sent[arg->rank].ft_induced_cache = 1;
 		_STARPU_MPI_FT_STATS_SEND_CP_DATA(arg->type == STARPU_VALUE ? arg->count : arg->type == STARPU_R ? starpu_data_get_size(arg->handle): -1);
 	}
 	_starpu_mpi_push_cp_ack_recv_cb(arg);
@@ -143,6 +147,7 @@ int starpu_mpi_submit_checkpoint_template(starpu_mpi_checkpoint_template_t cp_te
 				arg->tag = item->tag;
 				arg->type = STARPU_VALUE;
 				arg->count = item->count;
+				arg->cache_flag = 0;
 				arg->msg.checkpoint_id = cp_template->cp_id;
 				arg->msg.checkpoint_instance = current_instance;
 				if (item->backupped_by != -1)

+ 2 - 2
mpi/src/mpi_failure_tolerance/starpu_mpi_checkpoint_template.c

@@ -90,7 +90,7 @@ int _starpu_mpi_checkpoint_template_add_data(starpu_mpi_checkpoint_template_t cp
 	item = _starpu_mpi_checkpoint_template_item_create(type, ptr, count, backupped_by, backup_of, tag);
 	_starpu_mpi_checkpoint_template_item_list_push_back(&cp_template->list, item);
 	_checkpoint_template_add_to_backup_arrays(cp_template, backupped_by, backup_of);
-	_STARPU_MPI_DEBUG(5, "New checkpoint data entry %p has been added to cp_template with id:%d. (%s)\n", item, cp_template->cp_id, backupped_by == -1 ? "BACKUP_OF" : "BACKUPPED_BY");
+	_STARPU_MPI_DEBUG(5, "New checkpoint data entry %p (data:%p) has been added to cp_template with id:%d. (%s)\n", item, item->ptr, cp_template->cp_id, backupped_by == -1 ? "BACKUP_OF" : "BACKUPPED_BY");
 	starpu_pthread_mutex_unlock(&cp_template->mutex);
 	return 0;
 }
@@ -521,7 +521,7 @@ int _starpu_mpi_checkpoint_template_print(starpu_mpi_checkpoint_template_t cp_te
 		}
 		else if (item->type == STARPU_DATA_ARRAY)
 		{
-			fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(*((starpu_data_handle_t*)item->ptr), 0));
+			fprintf(stderr, "STARPU_DATA_ARRAY - Multiple values: %d", *(int*)starpu_data_handle_to_pointer(((starpu_data_handle_t)item->ptr), 0));
 
 			for (int j=1 ; j<MIN(item->count, 5) ; j++)
 			{

+ 30 - 8
mpi/src/mpi_failure_tolerance/starpu_mpi_ft_stats.h

@@ -63,8 +63,10 @@ static inline void _starpu_ft_stats_free_cp_data_in_memory(size_t size);
 #define _STARPU_MPI_FT_STATS_SHUTDOWN() do{ _starpu_ft_stats_shutdown(); }while(0)
 #define _STARPU_MPI_FT_STATS_WRITE_TO_FD(fd) do{ _starpu_ft_stats_write_to_fd(fd); }while(0)
 #define _STARPU_MPI_FT_STATS_SEND_CP_DATA(size) do{ _starpu_ft_stats_send_data(size); }while(0)
+#define _STARPU_MPI_FT_STATS_CANCEL_SEND_CP_DATA(size) do{ _starpu_ft_stats_cancel_send_data(size); }while(0)
 #define _STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(size) do{ _starpu_ft_stats_send_data_cached(size); }while(0)
 #define _STARPU_MPI_FT_STATS_RECV_CP_DATA(size) do{ _starpu_ft_stats_recv_data(size); }while(0)
+#define _STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(size) do{ _starpu_ft_stats_cancel_recv_data(size); }while(0)
 #define _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(size) do{ _starpu_ft_stats_recv_data_cached(size); }while(0)
 #define _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(size) do{ _starpu_ft_stats_service_msg_send(size); }while(0)
 #define _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(size) do{ _starpu_ft_stats_service_msg_recv(size); }while(0)
@@ -76,8 +78,10 @@ static inline void _starpu_ft_stats_free_cp_data_in_memory(size_t size);
 #define _STARPU_MPI_FT_STATS_SHUTDOWN() do{}while(0)
 #define _STARPU_MPI_FT_STATS_WRITE_TO_FD(fd) do{}while(0)
 #define _STARPU_MPI_FT_STATS_SEND_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_CANCEL_SEND_CP_DATA(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_RECV_CP_DATA(size) do{}while(0)
+#define _STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_SEND_FT_SERVICE_MSG(size) do{}while(0)
 #define _STARPU_MPI_FT_STATS_RECV_FT_SERVICE_MSG(size) do{}while(0)
@@ -122,6 +126,15 @@ static inline void _starpu_ft_stats_send_data(size_t size)
 	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
 }
 
+static inline void _starpu_ft_stats_cancel_send_data(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	cp_data_msgs_sent_count--;
+	cp_data_msgs_sent_total_size-=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
 static inline void _starpu_ft_stats_send_data_cached(size_t size)
 {
 	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
@@ -140,6 +153,15 @@ static inline void _starpu_ft_stats_recv_data(size_t size)
 	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
 }
 
+static inline void _starpu_ft_stats_cancel_recv_data(size_t size)
+{
+	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
+	starpu_pthread_mutex_lock(&_ft_stats_mutex);
+	cp_data_msgs_received_count--;
+	cp_data_msgs_received_total_size-=size;
+	starpu_pthread_mutex_unlock(&_ft_stats_mutex);
+}
+
 static inline void _starpu_ft_stats_recv_data_cached(size_t size)
 {
 	STARPU_ASSERT_MSG(size != -1, "Cannot count a data of size -1. An error has occured.\n");
@@ -219,14 +241,14 @@ static inline void _starpu_ft_stats_write_to_fd(FILE* fd)
 	fprintf(fd, "\n");
 	fprintf(fd, "IN_MEM_CP_DATA_TOTAL:%ld\n", cp_data_in_memory_size_total);
 	fprintf(fd, "\n");
-	fprintf(fd, "IN_MEM_CP_DATA_TRACKING\n");
-	struct size_sample *sample = size_sample_list_begin(&cp_data_in_memory_list);
-	while (sample != size_sample_list_end(&cp_data_in_memory_list))
-	{
-		fprintf(fd, "%ld\n", sample->size);
-		sample = size_sample_list_next(sample);
-	}
-	fprintf(fd, "\n");
+//	fprintf(fd, "IN_MEM_CP_DATA_TRACKING\n");
+//	struct size_sample *sample = size_sample_list_begin(&cp_data_in_memory_list);
+//	while (sample != size_sample_list_end(&cp_data_in_memory_list))
+//	{
+//		fprintf(fd, "%ld\n", sample->size);
+//		sample = size_sample_list_next(sample);
+//	}
+//	fprintf(fd, "\n");
 }
 
 static inline void _starpu_ft_stats_shutdown()

+ 39 - 17
mpi/src/starpu_mpi_cache.c

@@ -21,6 +21,10 @@
 #include <starpu_mpi_cache.h>
 #include <starpu_mpi_cache_stats.h>
 #include <starpu_mpi_private.h>
+#include <mpi_failure_tolerance/starpu_mpi_ft_stats.h>
+
+#define STARPU_CACHE_IN_CACHE                (1U<<0U)
+#define STARPU_CACHE_FT_INDUCED_IN_CACHE     (1<<1)
 
 /* Whether we are allowed to keep copies of remote data. */
 struct _starpu_data_entry
@@ -129,11 +133,13 @@ void _starpu_mpi_cache_data_init(starpu_data_handle_t data_handle)
 		return;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
-	mpi_data->cache_received = 0;
+	mpi_data->cache_received.in_cache         = 0;
+	mpi_data->cache_received.ft_induced_cache = 0;
 	_STARPU_MALLOC(mpi_data->cache_sent, _starpu_cache_comm_size*sizeof(mpi_data->cache_sent[0]));
 	for(i=0 ; i<_starpu_cache_comm_size ; i++)
 	{
-		mpi_data->cache_sent[i] = 0;
+		mpi_data->cache_sent[i].in_cache         = 0;
+		mpi_data->cache_sent[i].ft_induced_cache = 0;
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 }
@@ -184,13 +190,14 @@ void starpu_mpi_cached_receive_clear(starpu_data_handle_t data_handle)
 	STARPU_ASSERT(mpi_data->magic == 42);
 	STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
 
-	if (mpi_data->cache_received == 1)
+	if (mpi_data->cache_received.in_cache == 1)
 	{
 #ifdef STARPU_DEVEL
 #  warning TODO: Somebody else will write to the data, so discard our cached copy if any. starpu_mpi could just remember itself.
 #endif
 		_STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data_handle);
-		mpi_data->cache_received = 0;
+		mpi_data->cache_received.in_cache         = 0;
+		mpi_data->cache_received.ft_induced_cache = 0;
 		starpu_data_invalidate_submit(data_handle);
 		_starpu_mpi_cache_data_remove_nolock(data_handle);
 		_starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
@@ -210,16 +217,22 @@ int starpu_mpi_cached_receive_set(starpu_data_handle_t data_handle)
 	STARPU_ASSERT(mpi_data->magic == 42);
 	STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
 
-	int already_received = mpi_data->cache_received;
+	int already_received = mpi_data->cache_received.in_cache;
 	if (already_received == 0)
 	{
 		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been received by %d\n", data_handle, mpi_rank);
-		mpi_data->cache_received = 1;
+		mpi_data->cache_received.in_cache = 1;
 		_starpu_mpi_cache_data_add_nolock(data_handle);
 		_starpu_mpi_cache_stats_inc(mpi_rank, data_handle);
 	}
 	else
 	{
+		if (mpi_data->cache_received.ft_induced_cache == 1)
+		{
+			_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(starpu_data_get_size(data_handle));
+			_STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(starpu_data_get_size(data_handle));
+			mpi_data->cache_received.ft_induced_cache = 0;
+		}
 		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
@@ -236,7 +249,7 @@ int starpu_mpi_cached_receive(starpu_data_handle_t data_handle)
 
 	STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
 	STARPU_ASSERT(mpi_data->magic == 42);
-	already_received = mpi_data->cache_received;
+	already_received = mpi_data->cache_received.in_cache;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 	return already_received;
 }
@@ -256,10 +269,11 @@ void starpu_mpi_cached_send_clear(starpu_data_handle_t data_handle)
 	starpu_mpi_comm_size(mpi_data->node_tag.node.comm, &size);
 	for(n=0 ; n<size ; n++)
 	{
-		if (mpi_data->cache_sent[n] == 1)
+		if (mpi_data->cache_sent[n].in_cache == 1)
 		{
 			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
-			mpi_data->cache_sent[n] = 0;
+			mpi_data->cache_sent[n].in_cache = 0;
+			mpi_data->cache_sent[n].ft_induced_cache = 0;
 			_starpu_mpi_cache_data_remove_nolock(data_handle);
 		}
 	}
@@ -276,15 +290,21 @@ int starpu_mpi_cached_send_set(starpu_data_handle_t data_handle, int dest)
 	STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
-	int already_sent = mpi_data->cache_sent[dest];
-	if (mpi_data->cache_sent[dest] == 0)
+	int already_sent = mpi_data->cache_sent[dest].in_cache;
+	if (mpi_data->cache_sent[dest].in_cache == 0)
 	{
-		mpi_data->cache_sent[dest] = 1;
+		mpi_data->cache_sent[dest].in_cache = 1;
 		_starpu_mpi_cache_data_add_nolock(data_handle);
 		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been sent to %d\n", data_handle, dest);
 	}
 	else
 	{
+		if (mpi_data->cache_sent[dest].ft_induced_cache == 1)
+		{
+			_STARPU_MPI_FT_STATS_SEND_CACHED_CP_DATA(starpu_data_get_size(data_handle));
+			_STARPU_MPI_FT_STATS_CANCEL_SEND_CP_DATA(starpu_data_get_size(data_handle));
+			mpi_data->cache_sent[dest].ft_induced_cache = 0;
+		}
 		_STARPU_MPI_DEBUG(2, "Do not send data %p to node %d as it has already been sent\n", data_handle, dest);
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
@@ -301,7 +321,7 @@ int starpu_mpi_cached_send(starpu_data_handle_t data_handle, int dest)
 
 	STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
 	STARPU_MPI_ASSERT_MSG(dest < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", dest, _starpu_cache_comm_size);
-	already_sent = mpi_data->cache_sent[dest];
+	already_sent = mpi_data->cache_sent[dest].in_cache;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
 	return already_sent;
 }
@@ -317,19 +337,21 @@ static void _starpu_mpi_cache_flush_nolock(starpu_data_handle_t data_handle)
 	starpu_mpi_comm_size(mpi_data->node_tag.node.comm, &nb_nodes);
 	for(i=0 ; i<nb_nodes ; i++)
 	{
-		if (mpi_data->cache_sent[i] == 1)
+		if (mpi_data->cache_sent[i].in_cache == 1)
 		{
 			_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
-			mpi_data->cache_sent[i] = 0;
+			mpi_data->cache_sent[i].in_cache         = 0;
+			mpi_data->cache_sent[i].ft_induced_cache = 0;
 			_starpu_mpi_cache_stats_dec(i, data_handle);
 		}
 	}
 
-	if (mpi_data->cache_received == 1)
+	if (mpi_data->cache_received.in_cache == 1)
 	{
 		int mpi_rank = starpu_mpi_data_get_rank(data_handle);
 		_STARPU_MPI_DEBUG(2, "Clearing received cache for data %p\n", data_handle);
-		mpi_data->cache_received = 0;
+		mpi_data->cache_received.in_cache         = 0;
+		mpi_data->cache_received.ft_induced_cache = 0;
 		_starpu_mpi_cache_stats_dec(mpi_rank, data_handle);
 	}
 }

+ 9 - 3
mpi/src/starpu_mpi_private.h

@@ -195,13 +195,19 @@ struct _starpu_mpi_coop_sends
 	unsigned redirects_sent;
 };
 
+struct _starpu_cache_info
+{
+	int in_cache:1;
+	int ft_induced_cache:1;
+};
+
 /** Initialized in starpu_mpi_data_register_comm */
 struct _starpu_mpi_data
 {
-	int magic;
+	int                         magic;
 	struct _starpu_mpi_node_tag node_tag;
-	int *cache_sent;
-	int cache_received;
+	struct _starpu_cache_info   *cache_sent;
+	struct _starpu_cache_info   cache_received;
 
 	/** Rendez-vous data for opportunistic cooperative sends */
 	/** Needed to synchronize between submit thread and workers */

+ 1 - 0
src/util/starpu_data_cpy.c

@@ -198,6 +198,7 @@ int starpu_data_dup_ro(starpu_data_handle_t *dst_handle, starpu_data_handle_t sr
 	_starpu_spin_unlock(&src_handle->header_lock);
 
 	starpu_data_register_same(dst_handle, src_handle);
+//	(*dst_handle)->mpi_data = src_handle->mpi_data;
 	_starpu_data_cpy(*dst_handle, src_handle, asynchronous, NULL, NULL, 0, NULL);
 	(*dst_handle)->readonly = 1;