Ver código fonte

mpi/starpu_mpi_insert_task.c: fix cache discarding

Nathalie Furmento 14 anos atrás
pai
commit
e08d3dcfe7
1 arquivos alterados com 64 adições e 16 exclusões
  1. 64 16
      mpi/starpu_mpi_insert_task.c

+ 64 - 16
mpi/starpu_mpi_insert_task.c

@@ -21,6 +21,7 @@
 #include <starpu_data.h>
 #include <common/utils.h>
 #include <common/hash.h>
+#include <common/htable32.h>
 #include <util/starpu_insert_task_utils.h>
 
 //#define STARPU_MPI_VERBOSE	1
@@ -29,13 +30,14 @@
 /* Whether we are allowed to keep copies of remote data. Does not work
  * yet: the sender has to know whether the receiver has it, keeping it
  * in an array indexed by node numbers. */
-//#define MPI_CACHE
+#define MPI_CACHE
 
 #ifdef MPI_CACHE
 static struct starpu_htbl32_node_s **sent_data = NULL;
 static struct starpu_htbl32_node_s **received_data = NULL;
 
-static void _starpu_mpi_task_init(int nb_nodes) {
+static void _starpu_mpi_task_init(int nb_nodes)
+{
         int i;
 
         _STARPU_MPI_DEBUG("Initialising hash table for cache\n");
@@ -44,9 +46,53 @@ static void _starpu_mpi_task_init(int nb_nodes) {
         received_data = malloc(nb_nodes * sizeof(struct starpu_htbl32_node_s *));
         for(i=0 ; i<nb_nodes ; i++) received_data[i] = NULL;
 }
+
+typedef struct _starpu_mpi_clear_data_s {
+        starpu_data_handle data;
+        int rank;
+        int mode;
+} _starpu_mpi_clear_data_t;
+
+#define _STARPU_MPI_CLEAR_SENT_DATA     0
+#define _STARPU_MPI_CLEAR_RECEIVED_DATA 1
+
+void _starpu_mpi_clear_data_callback(void *callback_arg)
+{
+        _starpu_mpi_clear_data_t *data_rank = (_starpu_mpi_clear_data_t *)callback_arg;
+        uint32_t key = _starpu_crc32_be(data_rank->data, 0);
+
+        if (data_rank->mode == _STARPU_MPI_CLEAR_SENT_DATA) {
+                _STARPU_MPI_DEBUG("Clearing sent cache for data %p and rank %d\n", data_rank->data, data_rank->rank);
+                _starpu_htbl_insert_32(&sent_data[data_rank->rank], key, NULL);
+        }
+        else if (data_rank->mode == _STARPU_MPI_CLEAR_RECEIVED_DATA) {
+                _STARPU_MPI_DEBUG("Clearing received cache for data %p and rank %d\n", data_rank->data, data_rank->rank);
+                _starpu_htbl_insert_32(&received_data[data_rank->rank], key, NULL);
+        }
+        free(data_rank);
+}
+
+void _starpu_mpi_clear_data(starpu_data_handle data_handle, int rank, int mode)
+{
+        struct starpu_task *task = starpu_task_create();
+        task->cl = NULL;
+
+        task->buffers[0].handle = data_handle;
+        task->buffers[0].mode = STARPU_RW;
+
+        _starpu_mpi_clear_data_t *data_rank = malloc(sizeof(_starpu_mpi_clear_data_t));
+        data_rank->data = data_handle;
+        data_rank->rank = rank;
+        data_rank->mode = mode;
+
+        task->callback_func = _starpu_mpi_clear_data_callback;
+        task->callback_arg = data_rank;
+        starpu_task_submit(task);
+}
 #endif
 
-int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
+int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
+{
         int arg_type;
         va_list varg_list;
         int me, do_execute;
@@ -144,8 +190,6 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                                                 _STARPU_MPI_DEBUG("Do not receive data %p from node %d as it is already available\n", data, mpi_rank);
                                         }
                                         if (!already_received)
-                                                //if (!starpu_allocated(data))
-
 #endif
                                                 {
                                                         _STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
@@ -154,7 +198,6 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                                 }
                                 if (!do_execute && mpi_rank == me) {
                                         /* Somebody else will execute it, and I have the data, send it. */
-                                        /* FIXME CACHE: we need to know whether the receiver has it. */
 #ifdef MPI_CACHE
                                         uint32_t key = _starpu_crc32_be(data, 0);
                                         void *already_sent = _starpu_htbl_search_32(sent_data[dest], key);
@@ -206,22 +249,27 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                         starpu_data_handle data = va_arg(varg_list, starpu_data_handle);
 #ifdef MPI_CACHE
                         if (arg_type & STARPU_W) {
+                                uint32_t key = _starpu_crc32_be(data, 0);
                                 if (do_execute) {
-                                        /* FIXME: I need to note that all
-                                         * copies I've sent to neighbours are
-                                         * now invalid */
+                                        /* Note that all copies I've sent to neighbours are now invalid */
+                                        int n, size;
+                                        MPI_Comm_size(comm, &size);
+                                        for(n=0 ; n<size ; n++) {
+                                                void *already_sent = _starpu_htbl_search_32(sent_data[n], key);
+                                                if (already_sent) {
+                                                        _STARPU_MPI_DEBUG("Posting request to clear send cache for data %p\n", data);
+                                                        _starpu_mpi_clear_data(data, n, _STARPU_MPI_CLEAR_SENT_DATA);
+                                                }
+                                        }
                                 }
                                 else {
                                         int mpi_rank = starpu_data_get_rank(data);
-                                        uint32_t key = _starpu_crc32_be(data, 0);
                                         void *already_received = _starpu_htbl_search_32(received_data[mpi_rank], key);
                                         if (already_received) {
-                                        /* Somebody else will write to the data, so discard our cached copy if any */
-                                        /* TODO: starpu_mpi could just remember itself. */
-                                                _STARPU_MPI_DEBUG("Clear cache for data %p\n", data);
-                                                //                                        if (starpu_data_allocated(data))
-#warning do a submit deallocate
-                                                //starpu_data_deallocate(data);
+                                                /* Somebody else will write to the data, so discard our cached copy if any */
+                                                /* TODO: starpu_mpi could just remember itself. */
+                                                _STARPU_MPI_DEBUG("Posting request to clear receive cache for data %p\n", data);
+                                                _starpu_mpi_clear_data(data, mpi_rank, _STARPU_MPI_CLEAR_RECEIVED_DATA);
                                         }
                                 }
                         }