|
@@ -27,6 +27,7 @@
|
|
|
#include <datawizard/coherency.h>
|
|
|
#include <core/task.h>
|
|
|
|
|
|
+#include <starpu_mpi_cache_stats.h>
|
|
|
#include <starpu_mpi_private.h>
|
|
|
#include <starpu_mpi_task_insert.h>
|
|
|
|
|
@@ -64,6 +65,7 @@ void _starpu_mpi_cache_init(MPI_Comm comm)
|
|
|
for(i=0 ; i<nb_nodes ; i++) _cache_sent_data[i] = NULL;
|
|
|
_cache_received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
for(i=0 ; i<nb_nodes ; i++) _cache_received_data[i] = NULL;
|
|
|
+ _starpu_mpi_cache_stats_init(comm);
|
|
|
}
|
|
|
|
|
|
static
|
|
@@ -86,6 +88,7 @@ void _starpu_mpi_cache_empty_tables(int world_size)
|
|
|
HASH_ITER(hh, _cache_received_data[i], entry, tmp)
|
|
|
{
|
|
|
HASH_DEL(_cache_received_data[i], entry);
|
|
|
+ _starpu_mpi_cache_stats_dec(-1, i, (starpu_data_handle_t) entry->data);
|
|
|
free(entry);
|
|
|
}
|
|
|
}
|
|
@@ -127,6 +130,7 @@ void starpu_mpi_cache_flush_all_data(MPI_Comm comm)
|
|
|
if (mpi_rank != my_rank && mpi_rank != -1)
|
|
|
starpu_data_invalidate_submit((starpu_data_handle_t) entry->data);
|
|
|
HASH_DEL(_cache_received_data[i], entry);
|
|
|
+ _starpu_mpi_cache_stats_dec(my_rank, i, (starpu_data_handle_t) entry->data);
|
|
|
free(entry);
|
|
|
}
|
|
|
}
|
|
@@ -158,6 +162,7 @@ void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(2, "Clearing send cache for data %p\n", data_handle);
|
|
|
HASH_DEL(_cache_received_data[i], avail);
|
|
|
+ _starpu_mpi_cache_stats_dec(my_rank, i, data_handle);
|
|
|
free(avail);
|
|
|
}
|
|
|
}
|
|
@@ -167,7 +172,7 @@ void starpu_mpi_cache_flush(MPI_Comm comm, starpu_data_handle_t data_handle)
|
|
|
}
|
|
|
|
|
|
static
|
|
|
-void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
|
|
|
+void *_starpu_mpi_already_received(int src, starpu_data_handle_t data, int mpi_rank)
|
|
|
{
|
|
|
if (_cache_enabled == 0) return NULL;
|
|
|
|
|
@@ -178,6 +183,7 @@ void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
|
|
|
struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
entry->data = data;
|
|
|
HASH_ADD_PTR(_cache_received_data[mpi_rank], data, entry);
|
|
|
+ _starpu_mpi_cache_stats_inc(src, mpi_rank, data);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -287,7 +293,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
|
|
|
if (do_execute && mpi_rank != me && mpi_rank != -1)
|
|
|
{
|
|
|
/* I will have to execute but I don't have the data, receive */
|
|
|
- void *already_received = _starpu_mpi_already_received(data, mpi_rank);
|
|
|
+ void *already_received = _starpu_mpi_already_received(me, data, mpi_rank);
|
|
|
if (already_received == NULL)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(1, "Receive data %p from %d\n", data, mpi_rank);
|
|
@@ -376,6 +382,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
|
|
|
#endif
|
|
|
_STARPU_MPI_DEBUG(2, "Clearing receive cache for data %p\n", data);
|
|
|
HASH_DEL(_cache_received_data[mpi_rank], already_received);
|
|
|
+ _starpu_mpi_cache_stats_dec(me, mpi_rank, data);
|
|
|
free(already_received);
|
|
|
starpu_data_invalidate_submit(data);
|
|
|
}
|