ソースを参照

mpi: new functions starpu_mpi_task_build() and starpu_mpi_task_post_build()

Nathalie Furmento 11 年 前
コミット
2dbfb1ccc8
共有5 個のファイルを変更した173 個の追加115 個の削除を含む
  1. 1 0
      ChangeLog
  2. 21 0
      doc/doxygen/chapters/08mpi_support.doxy
  3. 16 0
      doc/doxygen/chapters/api/mpi.doxy
  4. 2 0
      mpi/include/starpu_mpi.h
  5. 133 115
      mpi/src/starpu_mpi_task_insert.c

+ 1 - 0
ChangeLog

@@ -40,6 +40,7 @@ New features:
     handle (sequential consistency will be enabled or disabled based
     on the value of the function parameter and the value of the
     sequential consistency defined for the given data)
+  * New functions starpu_mpi_task_build() and starpu_mpi_task_post_build()
 
 Small features:
   * New functions starpu_data_acquire_cb_sequential_consistency() and

+ 21 - 0
doc/doxygen/chapters/08mpi_support.doxy

@@ -318,6 +318,27 @@ application can prune the task for loops according to the data distribution,
 so as to only submit tasks on nodes which have to care about them (either to
 execute them, or to send the required data).
 
+A function starpu_mpi_task_build() is also provided with the aim to
+only construct the task structure. All MPI nodes need to call the
+function, only the node which is to execute the task will return a
+valid task structure. Following the execution of the task, all nodes
+need to call the function starpu_mpi_task_post_build() -- with the same
+list of arguments as starpu_mpi_task_build() -- to post all the
+necessary data communications.
+
+\code{.c}
+struct starpu_task *task;
+task = starpu_mpi_task_build(MPI_COMM_WORLD, &cl,
+                             STARPU_RW, data_handles[0],
+                             STARPU_R, data_handles[1],
+                             0);
+if (task) starpu_task_submit(task);
+starpu_mpi_task_post_build(MPI_COMM_WORLD, &cl,
+                           STARPU_RW, data_handles[0],
+                           STARPU_R, data_handles[1],
+                           0);
+\endcode
+
 \section MPIMigration MPI Data migration
 
 The application can dynamically change its mind about the data distribution, to

+ 16 - 0
doc/doxygen/chapters/api/mpi.doxy

@@ -256,6 +256,22 @@ The algorithm also includes a communication cache mechanism that
 allows not to send data twice to the same MPI node, unless the data
 has been modified. The cache can be disabled (see \ref STARPU_MPI_CACHE).
 
+\fn struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *codelet, ...)
+\ingroup API_MPI_Support
+Create a task corresponding to codelet with the following arguments.
+The argument list must be zero-terminated. The function performs the
+first two steps of the function starpu_mpi_task_insert(). Only the MPI
+node selected in the first step of the algorithm will return a valid
+task structure which can then be submitted. The function
+starpu_mpi_task_post_build() MUST be called after the submission of
+the task, with the SAME list of arguments.
+
+\fn int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ...)
+\ingroup API_MPI_Support
+This function MUST be called after a call to starpu_mpi_task_build(),
+with the SAME list of arguments. It performs the fourth -- last -- step of the algorithm described in
+starpu_mpi_task_insert().
+
 \fn void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle, int node)
 \ingroup API_MPI_Support
 Transfer data \p data_handle to MPI node \p node, sending it from its

+ 2 - 0
mpi/include/starpu_mpi.h

@@ -47,6 +47,8 @@ int starpu_mpi_initialize(void) STARPU_DEPRECATED;
 int starpu_mpi_initialize_extended(int *rank, int *world_size) STARPU_DEPRECATED;
 int starpu_mpi_shutdown(void);
 
+struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *codelet, ...);
+int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ...);
 int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...);
 /* the function starpu_mpi_insert_task has the same semantics as starpu_mpi_task_insert, it is kept to avoid breaking old codes */
 int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...);

+ 133 - 115
mpi/src/starpu_mpi_task_insert.c

@@ -367,50 +367,39 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 }
 
 static
