소스 검색

mpi: Update behaviuour for starpu_mpi_insert_task

     node defined by property STARPU_EXECUTE_ON_NODE or
     STARPU_EXECUTE_ON_DATA will be the node executing the codelet,
     even if the data model defines an other node (i.e the node owning
     the W data)
Nathalie Furmento 13 년 전
부모
커밋
8f417fde28
2개의 변경된 파일71개의 추가작업 그리고 99개의 파일을 삭제
  1. 23 42
      mpi/starpu_mpi_insert_task.c
  2. 48 57
      mpi/tests/insert_task_owner.c

+ 23 - 42
mpi/starpu_mpi_insert_task.c

@@ -60,14 +60,15 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
         int arg_type;
         va_list varg_list;
-        int me, do_execute;
+        int me, do_execute, xrank, nb_nodes;
 	size_t arg_buffer_size = 0;
 	char *arg_buffer;
-        int dest=0, execute, inconsistent_execute;
+        int dest=0, inconsistent_execute;
 
         _STARPU_MPI_LOG_IN();
 
 	MPI_Comm_rank(comm, &me);
+	MPI_Comm_size(comm, &nb_nodes);
 
 	_starpu_mpi_tables_init();
 
@@ -78,46 +79,22 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	va_start(varg_list, codelet);
 	_starpu_pack_cl_args(arg_buffer_size, &arg_buffer, varg_list);
 
-        /* Finds out if the property STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA is specified */
-        execute = -1;
+	/* Find out whether we are to execute the data because we own the data to be written to. */
+        inconsistent_execute = 0;
+        do_execute = -1;
+	xrank = -1;
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
 		if (arg_type==STARPU_EXECUTE_ON_NODE) {
-                        execute = va_arg(varg_list, int);
+                        xrank = va_arg(varg_list, int);
+			_STARPU_MPI_DEBUG("Executing on node %d\n", xrank);
                 }
 		else if (arg_type==STARPU_EXECUTE_ON_DATA) {
 			starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
-                        execute = starpu_data_get_rank(data);
-			_STARPU_MPI_DEBUG("Executing on node %d\n", execute);
-                }
-		else if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH || arg_type == STARPU_REDUX) {
-                        va_arg(varg_list, starpu_data_handle_t);
+                        xrank = starpu_data_get_rank(data);
+			_STARPU_MPI_DEBUG("Executing on data node %d\n", xrank);
+			STARPU_ASSERT(xrank <= nb_nodes);
                 }
