Explorar el Código

mpi/src/starpu_mpi_insert_task.c: change the algorithm for starpu_mpi_redux_data (see comments inside function implementation), to make sure all the steps are executed after data_handle being reduced finished its last read access.

Nathalie Furmento hace 11 años
padre
commit
ef3823b148
Se han modificado 1 ficheros con 100 adiciones y 25 borrados
  1. 100 25
      mpi/src/starpu_mpi_insert_task.c

+ 100 - 25
mpi/src/starpu_mpi_insert_task.c

@@ -809,6 +809,59 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
 	}
 }
 
+struct _starpu_mpi_redux_data_args
+{
+	starpu_data_handle_t data_handle;
+	starpu_data_handle_t new_handle;
+	int tag;
+	int node;
+	MPI_Comm comm;
+	struct starpu_task *taskB;
+};
+
+void _starpu_mpi_redux_data_dummy_func(STARPU_ATTRIBUTE_UNUSED void *buffers[], STARPU_ATTRIBUTE_UNUSED void *cl_arg)
+{
+}
+
+struct starpu_codelet _starpu_mpi_redux_data_read_cl =
+{
+	.cpu_funcs = {_starpu_mpi_redux_data_dummy_func, NULL},
+	.cuda_funcs = {_starpu_mpi_redux_data_dummy_func, NULL},
+	.opencl_funcs = {_starpu_mpi_redux_data_dummy_func, NULL},
+	.nbuffers = 1,
+	.modes = {STARPU_R},
+	.name = "_starpu_mpi_redux_data_read_cl"
+};
+
+struct starpu_codelet _starpu_mpi_redux_data_readwrite_cl =
+{
+	.cpu_funcs = {_starpu_mpi_redux_data_dummy_func, NULL},
+	.cuda_funcs = {_starpu_mpi_redux_data_dummy_func, NULL},
+	.opencl_funcs = {_starpu_mpi_redux_data_dummy_func, NULL},
+	.nbuffers = 1,
+	.modes = {STARPU_RW},
+	.name = "_starpu_mpi_redux_data_write_cl"
+};
+
+void _starpu_mpi_redux_data_detached_callback(void *arg)
+{
+	struct _starpu_mpi_redux_data_args *args = (struct _starpu_mpi_redux_data_args *) arg;
+
+	STARPU_TASK_SET_HANDLE(args->taskB, args->new_handle, 1);
+	int ret = starpu_task_submit(args->taskB);
+	STARPU_ASSERT(ret == 0);
+
+	starpu_data_unregister_submit(args->new_handle);
+}
+
+void _starpu_mpi_redux_data_recv_callback(void *callback_arg)
+{
+	struct _starpu_mpi_redux_data_args *args = (struct _starpu_mpi_redux_data_args *) callback_arg;
+	starpu_data_register_same(&args->new_handle, args->data_handle);
+
+	starpu_mpi_irecv_detached_sequential_consistency(args->new_handle, args->node, args->tag, args->comm, _starpu_mpi_redux_data_detached_callback, args, 0);
+}
+
 /* TODO: this should rather be implicitly called by starpu_mpi_insert_task when
  * a data previously accessed in REDUX mode gets accessed in R mode. */
 void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
@@ -836,40 +889,62 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 	// need to count how many nodes have the data in redux mode
 	if (me == rank)
 	{
-		int i;
+		int i, j=0;
+		struct starpu_task *taskBs[nb_nodes];
 
 		for(i=0 ; i<nb_nodes ; i++)
 		{
 			if (i != rank)
 			{
-				starpu_data_handle_t new_handle;
-
-				starpu_data_register_same(&new_handle, data_handle);
-
-				_STARPU_MPI_DEBUG(1, "Receiving redux handle from %d in %p ...\n", i, new_handle);
-
-				/* FIXME: we here allocate a lot of data: one
-				 * instance per MPI node and per number of
-				 * times we are called. We should rather do
-				 * that much later, e.g. after data_handle
-				 * finished its last read access, by submitting
-				 * an empty task A reading data_handle whose
-				 * callback submits the mpi comm, whose
-				 * callback submits the redux_cl task B with
-				 * sequential consistency set to 0, and submit
-				 * an empty task C writing data_handle and
-				 * depending on task B, just to replug with
-				 * implicit data dependencies with tasks
-				 * inserted after this reduction.
+				/* We need to make sure all is
+				 * executed after data_handle finished
+				 * its last read access, we hence do
+				 * the following:
+				 * - submit an empty task A reading
+				 * data_handle whose callback submits
+				 * the mpi comm with sequential
+				 * consistency set to 0, whose
+				 * callback submits the redux_cl task
+				 * B with sequential consistency set
+				 * to 0,
+				 * - submit an empty task C reading
+				 * and writing data_handle and
+				 * depending on task B, just to replug
+				 * with implicit data dependencies
+				 * with tasks inserted after this
+				 * reduction.
 				 */
-				starpu_mpi_irecv_detached(new_handle, i, tag, comm, NULL, NULL);
-				starpu_insert_task(data_handle->redux_cl,
-						   STARPU_RW, data_handle,
-						   STARPU_R, new_handle,
+
+				/* FIXME: free args */
+				struct _starpu_mpi_redux_data_args *args = malloc(sizeof(struct _starpu_mpi_redux_data_args));
+				args->data_handle = data_handle;
+				args->tag = tag;
+				args->node = i;
+				args->comm = comm;
+
+				// We need to create taskB early as
+				// taskC declares a dependancy on it
+				args->taskB = starpu_task_create();
+				args->taskB->cl = args->data_handle->redux_cl;
+				args->taskB->sequential_consistency = 0;
+				STARPU_TASK_SET_HANDLE(args->taskB, args->data_handle, 0);
+				taskBs[j] = args->taskB; j++;
+
+				// Submit taskA
+				starpu_insert_task(&_starpu_mpi_redux_data_read_cl,
+						   STARPU_R, data_handle,
+						   STARPU_CALLBACK_WITH_ARG, _starpu_mpi_redux_data_recv_callback, args,
 						   0);
-				starpu_data_unregister_submit(new_handle);
 			}
 		}
+
+		// Submit taskC which depends on all taskBs created
+		struct starpu_task *taskC = starpu_task_create();
+		taskC->cl = &_starpu_mpi_redux_data_readwrite_cl;
+		STARPU_TASK_SET_HANDLE(taskC, data_handle, 0);
+		starpu_task_declare_deps_array(taskC, j, taskBs);
+		int ret = starpu_task_submit(taskC);
+		STARPU_ASSERT(ret == 0);
 	}
 	else
 	{