-int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_list varg_list)
+int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *dest, int *do_execute, va_list varg_list)
 {
-	int arg_type;
 	va_list varg_list_copy;
-	int me, do_execute, xrank, nb_nodes;
+	int inconsistent_execute = 0;
+	int arg_type;
 	size_t *size_on_nodes;
-	size_t arg_buffer_size = 0;
-	void *arg_buffer = NULL;
-	int dest=0, inconsistent_execute;
 	int current_data = 0;
 
-	_STARPU_MPI_LOG_IN();
-
-	MPI_Comm_rank(comm, &me);
-	MPI_Comm_size(comm, &nb_nodes);
-
 	size_on_nodes = (size_t *)calloc(1, nb_nodes * sizeof(size_t));
-
-	/* Find out whether we are to execute the data because we own the data to be written to. */
-	inconsistent_execute = 0;
-	do_execute = -1;
-	xrank = -1;
+	*do_execute = -1;
+	*xrank = -1;
 	va_copy(varg_list_copy, varg_list);
 	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
 	{
 		if (arg_type==STARPU_EXECUTE_ON_NODE)
 		{
-			xrank = va_arg(varg_list_copy, int);
-			_STARPU_MPI_DEBUG(1, "Executing on node %d\n", xrank);
-			do_execute = 1;
+			*xrank = va_arg(varg_list_copy, int);
+			_STARPU_MPI_DEBUG(1, "Executing on node %d\n", *xrank);
+			*do_execute = 1;
 		}
 		else if (arg_type==STARPU_EXECUTE_ON_DATA)
 		{
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
-			xrank = starpu_data_get_rank(data);
-			_STARPU_MPI_DEBUG(1, "Executing on data node %d\n", xrank);
-			STARPU_ASSERT_MSG(xrank <= nb_nodes, "Node %d to execute codelet is not a valid node (%d)", xrank, nb_nodes);
-			do_execute = 1;
+			*xrank = starpu_data_get_rank(data);
+			_STARPU_MPI_DEBUG(1, "Executing on data node %d\n", *xrank);
+			STARPU_ASSERT_MSG(*xrank <= nb_nodes, "Node %d to execute codelet is not a valid node (%d)", *xrank, nb_nodes);
+			*do_execute = 1;
 		}
 		else if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
 		{
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
-			int ret = _starpu_mpi_find_executee_node(data, mode, me, &do_execute, &inconsistent_execute, &dest, size_on_nodes);
+			int ret = _starpu_mpi_find_executee_node(data, mode, me, do_execute, &inconsistent_execute, dest, size_on_nodes);
 			if (ret == -EINVAL)
 			{
 				free(size_on_nodes);
@@ -426,7 +415,7 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 			for(i=0 ; i<nb_handles ; i++)
 			{
 				enum starpu_data_access_mode mode = STARPU_CODELET_GET_MODE(codelet, current_data);
-				int ret = _starpu_mpi_find_executee_node(datas[i], mode, me, &do_execute, &inconsistent_execute, &dest, size_on_nodes);
+				int ret = _starpu_mpi_find_executee_node(datas[i], mode, me, do_execute, &inconsistent_execute, dest, size_on_nodes);
 				if (ret == -EINVAL)
 				{
 					free(size_on_nodes);
@@ -477,7 +466,7 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 	}
 	va_end(varg_list_copy);
 
-	if (do_execute == -1)
+	if (*do_execute == -1)
 	{
 		int i;
 		size_t max_size = 0;
@@ -486,37 +475,59 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 			if (size_on_nodes[i] > max_size)
 			{
 				max_size = size_on_nodes[i];
-				xrank = i;
+				*xrank = i;
 			}
 		}
-		if (xrank != -1)
+		if (*xrank != -1)
 		{
-			_STARPU_MPI_DEBUG(1, "Node %d is having the most R data\n", xrank);
-			do_execute = 1;
+			_STARPU_MPI_DEBUG(1, "Node %d is having the most R data\n", *xrank);
+			*do_execute = 1;
 		}
 	}
 	free(size_on_nodes);
 
-	STARPU_ASSERT_MSG(do_execute != -1, "StarPU needs to see a W or a REDUX data which will tell it where to execute the task");
+	STARPU_ASSERT_MSG(*do_execute != -1, "StarPU needs to see a W or a REDUX data which will tell it where to execute the task");
 
 	if (inconsistent_execute == 1)
 	{
-		if (xrank == -1)
+		if (*xrank == -1)
 		{
 			_STARPU_MPI_DEBUG(1, "Different tasks are owning W data. Needs to specify which one is to execute the codelet, using STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA\n");
 			return -EINVAL;
 		}
 		else
 		{
-			do_execute = (me == xrank);
-			dest = xrank;
+			*do_execute = (me == *xrank);
+			*dest = *xrank;
 		}
 	}
-	else if (xrank != -1)
+	else if (*xrank != -1)
 	{
-		do_execute = (me == xrank);
-		dest = xrank;
+		*do_execute = (me == *xrank);
+		*dest = *xrank;
 	}
+	return 0;
+}
+
+static
+int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, int *dest_p, va_list varg_list)
+{
+	int arg_type;
+	va_list varg_list_copy;
+	int me, do_execute, xrank, nb_nodes;
+	size_t arg_buffer_size = 0;
+	void *arg_buffer = NULL;
+	int ret, dest=0;
+	int current_data;
+
+	_STARPU_MPI_LOG_IN();
+
+	MPI_Comm_rank(comm, &me);
+	MPI_Comm_size(comm, &nb_nodes);
+
+	/* Find out whether we are to execute the data because we own the data to be written to. */
+	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &dest, &do_execute, varg_list);
+	if (ret < 0) return ret;
 
 	/* Send and receive data as requested */
 	va_copy(varg_list_copy, varg_list);
@@ -589,7 +600,11 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 	}
 	va_end(varg_list_copy);
 
-	if (do_execute)
+	if (xrank_p) *xrank_p = xrank;
+	if (dest_p) *dest_p = dest;
+
+	if (do_execute == 0) return 1;
+	else
 	{
 		/* Get the number of buffers and the size of the arguments */
 		va_copy(varg_list_copy, varg_list);
@@ -606,102 +621,41 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 
 		_STARPU_MPI_DEBUG(1, "Execution of the codelet %p (%s)\n", codelet, codelet->name);
 
-		struct starpu_task *task = starpu_task_create();
-		task->cl_arg_free = 1;
+		*task = starpu_task_create();
+		(*task)->cl_arg_free = 1;
 
 		if (codelet->nbuffers > STARPU_NMAXBUFS)
 		{
-			task->dyn_handles = malloc(codelet->nbuffers * sizeof(starpu_data_handle_t));
+			(*task)->dyn_handles = malloc(codelet->nbuffers * sizeof(starpu_data_handle_t));
 		}
 
 		va_copy(varg_list_copy, varg_list);
-		int ret = _starpu_task_insert_create_and_submit(arg_buffer, arg_buffer_size, codelet, &task, varg_list_copy);
+		_starpu_task_insert_create(arg_buffer, arg_buffer_size, codelet, task, varg_list_copy);
 		va_end(varg_list_copy);
-
-		STARPU_ASSERT_MSG(ret==0, "_starpu_task_insert_create_and_submit failure %d", ret);
+		return 0;
 	}
+}
 