-		else if (arg_type==STARPU_VALUE) {
-			va_arg(varg_list, void *);
-			va_arg(varg_list, size_t);
-		}
-		else if (arg_type==STARPU_CALLBACK) {
-			va_arg(varg_list, void (*)(void *));
-		}
-		else if (arg_type==STARPU_CALLBACK_WITH_ARG) {
-			va_arg(varg_list, void (*)(void *));
-			va_arg(varg_list, void *);
-		}
-		else if (arg_type==STARPU_CALLBACK_ARG) {
-			va_arg(varg_list, void *);
-		}
-		else if (arg_type==STARPU_PRIORITY) {
-			va_arg(varg_list, int);
-		}
-        }
-	va_end(varg_list);
-
-	/* Find out whether we are to execute the data because we own the data to be written to. */
-        inconsistent_execute = 0;
-        do_execute = -1;
-	va_start(varg_list, codelet);
-	while ((arg_type = va_arg(varg_list, int)) != 0) {
 		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_REDUX || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
                         starpu_data_handle_t data = va_arg(varg_list, starpu_data_handle_t);
                         if (arg_type & STARPU_W || arg_type & STARPU_REDUX) {
@@ -184,18 +161,22 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	assert(do_execute != -1 && "StarPU needs to see a W or a REDUX data which will tell it where to execute the task");
 
         if (inconsistent_execute == 1) {
-                if (execute == -1) {
+                if (xrank == -1) {
                         _STARPU_MPI_DEBUG("Different tasks are owning W data. Needs to specify which one is to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
 			return -EINVAL;
                 }
                 else {
-                        do_execute = (me == execute);
-                        dest = execute;
+                        do_execute = (me == xrank);
+                        dest = xrank;
                 }
         }
-        else if (execute != -1) {
-                _STARPU_MPI_DEBUG("Property STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA ignored as W data are all owned by the same task\n");
-        }
+	else if (xrank != -1) {
+		_STARPU_MPI_DISP("Property STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA overwriting node defined by data model\n");
+		do_execute = (me == xrank);
+		dest = xrank;
+	}
+
+	_STARPU_MPI_DEBUG("Executing %d - Sending to node %d\n", do_execute, dest);
 
         /* Send and receive data as requested */
 	va_start(varg_list, codelet);
@@ -290,7 +271,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 					int mpi_tag = starpu_data_get_tag(data);
 					STARPU_ASSERT(mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank");
                                         if (mpi_rank == me) {
-                                                if (execute != -1 && me != execute) {
+                                                if (xrank != -1 && me != xrank) {
                                                         _STARPU_MPI_DEBUG("Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
                                                         starpu_mpi_irecv_detached(data, dest, mpi_tag, comm, NULL, NULL);
                                                 }

+ 48 - 57
mpi/tests/insert_task_owner.c

@@ -20,11 +20,14 @@
 
 void func_cpu(void *descr[], __attribute__ ((unused)) void *_args)
 {
-	int *x = (int *)STARPU_VARIABLE_GET_PTR(descr[0]);
-	int *y = (int *)STARPU_VARIABLE_GET_PTR(descr[1]);
+	int node;
+	int rank;
 
-        *x = *x + 1;
-        *y = *y + 1;
+        starpu_codelet_unpack_args(_args, &node);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	FPRINTF(stderr, "Expected node: %d - Actual node: %d\n", node, rank);
+
+	assert(node == rank);
 }
 
 struct starpu_codelet mycodelet_r_w =
@@ -59,23 +62,10 @@ struct starpu_codelet mycodelet_w_r =
 	.modes = {STARPU_W, STARPU_R}
 };
 
-#define ACQUIRE_DATA \
-        if (rank == 0) starpu_data_acquire(data_handlesx0, STARPU_R);    \
-        if (rank == 1) starpu_data_acquire(data_handlesx1, STARPU_R);    \
-        FPRINTF(stderr, "[%d] Values: %d %d\n", rank, x0, x1);
-
-#define RELEASE_DATA \
-        if (rank == 0) starpu_data_release(data_handlesx0); \
-        if (rank == 1) starpu_data_release(data_handlesx1); \
-
-#define CHECK_RESULT \
-        if (rank == 0) assert(x0 == vx0[0] && x1 == vx1[0]); \
-        if (rank == 1) assert(x0 == vx0[1] && x1 == vx1[1]);
-
 int main(int argc, char **argv)
 {
-        int ret, rank, size, err;
-        int x0=0, x1=0, vx0[2] = {x0, x0}, vx1[2]={x1,x1};
+        int ret, rank, size, err, node;
+        int x0=32, x1=23;
         starpu_data_handle_t data_handlesx0;
         starpu_data_handle_t data_handlesx1;
 
@@ -111,60 +101,61 @@ int main(int argc, char **argv)
 		starpu_data_set_tag(data_handlesx0, 0);
         }
 
-        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_r_w, STARPU_R, data_handlesx0, STARPU_W, data_handlesx1, 0);
+	node = starpu_data_get_rank(data_handlesx1);
+        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_r_w,
+				     STARPU_VALUE, &node, sizeof(node),
+				     STARPU_R, data_handlesx0, STARPU_W, data_handlesx1,
+				     0);
         assert(err == 0);
-        ACQUIRE_DATA;
-        vx1[1]++;
-        CHECK_RESULT;
-        RELEASE_DATA;
 
-        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_r, STARPU_RW, data_handlesx0, STARPU_R, data_handlesx1, 0);
+	node = starpu_data_get_rank(data_handlesx0);
+        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_r,
+				     STARPU_VALUE, &node, sizeof(node),
+				     STARPU_RW, data_handlesx0, STARPU_R, data_handlesx1,
+				     0);
         assert(err == 0);
-        ACQUIRE_DATA;
-        vx0[0] ++;
-        CHECK_RESULT;
-        RELEASE_DATA;
 
-        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_rw, STARPU_RW, data_handlesx0, STARPU_RW, data_handlesx1, 0);
+        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_rw,
+				     STARPU_VALUE, &node, sizeof(node),
+				     STARPU_RW, data_handlesx0, STARPU_RW, data_handlesx1,
+				     0);
         assert(err == -EINVAL);
-        ACQUIRE_DATA;
-        CHECK_RESULT;
-        RELEASE_DATA;
 
-        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_rw, STARPU_RW, data_handlesx0, STARPU_RW, data_handlesx1, STARPU_EXECUTE_ON_NODE, 1, 0);
+	node = 1;
+        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_rw,
+				     STARPU_VALUE, &node, sizeof(node),
+				     STARPU_RW, data_handlesx0, STARPU_RW, data_handlesx1, STARPU_EXECUTE_ON_NODE, node,
+				     0);
         assert(err == 0);
