|
@@ -40,23 +40,49 @@ struct _starpu_data_entry **sent_data = NULL;
|
|
|
struct _starpu_data_entry **received_data = NULL;
|
|
|
#endif /* STARPU_MPI_CACHE */
|
|
|
|
|
|
-static void _starpu_mpi_tables_init()
|
|
|
+void _starpu_mpi_tables_init(MPI_Comm comm)
|
|
|
{
|
|
|
#ifdef STARPU_MPI_CACHE
|
|
|
- if (sent_data == NULL) {
|
|
|
- int nb_nodes;
|
|
|
- int i;
|
|
|
+ int nb_nodes;
|
|
|
+ int i;
|
|
|
|
|
|
- MPI_Comm_size(MPI_COMM_WORLD, &nb_nodes);
|
|
|
- _STARPU_MPI_DEBUG("Initialising htable for cache\n");
|
|
|
- sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
- for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
|
|
|
- received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
- for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
|
|
|
- }
|
|
|
+ MPI_Comm_size(comm, &nb_nodes);
|
|
|
+ _STARPU_MPI_DEBUG("Initialising htable for cache\n");
|
|
|
+ sent_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
+ for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
|
|
|
+ received_data = malloc(nb_nodes * sizeof(struct _starpu_data_entry *));
|
|
|
+ for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
|
|
|
+#else
|
|
|
+ if (!getenv("STARPU_SILENT")) fprintf(stderr,"Warning: StarPU was configured with --disable-mpi-cache\n");
|
|
|
#endif /* STARPU_MPI_CACHE */
|
|
|
}
|
|
|
|
|
|
+void _starpu_mpi_tables_free(int world_size)
|
|
|
+{
|
|
|
+#ifdef STARPU_MPI_CACHE
|
|
|
+ int i;
|
|
|
+
|
|
|
+ _STARPU_MPI_DEBUG("Clearing htable for cache\n");
|
|
|
+
|
|
|
+ for(i=0 ; i<world_size ; i++)
|
|
|
+ {
|
|
|
+ struct _starpu_data_entry *entry, *tmp;
|
|
|
+ HASH_ITER(hh, sent_data[i], entry, tmp)
|
|
|
+ {
|
|
|
+ HASH_DEL(sent_data[i], entry);
|
|
|
+ free(entry);
|
|
|
+ }
|
|
|
+ HASH_ITER(hh, received_data[i], entry, tmp)
|
|
|
+ {
|
|
|
+ HASH_DEL(received_data[i], entry);
|
|
|
+ free(entry);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ free(sent_data);
|
|
|
+ free(received_data);
|
|
|
+#endif
|
|
|
+}
|
|
|
+
|
|
|
static
|
|
|
int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *dest, size_t *size_on_nodes)
|
|
|
{
|
|
@@ -107,6 +133,47 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_access
|
|
|
}
|
|
|
|
|
|
static
|
|
|
+void *_starpu_mpi_already_received(starpu_data_handle_t data, int mpi_rank)
|
|
|
+{
|
|
|
+#ifdef STARPU_MPI_CACHE
|
|
|
+ struct _starpu_data_entry *already_received;
|
|
|
+ HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
|
|
|
+ if (already_received == NULL) {
|
|
|
+ struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
+ entry->data = data;
|
|
|
+ HASH_ADD_PTR(received_data[mpi_rank], data, entry);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ _STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
|
|
|
+ }
|
|
|
+ return already_received;
|
|
|
+#else
|
|
|
+ return NULL;
|
|
|
+#endif
|
|
|
+}
|
|
|
+
|
|
|
+static
|
|
|
+void *_starpu_mpi_already_sent(starpu_data_handle_t data, int dest)
|
|
|
+{
|
|
|
+#ifdef STARPU_MPI_CACHE
|
|
|
+ struct _starpu_data_entry *already_sent;
|
|
|
+ HASH_FIND_PTR(sent_data[dest], &data, already_sent);
|
|
|
+ if (already_sent == NULL) {
|
|
|
+ struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
+ entry->data = data;
|
|
|
+ HASH_ADD_PTR(sent_data[dest], data, entry);
|
|
|
+ _STARPU_MPI_DEBUG("Noting that data %p has already been sent to %d\n", data, dest);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ _STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
|
|
|
+ }
|
|
|
+ return already_sent;
|
|
|
+#else
|
|
|
+ return NULL;
|
|
|
+#endif
|
|
|
+}
|
|
|
+
|
|
|
+static
|
|
|
void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_access_mode mode, int me, int dest, int do_execute, MPI_Comm comm)
|
|
|
{
|
|
|
if (data && mode & STARPU_R) {
|
|
@@ -123,19 +190,8 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
|
|
|
/* The task needs to read this data */
|
|
|
if (do_execute && mpi_rank != me && mpi_rank != -1) {
|
|
|
/* I will have to execute but I don't have the data, receive */
|
|
|
-#ifdef STARPU_MPI_CACHE
|
|
|
- struct _starpu_data_entry *already_received;
|
|
|
- HASH_FIND_PTR(received_data[mpi_rank], &data, already_received);
|
|
|
- if (already_received == NULL) {
|
|
|
- struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
- entry->data = data;
|
|
|
- HASH_ADD_PTR(received_data[mpi_rank], data, entry);
|
|
|
- }
|
|
|
- else {
|
|
|
- _STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
|
|
|
- }
|
|
|
- if (!already_received)
|
|
|
-#endif
|
|
|
+ void *already_received = _starpu_mpi_already_received(data, mpi_rank);
|
|
|
+ if (already_received == NULL)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
|
|
|
starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
|
|
@@ -143,20 +199,8 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
|
|
|
}
|
|
|
if (!do_execute && mpi_rank == me) {
|
|
|
/* Somebody else will execute it, and I have the data, send it. */
|
|
|
-#ifdef STARPU_MPI_CACHE
|
|
|
- struct _starpu_data_entry *already_sent;
|
|
|
- HASH_FIND_PTR(sent_data[dest], &data, already_sent);
|
|
|
- if (already_sent == NULL) {
|
|
|
- struct _starpu_data_entry *entry = (struct _starpu_data_entry *)malloc(sizeof(*entry));
|
|
|
- entry->data = data;
|
|
|
- HASH_ADD_PTR(sent_data[dest], data, entry);
|
|
|
- _STARPU_MPI_DEBUG("Noting that data %p has already been sent to %d\n", data, dest);
|
|
|
- }
|
|
|
- else {
|
|
|
- _STARPU_MPI_DEBUG("Do not send data %p to node %d as it has already been sent\n", data, dest);
|
|
|
- }
|
|
|
- if (!already_sent)
|
|
|
-#endif
|
|
|
+ void *already_sent = _starpu_mpi_already_sent(data, dest);
|
|
|
+ if (already_sent == NULL)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
|
|
|
starpu_mpi_isend_detached(data, dest, mpi_tag, comm, NULL, NULL);
|
|
@@ -251,8 +295,6 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
|
|
|
|
|
|
size_on_nodes = (size_t *)calloc(1, nb_nodes * sizeof(size_t));
|
|
|
|
|
|
- _starpu_mpi_tables_init();
|
|
|
-
|
|
|
/* Get the number of buffers and the size of the arguments */
|
|
|
va_start(varg_list, codelet);
|
|
|
arg_buffer_size = _starpu_insert_task_get_arg_size(varg_list);
|