-	if (inconsistent_execute)
-	{
-		va_copy(varg_list_copy, varg_list);
-		current_data = 0;
-		while ((arg_type = va_arg(varg_list_copy, int)) != 0)
-		{
-			if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
-			{
-				starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
-				enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
-
-				_starpu_mpi_exchange_data_after_execution(data, mode, me, xrank, dest, do_execute, comm);
-				current_data++;
-			}
-			else if (arg_type == STARPU_DATA_ARRAY)
-			{
-				starpu_data_handle_t *datas = va_arg(varg_list_copy, starpu_data_handle_t *);
-				int nb_handles = va_arg(varg_list_copy, int);
-				int i;
+static
+int _starpu_mpi_task_postbuild_v(MPI_Comm comm, struct starpu_codelet *codelet, va_list varg_list, int xrank, int dest, int do_execute)
+{
+	int arg_type;
+	va_list varg_list_copy;
+	int current_data;
+	int me;
 
-				for(i=0 ; i<nb_handles ; i++)
-				{
-					_starpu_mpi_exchange_data_after_execution(datas[i], STARPU_CODELET_GET_MODE(codelet, current_data), me, xrank, dest, do_execute, comm);
-					current_data++;
-				}
-			}
-			else if (arg_type==STARPU_VALUE)
-			{
-				va_arg(varg_list_copy, void *);
-				va_arg(varg_list_copy, size_t);
-			}
-			else if (arg_type==STARPU_CALLBACK)
-			{
-				va_arg(varg_list_copy, void (*)(void *));
-			}
-			else if (arg_type==STARPU_CALLBACK_WITH_ARG)
-			{
-				va_arg(varg_list_copy, void (*)(void *));
-				va_arg(varg_list_copy, void *);
-			}
-			else if (arg_type==STARPU_CALLBACK_ARG)
-			{
-				va_arg(varg_list_copy, void *);
-			}
-			else if (arg_type==STARPU_PRIORITY)
-			{
-				va_arg(varg_list_copy, int);
-			}
-			else if (arg_type==STARPU_EXECUTE_ON_NODE)
-			{
-				va_arg(varg_list_copy, int);
-			}
-			else if (arg_type==STARPU_EXECUTE_ON_DATA)
-			{
-				va_arg(varg_list_copy, starpu_data_handle_t);
-			}
-			else if (arg_type==STARPU_HYPERVISOR_TAG)
-			{
-				(void)va_arg(varg_list_copy, int);
-			}
-			else if (arg_type==STARPU_FLOPS)
-			{
-				(void)va_arg(varg_list_copy, double);
-			}
-			else if (arg_type==STARPU_TAG)
-			{
-				STARPU_ASSERT_MSG(0, "STARPU_TAG is not supported in MPI mode\n");
-			}
-			}
-		va_end(varg_list_copy);
-	}
+	MPI_Comm_rank(comm, &me);
 
 	va_copy(varg_list_copy, varg_list);
 	current_data = 0;
 	while ((arg_type = va_arg(varg_list_copy, int)) != 0)
 	{
-		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type == STARPU_SCRATCH || arg_type == STARPU_REDUX)
+		if (arg_type==STARPU_R || arg_type==STARPU_W || arg_type==STARPU_RW || arg_type==STARPU_SCRATCH || arg_type==STARPU_REDUX)
 		{
 			starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
 			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
 
+			_starpu_mpi_exchange_data_after_execution(data, mode, me, xrank, dest, do_execute, comm);
 			_starpu_mpi_clear_data_after_execution(data, mode, me, do_execute, comm);
 			current_data++;
 		}
@@ -713,6 +667,7 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 
 			for(i=0 ; i<nb_handles ; i++)
 			{
+				_starpu_mpi_exchange_data_after_execution(datas[i], STARPU_CODELET_GET_MODE(codelet, current_data), me, xrank, dest, do_execute, comm);
 				_starpu_mpi_clear_data_after_execution(datas[i], STARPU_CODELET_GET_MODE(codelet, current_data), me, do_execute, comm);
 				current_data++;
 			}
@@ -766,6 +721,38 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 	return 0;
 }
 
