Quellcode durchsuchen

StarPU-MPI: starpu_mpi_insert_task is now asynchronous: post MPI requests and tasks without waiting for completion

Nathalie Furmento vor 14 Jahren
Ursprung
Commit
977ab3e187
2 geänderte Dateien mit 4 neuen und 35 gelöschten Zeilen
  1. 3 35
      mpi/starpu_mpi_insert_task.c
  2. 1 0
      mpi/tests/insert_task.c

+ 3 - 35
mpi/starpu_mpi_insert_task.c

@@ -22,7 +22,7 @@
 #include <common/utils.h>
 #include <util/starpu_insert_task_utils.h>
 
-#define STARPU_MPI_VERBOSE	1
+//#define STARPU_MPI_VERBOSE	1
 #include <starpu_mpi_private.h>
 
 /* Whether we are allowed to keep copies of remote data. Does not work
@@ -86,9 +86,6 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
 	va_end(varg_list);
         assert(do_execute != -1);
 
-        starpu_mpi_req *req = malloc(nb_buffers * sizeof(starpu_mpi_req));
-        int nb_req=0;
-
         /* Send and receive data as requested */
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
@@ -104,56 +101,27 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...) {
                                         if (!starpu_allocated(data))
 #endif
                                                 {
-                                                        starpu_mpi_irecv(data, &req[nb_req], mpi_rank, 0, comm);
-                                                        nb_req++;
+                                                        starpu_mpi_irecv_detached(data, mpi_rank, 0, comm, NULL, NULL);
                                                 }
                                 }
                                 if (!do_execute && mpi_rank == me) {
                                         /* Somebody else will execute it, and I have the data, send it. */
                                         /* FIXME CACHE: we need to know whether the receiver has it. */
                                         _STARPU_MPI_DEBUG("Send data to %d\n", dest);
-                                        starpu_mpi_isend(data, &req[nb_req], dest, 0, comm);
-                                        nb_req++;
+                                        starpu_mpi_isend_detached(data, dest, 0, comm, NULL, NULL);
                                 }
                         }
                 }
         }
 	va_end(varg_list);
 
-        /* If some MPI communications have been posted, wait until they are finished */
-        _STARPU_MPI_DEBUG("Waiting for %d request(s)\n", nb_req);
-        int nb_waiting_requests=nb_req;
-        while(nb_waiting_requests) {
-                //_STARPU_MPI_DEBUG("Testing %d request(s)\n", nb_waiting_requests);
-                int r=0;
-                for(r=0 ; r<nb_req ; r++) {
-                        if (req[r]) {
-                                int finished = 0;
-                                MPI_Status status;
-                                //_STARPU_MPI_DEBUG("Testing request %d\n", r);
-                                starpu_mpi_test(&req[r], &finished, &status);
-                                STARPU_ASSERT(finished != -1);
-                                if(finished) {
-                                        req[r] = NULL;
-                                        nb_waiting_requests--;
-                                }
-                        }
-                }
-        }
-        _STARPU_MPI_DEBUG("All requests processed\n", nb_req);
-        free(req);
-
 	if (do_execute) {
                 _STARPU_MPI_DEBUG("Execution of the codelet\n");
                 va_start(varg_list, codelet);
                 struct starpu_task *task = starpu_task_create();
-                task->synchronous = 1;
                 int ret = starpu_insert_task_create_and_submit(arg_buffer_size, codelet, &task, varg_list);
                 _STARPU_MPI_DEBUG("ret: %d\n", ret);
                 STARPU_ASSERT(ret==0);
-                //                ret = starpu_task_wait(task);
-                //                _STARPU_MPI_DEBUG("ret: %d\n", ret);
-                //                STARPU_ASSERT(ret==0);
         }
 
 	/* No need to handle W, as we assume (and check) that task

+ 1 - 0
mpi/tests/insert_task.c

@@ -100,6 +100,7 @@ int main(int argc, char **argv)
                                                0);
                 }
         }
+        starpu_task_wait_for_all();
 
 	starpu_mpi_shutdown();
 	starpu_shutdown();