|
@@ -35,22 +35,22 @@ struct _starpu_data_entry
|
|
void *data;
|
|
void *data;
|
|
};
|
|
};
|
|
|
|
|
|
-static struct _starpu_data_entry **sent_data = NULL;
|
|
|
|
-static struct _starpu_data_entry **received_data = NULL;
|
|
|
|
-static int cache_enabled=1;
|
|
|
|
|
|
+static struct _starpu_data_entry **_cache_sent_data = NULL;
|
|
|
|
+static struct _starpu_data_entry **_cache_received_data = NULL;
|
|
|
|
+static int _cache_enabled=1;
|
|
|
|
|
|
-void _starpu_mpi_tables_init(MPI_Comm comm)
|
|
|
|
|
|
+void _starpu_mpi_cache_init(MPI_Comm comm)
|
|
{
|
|
{
|
|
int nb_nodes;
|
|
int nb_nodes;
|
|
int i;
|
|
int i;
|
|
|
|
|
|
- cache_enabled = starpu_get_env_number("STARPU_MPI_CACHE");
|
|
|
|
- if (cache_enabled == -1)
|
|
|
|
|
|
+ _cache_enabled = starpu_get_env_number("STARPU_MPI_CACHE");
|
|
|
|
+ if (_cache_enabled == -1)
|
|
{
|
|
{
|
|
- cache_enabled = 1;
|
|
|
|
|
|
+ _cache_enabled = 1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (cache_enabled == 0)
|
|
|
|
|
|
+ if (_cache_enabled == 0)
|
|
{
|
|
{
|
|
if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU MPI Communication cache is disabled\n");
|
|
if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU MPI Communication cache is disabled\n");
|
|
return;
|
|
return;
|
|
@@ -58,36 +58,77 @@ void _starpu_mpi_tables_init(MPI_Comm comm)
|
|
|
|
|
|
MPI_Comm_size(comm, &nb_nodes);
|
|
MPI_Comm_size(comm, &nb_nodes);
|
|
_STARPU_MPI_DEBUG("Initialising htable for cache\n");
|
|
_STARPU_MPI_DEBUG("Initialising htable for cache\n");
|
|
- sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
|
- for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
|
|
|
|
- received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
|
- for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
|
|
|
|
|
|
+ _cache_sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
|
+ for(i=0 ; i<nb_nodes ; i++) _cache_sent_data[i] = NULL;
|
|
|
|
+ _cache_received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
|
+ for(i=0 ; i<nb_nodes ; i++) _cache_received_data[i] = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
-void _starpu_mpi_tables_free(int world_size)
|
|
|
|
|
|
+void _starpu_mpi_cache_free(int world_size)
|
|
{
|
|
{
|
|
int i;
|
|
int i;
|
|
|
|
|
|
- if (cache_enabled == 0) return;
|
|
|
|
|
|
+ if (_cache_enabled == 0) return;
|
|
|
|
|
|
_STARPU_MPI_DEBUG("Clearing htable for cache\n");
|
|
_STARPU_MPI_DEBUG("Clearing htable for cache\n");
|
|
|
|
|
|
for(i=0 ; i<world_size ; i++)
|
|
for(i=0 ; i<world_size ; i++)
|
|
{
|
|
{
|
|
struct _starpu_data_entry *entry, *tmp;
|
|
struct _starpu_data_entry *entry, *tmp;
|
|
- HASH_ITER(hh, sent_data[i], entry, tmp)
|
|
|
|
|
|
+ HASH_ITER(hh, _cache_sent_data[i], entry, tmp)
|
|
{
|
|
{
|
|
- HASH_DEL(sent_data[i], entry);
|
|
|
|
|
|
+ HASH_DEL(_cache_sent_data[i], entry);
|
|
free(entry);
|
|
free(entry);
|
|
}
|
|
}
|
|
- HASH_ITER(hh, received_data[i], entry, tmp)
|
|
|
|
|
|
+ HASH_ITER(hh, _cache_received_data[i], entry, tmp)
|
|
{
|
|
{
|
|
- HASH_DEL(received_data[i], entry);
|
|
|
|
|
|
+ HASH_DEL(_cache_received_data[i], entry);
|
|
free(entry);
|
|
free(entry);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- free(sent_data);
|
|
|
|
- free(received_data);
|
|
|
|
|
|
+ free(_cache_sent_data);
|
|
|
|
+ free(_cache_received_data);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static
|
|
|
|
+void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
|
|
|
|
+{
|
|
|
|
+ if (_cache_enabled == 0) return NULL;
|
|
|
|
+
|
|
|
|
+ struct _starpu_data_entry *already_received;
|
|
|
|
+ HASH_FIND_PTR(_cache_received_data[mpi_rank], &data, already_received);
|
|
|
|
+ if (already_received == NULL)
|
|
|
|
+ {
|
|
|
|
+ struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
|
+ entry->data = data;
|
|
|
|
+ HASH_ADD_PTR(_cache_received_data[mpi_rank], data, entry);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ _STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
|
|
|
|
+ }
|
|
|
|
+ return already_received;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static
|
|
|
|
+void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
|
|
|
|
+{
|
|
|
|
+ if (_cache_enabled == 0) return NULL;
|
|
|
|
+
|
|
|
|
+ struct _starpu_data_entry *already_sent;
|
|
|
|
+ HASH_FIND_PTR(_cache_sent_data[dest], &data, already_sent);
|
|
|
|
+ if (already_sent == NULL)
|
|
|
|
+ {
|
|
|
|
+ struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
|
+ entry->data = data;
|
|
|
|
+ HASH_ADD_PTR(_cache_sent_data[dest], data, entry);
|
|
|
|
+ _STARPU_MPI_DEBUG("Noting that data %p has already been sent to %d\n", data, dest);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ _STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
|
|
|
|
+ }
|
|
|
|
+ return already_sent;
|
|
}
|
|
}
|
|
|
|
|
|
static
|
|
static
|
|
@@ -150,47 +191,6 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_access
|
|
}
|
|
}
|
|
|
|
|
|
static
|
|
static
|
|
-void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
|
|
|
|
-{
|
|
|
|
- if (cache_enabled == 0) return NULL;
|
|
|
|
-
|
|
|
|
- struct _starpu_data_entry *already_received;
|
|
|
|
- HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
|
|
|
|
- if (already_received == NULL)
|
|
|
|
- {
|
|
|
|
- struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
|
- entry->data = data;
|
|
|
|
- HASH_ADD_PTR(received_data[mpi_rank], data, entry);
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- _STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
|
|
|
|
- }
|
|
|
|
- return already_received;
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-static
|
|
|
|
-void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
|
|
|
|
-{
|
|
|
|
- if (cache_enabled == 0) return NULL;
|
|
|
|
-
|
|
|
|
- struct _starpu_data_entry *already_sent;
|
|
|
|
- HASH_FIND_PTR(sent_data[dest], &data, already_sent);
|
|
|
|
- if (already_sent == NULL)
|
|
|
|
- {
|
|
|
|
- struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
|
- entry->data = data;
|
|
|
|
- HASH_ADD_PTR(sent_data[dest], data, entry);
|
|
|
|
- _STARPU_MPI_DEBUG("Noting that data %p has already been sent to %d\n", data, dest);
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- _STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
|
|
|
|
- }
|
|
|
|
- return already_sent;
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-static
|
|
|
|
void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int dest, int do_execute, MPI_Comm comm)
|
|
void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int dest, int do_execute, MPI_Comm comm)
|
|
{
|
|
{
|
|
if (data && mode & STARPU_R)
|
|
if (data && mode & STARPU_R)
|
|
@@ -266,7 +266,7 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
|
|
|
|
|
|
void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int do_execute, MPI_Comm comm)
|
|
void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int do_execute, MPI_Comm comm)
|
|
{
|
|
{
|
|
- if (cache_enabled)
|
|
|
|
|
|
+ if (_cache_enabled)
|
|
{
|
|
{
|
|
if (mode & STARPU_W || mode & STARPU_REDUX)
|
|
if (mode & STARPU_W || mode & STARPU_REDUX)
|
|
{
|
|
{
|
|
@@ -278,11 +278,11 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
|
|
for(n=0 ; n<size ; n++)
|
|
for(n=0 ; n<size ; n++)
|
|
{
|
|
{
|
|
struct _starpu_data_entry *already_sent;
|
|
struct _starpu_data_entry *already_sent;
|
|
- HASH_FIND_PTR(sent_data[n], &data, already_sent);
|
|
|
|
|
|
+ HASH_FIND_PTR(_cache_sent_data[n], &data, already_sent);
|
|
if (already_sent)
|
|
if (already_sent)
|
|
{
|
|
{
|
|
_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data);
|
|
_STARPU_MPI_DEBUG("Clearing send cache for data %p\n", data);
|
|
- HASH_DEL(sent_data[n], already_sent);
|
|
|
|
|
|
+ HASH_DEL(_cache_sent_data[n], already_sent);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -290,14 +290,14 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
|
|
{
|
|
{
|
|
int mpi_rank = starpu_data_get_rank(data);
|
|
int mpi_rank = starpu_data_get_rank(data);
|
|
struct _starpu_data_entry *already_received;
|
|
struct _starpu_data_entry *already_received;
|
|
- HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
|
|
|
|
|
|
+ HASH_FIND_PTR(_cache_received_data[mpi_rank], &data, already_received);
|
|
if (already_received)
|
|
if (already_received)
|
|
{
|
|
{
|
|
#ifdef STARPU_DEVEL
|
|
#ifdef STARPU_DEVEL
|
|
# warning TODO: Somebody else will write to the data, so discard our cached copy if any. starpu_mpi could just remember itself.
|
|
# warning TODO: Somebody else will write to the data, so discard our cached copy if any. starpu_mpi could just remember itself.
|
|
#endif
|
|
#endif
|
|
_STARPU_MPI_DEBUG("Clearing receive cache for data %p\n", data);
|
|
_STARPU_MPI_DEBUG("Clearing receive cache for data %p\n", data);
|
|
- HASH_DEL(received_data[mpi_rank], already_received);
|
|
|
|
|
|
+ HASH_DEL(_cache_received_data[mpi_rank], already_received);
|
|
starpu_data_invalidate_submit(data);
|
|
starpu_data_invalidate_submit(data);
|
|
}
|
|
}
|
|
}
|
|
}
|