Sfoglia il codice sorgente

Correct cached cp comms stats

Romain LION 4 anni fa
parent
commit
83bede82f8

+ 1 - 0
mpi/include/starpu_mpi.h

@@ -430,6 +430,7 @@ int starpu_mpi_cached_receive(starpu_data_handle_t data_handle);
  * Return 0 if the communication cache is not enabled
  */
 int starpu_mpi_cached_receive_set(starpu_data_handle_t data);
+int starpu_mpi_cached_receive_set_alt(starpu_data_handle_t data_handle);
 
 /**
  * Remove \p data from the reception cache

+ 1 - 1
mpi/src/starpu_mpi.c

@@ -263,7 +263,7 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, starpu_mpi_tag
 struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int* cache_flag)
 {
 	struct _starpu_mpi_req* req = NULL;
-	int already_received = starpu_mpi_cached_receive_set(data_handle);
+	int already_received = starpu_mpi_cached_receive_set_alt(data_handle);
 	if (already_received == 0)
 	{
 		if (data_tag == -1)

+ 37 - 8
mpi/src/starpu_mpi_cache.c

@@ -229,14 +229,43 @@ int starpu_mpi_cached_receive_set(starpu_data_handle_t data_handle)
 	}
 	else
 	{
-		#ifdef STARPU_USE_MPI_FT_STATS
-			if (mpi_data->ft_induced_cache_received == 1)
-			{
-				_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(starpu_data_get_size(data_handle));
-				_STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(starpu_data_get_size(data_handle));
-				mpi_data->ft_induced_cache_received = 0;
-			}
-		#endif //STARPU_USE_MPI_FT_STATS
+#ifdef STARPU_USE_MPI_FT_STATS
+		if (mpi_data->ft_induced_cache_received == 1)
+		{
+			_STARPU_MPI_FT_STATS_RECV_CACHED_CP_DATA(starpu_data_get_size(data_handle));
+			_STARPU_MPI_FT_STATS_CANCEL_RECV_CP_DATA(starpu_data_get_size(data_handle));
+			mpi_data->ft_induced_cache_received = 0;
+		}
+#endif //STARPU_USE_MPI_FT_STATS
+		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);
+	return already_received;
+}
+
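+//TODO: rename -- variant of starpu_mpi_cached_receive_set() whose already-received branch does not update the FT stats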
+int starpu_mpi_cached_receive_set_alt(starpu_data_handle_t data_handle)
+{
+	int mpi_rank = starpu_mpi_data_get_rank(data_handle);
+	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
+
+	if (_starpu_cache_enabled == 0)
+		return 0;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&_cache_mutex);
+	STARPU_ASSERT(mpi_data->magic == 42);
+	STARPU_MPI_ASSERT_MSG(mpi_rank < _starpu_cache_comm_size, "Node %d invalid. Max node is %d\n", mpi_rank, _starpu_cache_comm_size);
+
+	int already_received = mpi_data->cache_received;
+	if (already_received == 0)
+	{
+		_STARPU_MPI_DEBUG(2, "Noting that data %p has already been received by %d\n", data_handle, mpi_rank);
+		mpi_data->cache_received = 1;
+		_starpu_mpi_cache_data_add_nolock(data_handle);
+		_starpu_mpi_cache_stats_inc(mpi_rank, data_handle);
+	}
+	else
+	{
 		_STARPU_MPI_DEBUG(2, "Do not receive data %p from node %d as it is already available\n", data_handle, mpi_rank);
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&_cache_mutex);

+ 1 - 0
mpi/src/starpu_mpi_private.h

@@ -204,6 +204,7 @@ struct _starpu_mpi_data
 	char *cache_sent;
 	unsigned int cache_received;
 	unsigned int ft_induced_cache_received:1;
+	unsigned int modified:1; // Whether the data has been modified since the registration.
 
 	/** Rendez-vous data for opportunistic cooperative sends */
 	/** Needed to synchronize between submit thread and workers */