-        ACQUIRE_DATA;
-        vx0[0] ++ ; vx1[1] ++;
-        CHECK_RESULT;
-        RELEASE_DATA;
 
-        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_rw, STARPU_RW, data_handlesx0, STARPU_RW, data_handlesx1, STARPU_EXECUTE_ON_NODE, 0, 0);
+	node = 0;
+        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_rw_rw,
+				     STARPU_VALUE, &node, sizeof(node),
+				     STARPU_RW, data_handlesx0, STARPU_RW, data_handlesx1, STARPU_EXECUTE_ON_NODE, node,
+				     0);
         assert(err == 0);
-        ACQUIRE_DATA;
-        vx0[0] ++ ; vx1[1] ++;
-        CHECK_RESULT;
-        RELEASE_DATA;
 
         /* Here the value specified by the property STARPU_EXECUTE_ON_NODE is
-           going to be ignored as the data model clearly specifies
-           which task is going to execute the codelet */
-        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_r_w, STARPU_R, data_handlesx0, STARPU_W, data_handlesx1, STARPU_EXECUTE_ON_NODE, 12, 0);
+           going to overwrite the node even though the data model clearly specifies
+           which node is going to execute the codelet */
+	node = 0;
+        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_r_w,
+				     STARPU_VALUE, &node, sizeof(node),
+				     STARPU_R, data_handlesx0, STARPU_W, data_handlesx1, STARPU_EXECUTE_ON_NODE, node,
+				     0);
         assert(err == 0);
-        ACQUIRE_DATA;
-        vx1[1] ++;
-        CHECK_RESULT;
-        RELEASE_DATA;
 
         /* Here the value specified by the property STARPU_EXECUTE_ON_NODE is
-           going to be ignored as the data model clearly specifies
-           which task is going to execute the codelet */
-        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_w_r, STARPU_W, data_handlesx0, STARPU_R, data_handlesx1, STARPU_EXECUTE_ON_NODE, 11, 0);
+           going to overwrite the node even though the data model clearly specifies
+           which node is going to execute the codelet */
+	node = 0;
+        err = starpu_mpi_insert_task(MPI_COMM_WORLD, &mycodelet_w_r,
+				     STARPU_VALUE, &node, sizeof(node),
+				     STARPU_W, data_handlesx0, STARPU_R, data_handlesx1, STARPU_EXECUTE_ON_NODE, node,
+				     0);
         assert(err == 0);
-        ACQUIRE_DATA;
-        vx0[0] ++;
-        CHECK_RESULT;
-        RELEASE_DATA;
 
+	fprintf(stderr, "Waiting ...\n");
         starpu_task_wait_for_all();
 	starpu_mpi_shutdown();
 	starpu_shutdown();