Przeglądaj źródła

StarPU-MPI: incomplete implementation of the cache (missing: cache discarding). Use at your own risks...

Nathalie Furmento 14 lat temu
rodzic
commit
ba94d9dcff
1 zmienionych plików z 65 dodań i 7 usunięć
  1. 65 7
      mpi/starpu_mpi_insert_task.c

+ 65 - 7
mpi/starpu_mpi_insert_task.c

@@ -20,6 +20,7 @@
 #include <starpu.h>
 #include <starpu_data.h>
 #include <common/utils.h>
+#include <common/hash.h>
 #include <util/starpu_insert_task_utils.h>
 
 //#define STARPU_MPI_VERBOSE	1
@@ -30,6 +31,21 @@
  * in an array indexed by node numbers. */
 //#define MPI_CACHE
 
+#ifdef MPI_CACHE
+static struct starpu_htbl32_node_s **sent_data = NULL;
+static struct starpu_htbl32_node_s **received_data = NULL;
+
+static void _starpu_mpi_task_init(int nb_nodes) {
+        int i;
+
+        _STARPU_MPI_DEBUG("Initialising hash table for cache\n");
+        sent_data = malloc(nb_nodes * sizeof(struct starpu_htbl32_node_s *));
+        for(i=0 ; i<nb_nodes ; i++) sent_data[i] = NULL;
+        received_data = malloc(nb_nodes * sizeof(struct starpu_htbl32_node_s *));
+        for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
+}
+#endif
+
 int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
         int arg_type;
         va_list varg_list;
@@ -37,8 +53,18 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
 	size_t arg_buffer_size = 0;
         int dest;
 
+        _STARPU_MPI_LOG_IN();
+
 	MPI_Comm_rank(comm, &me);
 
+#ifdef MPI_CACHE
+        if (sent_data == NULL) {
+                int size;
+                MPI_Comm_size(comm, &size);
+                _starpu_mpi_task_init(size);
+        }
+#endif
+
         /* Get the number of buffers and the size of the arguments */
 	va_start(varg_list, codelet);
         arg_buffer_size = starpu_insert_task_get_arg_size(varg_list);
@@ -58,6 +84,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                                          * insert_task at all itself, this is just a
                                          * safeguard. */
                                         _STARPU_MPI_DEBUG("oh oh\n");
+                                        _STARPU_MPI_LOG_OUT();
                                         return;
                                 }
                                 int mpi_rank = starpu_data_get_rank(data);
@@ -106,20 +133,43 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                                 int mpi_rank = starpu_data_get_rank(data);
                                 /* The task needs to read this data */
                                 if (do_execute && mpi_rank != me && mpi_rank != -1) {
-                                        _STARPU_MPI_DEBUG("Receive data from %d\n", mpi_rank);
                                         /* I will have to execute but I don't have the data, receive */
 #ifdef MPI_CACHE
-                                        if (!starpu_allocated(data))
+                                        uint32_t key = _starpu_crc32_be(data, 0);
+                                        void *already_received = _starpu_htbl_search_32(received_data[mpi_rank], key);
+                                        if (!already_received) {
+                                                _starpu_htbl_insert_32(&received_data[mpi_rank], key, data);
+                                        }
+                                        else {
+                                                _STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
+                                        }
+                                        if (!already_received)
+                                                //if (!starpu_allocated(data))
+
 #endif
                                                 {
+                                                        _STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
                                                         starpu_mpi_irecv_detached(data, mpi_rank, 0, comm, NULL, NULL);
                                                 }
                                 }
                                 if (!do_execute && mpi_rank == me) {
                                         /* Somebody else will execute it, and I have the data, send it. */
                                         /* FIXME CACHE: we need to know whether the receiver has it. */
-                                        _STARPU_MPI_DEBUG("Send data to %d\n", dest);
-                                        starpu_mpi_isend_detached(data, dest, 0, comm, NULL, NULL);
+#ifdef MPI_CACHE
+                                        uint32_t key = _starpu_crc32_be(data, 0);
+                                        void *already_sent = _starpu_htbl_search_32(sent_data[dest], key);
+                                        if (!already_sent) {
+                                                _starpu_htbl_insert_32(&sent_data[dest], key, data);
+                                        }
+                                        else {
+                                                _STARPU_MPI_DEBUG("Do not sent data %p to node %d as it has already been sent\n", data, dest);
+                                        }
+                                        if (!already_sent)
+#endif
+                                                {
+                                                        _STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
+                                                        starpu_mpi_isend_detached(data, dest, 0, comm, NULL, NULL);
+                                                }
                                 }
                         }
                 }
@@ -162,10 +212,17 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                                          * now invalid */
                                 }
                                 else {
+                                        int mpi_rank = starpu_data_get_rank(data);
+                                        uint32_t key = _starpu_crc32_be(data, 0);
+                                        void *already_received = _starpu_htbl_search_32(received_data[mpi_rank], key);
+                                        if (already_received) {
                                         /* Somebody else will write to the data, so discard our cached copy if any */
                                         /* TODO: starpu_mpi could just remember itself. */
-                                        if (starpu_allocated(data))
-                                                starpu_deallocate(data);
+                                                _STARPU_MPI_DEBUG("Clear cache for data %p\n", data);
+                                                //                                        if (starpu_data_allocated(data))
+#warning do a submit deallocate
+                                                starpu_data_deallocate(data);
+                                        }
                                 }
                         }
 #else
@@ -173,7 +230,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                         if ((arg_type & STARPU_R) && do_execute) {
                                 int mpi_rank = starpu_data_get_rank(data);
                                 if (mpi_rank != me && mpi_rank != -1) {
-                                        //                                        starpu_deallocate(data);
+                                        starpu_data_deallocate(data);
                                 }
                         }
 #endif
@@ -192,4 +249,5 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
 		}
         }
 	va_end(varg_list);
+        _STARPU_MPI_LOG_OUT();
 }