浏览代码

StarPU-MPI: Deals with cases in which written data are owned by different nodes.

- The task is executed by the node specified by STARPU_EXECUTE
- Data are sent to the node prior to task execution and sent back to their owner after execution
Nathalie Furmento 14 年之前
父节点
当前提交
36ba753a17
共有 1 个文件被更改,包括 92 次插入9 次删除
  1. 92 9
      mpi/starpu_mpi_insert_task.c

+ 92 - 9
mpi/starpu_mpi_insert_task.c

@@ -103,7 +103,8 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
         va_list varg_list;
         int me, do_execute;
 	size_t arg_buffer_size = 0;
-        int dest;
+        int dest, execute, inconsistent_execute;
+        int mpi_tag = 100;
 
         _STARPU_MPI_LOG_IN();
 
@@ -121,7 +122,33 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
 	va_start(varg_list, codelet);
         arg_buffer_size = starpu_insert_task_get_arg_size(varg_list);
 
+        /* Finds out if the property STARPU_EXECUTE is specified */
+        execute = -1;
+	va_start(varg_list, codelet);
+	while ((arg_type = va_arg(varg_list, int)) != 0) {
+		if (arg_type==STARPU_EXECUTE) {
+                        execute = va_arg(varg_list, int);
+                }
+		else if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
+                        va_arg(varg_list, starpu_data_handle);
+                }
+		else if (arg_type==STARPU_VALUE) {
+			va_arg(varg_list, void *);
+		}
+		else if (arg_type==STARPU_CALLBACK) {
+			va_arg(varg_list, void (*)(void *));
+		}
+		else if (arg_type==STARPU_CALLBACK_ARG) {
+			va_arg(varg_list, void *);
+		}
+		else if (arg_type==STARPU_PRIORITY) {
+			va_arg(varg_list, int);
+		}
+        }
+	va_end(varg_list);
+
 	/* Find out whether we are to execute the data because we own the data to be written to. */
+        inconsistent_execute = 0;
         do_execute = -1;
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
@@ -142,8 +169,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
                                 int mpi_rank = starpu_data_get_rank(data);
                                 if (mpi_rank == me) {
                                         if (do_execute == 0) {
-                                                _STARPU_MPI_DEBUG("erh? incoherent!\n");
-                                                return -EINVAL;
+                                                inconsistent_execute = 1;
                                         }
                                         else {
                                                 do_execute = 1;
@@ -151,8 +177,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
                                 }
                                 else if (mpi_rank != -1) {
                                         if (do_execute == 1) {
-                                                _STARPU_MPI_DEBUG("erh? incoherent!\n");
-                                                return -EINVAL;
+                                                inconsistent_execute = 1;
                                         }
                                         else {
                                                 do_execute = 0;
@@ -177,10 +202,24 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
 		else if (arg_type==STARPU_PRIORITY) {
 			va_arg(varg_list, int);
 		}
+		else if (arg_type==STARPU_EXECUTE) {
+			va_arg(varg_list, int);
+		}
 	}
 	va_end(varg_list);
         assert(do_execute != -1);
 
+        if (inconsistent_execute == 1) {
+                if (execute == -1) {
+                        _STARPU_MPI_DEBUG("Different tasks are owning W data. Needs to specify which one is to execute the codelet\n");
+                        return -EINVAL;
+                }
+                else {
+                        do_execute = (me == execute);
+                        dest = execute;
+                }
+        }
+
         /* Send and receive data as requested */
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
@@ -204,7 +243,7 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
 #endif
                                                 {
                                                         _STARPU_MPI_DEBUG("Receive data %p from %d\n", data, mpi_rank);
-                                                        starpu_mpi_irecv_detached(data, mpi_rank, 0, comm, NULL, NULL);
+                                                        starpu_mpi_irecv_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
                                                 }
                                 }
                                 if (!do_execute && mpi_rank == me) {
@@ -222,9 +261,10 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
 #endif
                                                 {
                                                         _STARPU_MPI_DEBUG("Send data %p to %d\n", data, dest);
-                                                        starpu_mpi_isend_detached(data, dest, 0, comm, NULL, NULL);
+                                                        starpu_mpi_isend_detached(data, dest, mpi_tag, comm, NULL, NULL);
                                                 }
                                 }
+                                mpi_tag++;
                         }
                 }
 		else if (arg_type==STARPU_VALUE) {
@@ -239,6 +279,9 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
 		else if (arg_type==STARPU_PRIORITY) {
 			va_arg(varg_list, int);
 		}
+		else if (arg_type==STARPU_EXECUTE) {
+			va_arg(varg_list, int);
+		}
         }
 	va_end(varg_list);
 
@@ -251,8 +294,44 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
                 STARPU_ASSERT(ret==0);
         }
 
-	/* No need to handle W, as we assume (and check) that task
-	 * write in data that they own */
+        if (inconsistent_execute) {
+                va_start(varg_list, codelet);
+                while ((arg_type = va_arg(varg_list, int)) != 0) {
+                        if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH) {
+                                starpu_data_handle data = va_arg(varg_list, starpu_data_handle);
+                                if (arg_type & STARPU_W) {
+                                        int mpi_rank = starpu_data_get_rank(data);
+                                        if (mpi_rank == me) {
+                                                if (execute != -1 && me != execute) {
+                                                        _STARPU_MPI_DEBUG("Receive data %p back from the task %d which executed the codelet ...\n", data, dest);
+                                                        starpu_mpi_irecv_detached(data, dest, mpi_tag, comm, NULL, NULL);
+                                                }
+                                        }
+                                        else if (do_execute) {
+                                                _STARPU_MPI_DEBUG("Send data %p back to its owner %d...\n", data, mpi_rank);
+                                                starpu_mpi_isend_detached(data, mpi_rank, mpi_tag, comm, NULL, NULL);
+                                        }
+                                        mpi_tag ++;
+                                }
+                        }
+                        else if (arg_type==STARPU_VALUE) {
+                                va_arg(varg_list, void *);
+                        }
+                        else if (arg_type==STARPU_CALLBACK) {
+                                va_arg(varg_list, void (*)(void *));
+                        }
+                        else if (arg_type==STARPU_CALLBACK_ARG) {
+                                va_arg(varg_list, void *);
+                        }
+                        else if (arg_type==STARPU_PRIORITY) {
+                                va_arg(varg_list, int);
+                        }
+                        else if (arg_type==STARPU_EXECUTE) {
+                                va_arg(varg_list, int);
+                        }
+                }
+                va_end(varg_list);
+        }
 
 	va_start(varg_list, codelet);
 	while ((arg_type = va_arg(varg_list, int)) != 0) {
@@ -307,7 +386,11 @@ int starpu_mpi_insert_task(MPI_Comm comm, starpu_codelet *codelet, ...)
 		else if (arg_type==STARPU_PRIORITY) {
 			va_arg(varg_list, int);
 		}
+		else if (arg_type==STARPU_EXECUTE) {
+			va_arg(varg_list, int);
+		}
         }
 	va_end(varg_list);
         _STARPU_MPI_LOG_OUT();
+        return 0;
 }