+static
+int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_list varg_list)
+{
+	struct starpu_task *task;
+	int ret;
+	int xrank;
+	int dest;
+	int do_execute = 0;
+
+	ret = _starpu_mpi_task_build_v(comm, codelet, &task, &xrank, &dest, varg_list);
+	if (ret < 0) return ret;
+
+	if (ret == 0)
+	{
+		do_execute = 1;
+		ret = starpu_task_submit(task);
+
+		if (STARPU_UNLIKELY(ret == -ENODEV))
+		{
+			fprintf(stderr, "submission of task %p wih codelet %p failed (symbol `%s') (err: ENODEV)\n",
+				task, task->cl,
+				(codelet == NULL) ? "none" :
+				task->cl->name ? task->cl->name :
+				(task->cl->model && task->cl->model->symbol)?task->cl->model->symbol:"none");
+
+			task->destroy = 0;
+			starpu_task_destroy(task);
+		}
+	}
+	return _starpu_mpi_task_postbuild_v(comm, codelet, varg_list, xrank, dest, do_execute);
+}
+
 int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
 	va_list varg_list;
@@ -788,6 +775,37 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	return ret;
 }
 
+struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *codelet, ...)
+{
+	va_list varg_list;
+	struct starpu_task *task;
+	int ret;
+
+	va_start(varg_list, codelet);
+	ret = _starpu_mpi_task_build_v(comm, codelet, &task, NULL, NULL, varg_list);
+	va_end(varg_list);
+	STARPU_ASSERT(ret >= 0);
+	if (ret > 0) return NULL; else return task;
+}
+
+int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ...)
+{
+	int xrank, dest, do_execute;
+	int ret, me, nb_nodes;
+	va_list varg_list;
+
+	MPI_Comm_rank(comm, &me);
+	MPI_Comm_size(comm, &nb_nodes);
+
+	va_start(varg_list, codelet);
+	/* Find out whether we are to execute the data because we own the data to be written to. */
+	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &dest, &do_execute, varg_list);
+	if (ret < 0) return ret;
+	va_end(varg_list);
+
+	return _starpu_mpi_task_postbuild_v(comm, codelet, varg_list, xrank, dest, do_execute);
+}
+
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg)
 {
 	int me, rank, tag;