Browse source

initial solution with an additional method to select the arity of the reduction tree - untested, not yet compiled draft code

Antoine JEGO 4 years ago
parent
commit
f186286579
3 changed files with 90 additions and 153 deletions
  1. +1 -2
      mpi/src/starpu_mpi.c
  2. +0 -6
      mpi/src/starpu_mpi_private.h
  3. +89 -145
      mpi/src/starpu_mpi_task_insert.c
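For reference, a minimal usage sketch of the reduction entry points this commit reworks. The signatures come from the diff below; whether starpu_mpi_redux_data_prio_tree ends up declared in the public starpu_mpi.h header is an assumption, as are the communicator, handle and arity values. The three calls are shown together as alternatives, not as a sequence to execute.

/* Usage sketch only: how the reworked entry points are meant to be called.
 * The handle is assumed to have been registered with starpu_mpi_data_register()
 * and accessed in STARPU_REDUX / STARPU_MPI_REDUX mode by previously inserted tasks. */
#include <starpu_mpi.h>

static void reduce_examples(starpu_data_handle_t handle)
{
	/* Default priority; the arity defaults to the number of contributing
	 * nodes, i.e. a flat single-step reduction as before this commit. */
	starpu_mpi_redux_data(MPI_COMM_WORLD, handle);

	/* Same, with an explicit priority. */
	starpu_mpi_redux_data_prio(MPI_COMM_WORLD, handle, 0);

	/* New in this commit: reduce along a binary tree (arity 2), so each
	 * reducing node receives at most one contribution per step. */
	starpu_mpi_redux_data_prio_tree(MPI_COMM_WORLD, handle, 0, 2);
}

The first two calls keep the existing API; only the last one is new.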

+ 1 - 2
mpi/src/starpu_mpi.c

@@ -393,8 +393,7 @@ void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
 	_mpi_backend._starpu_mpi_backend_data_clear(data_handle);
 	_starpu_mpi_cache_data_clear(data_handle);
 	_starpu_spin_destroy(&data->coop_lock);
-	if (data->redux_map != REDUX_CONTRIB)
-		free(data->redux_map);
+	free(data->redux_map);
 	free(data);
 	data_handle->mpi_data = NULL;
 }

+ 0 - 6
mpi/src/starpu_mpi_private.h

@@ -203,12 +203,6 @@ struct _starpu_mpi_coop_sends
 	long pre_sync_jobid;
 };
 
-/** cf. redux_map field : this is the value
- * put in this field whenever a node contributes
- * to the reduction of the data.
- * Only the owning node keeps track of all the contributing nodes. */
-#define REDUX_CONTRIB ((char*) -1)
-
 /** Initialized in starpu_mpi_data_register_comm */
 struct _starpu_mpi_data
 {

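With the REDUX_CONTRIB sentinel removed above, redux_map becomes a plain per-rank flag array on every node involved in the reduction, owner and contributors alike (see the first hunk of starpu_mpi_task_insert.c below). A minimal sketch of that bookkeeping, with hypothetical stand-alone names rather than the real StarPU structures:

/* Hypothetical stand-alone sketch of the redux_map bookkeeping after this
 * commit; the real code lives in _starpu_mpi_task_postbuild_v() and uses
 * struct _starpu_mpi_data and _STARPU_CALLOC. */
#include <stdlib.h>

struct redux_tracking
{
	char *redux_map; /* one flag per rank, NULL until the first contribution */
};

static void mark_contribution(struct redux_tracking *d, int comm_size, int xrank)
{
	if (d->redux_map == NULL)
		d->redux_map = calloc(comm_size, sizeof(d->redux_map[0]));
	d->redux_map[xrank] = 1; /* rank xrank contributes to the reduction */
}

Freeing the map then no longer needs the sentinel test, which is what the starpu_mpi.c hunk above simplifies.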
+ 89 - 145
mpi/src/starpu_mpi_task_insert.c

@@ -628,7 +628,7 @@ int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struc
 		if ((descrs[i].mode & STARPU_REDUX || descrs[i].mode & STARPU_MPI_REDUX) && descrs[i].handle)
 		{
 			struct _starpu_mpi_data *mpi_data = (struct _starpu_mpi_data *) descrs[i].handle->mpi_data;
-			if (me == starpu_mpi_data_get_rank(descrs[i].handle))
+			if (me == starpu_mpi_data_get_rank(descrs[i].handle) || me == xrank)
 			{
 				int size;
 				starpu_mpi_comm_size(comm, &size);
@@ -636,8 +636,6 @@ int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struc
 					_STARPU_CALLOC(mpi_data->redux_map, size, sizeof(mpi_data->redux_map[0]));
 				mpi_data->redux_map [xrank] = 1;
 			}
-			else if (me == xrank)
-				mpi_data->redux_map = REDUX_CONTRIB;
 		}
 		_starpu_mpi_exchange_data_after_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 		_starpu_mpi_clear_data_after_execution(descrs[i].handle, descrs[i].mode, me, do_execute);
@@ -781,67 +779,30 @@ static struct starpu_perfmodel dumb_model =
 	.cost_function	= cost_function
 };
 
-/* FIXME: we can probably use STARPU_NOWHERE for these? */
-static
-struct starpu_codelet _starpu_mpi_redux_data_read_cl =
-{
-	.cpu_funcs = {_starpu_mpi_redux_data_dummy_func},
-	.cuda_funcs = {_starpu_mpi_redux_data_dummy_func},
-	.opencl_funcs = {_starpu_mpi_redux_data_dummy_func},
-	.nbuffers = 1,
-	.modes = {STARPU_R},
-	.model = &dumb_model,
-	.name = "_starpu_mpi_redux_data_read_cl"
-};
-
-struct starpu_codelet _starpu_mpi_redux_data_readwrite_cl =
+struct starpu_codelet _starpu_mpi_redux_data_synchro_cl = 
 {
-	.cpu_funcs = {_starpu_mpi_redux_data_dummy_func},
-	.cuda_funcs = {_starpu_mpi_redux_data_dummy_func},
-	.opencl_funcs = {_starpu_mpi_redux_data_dummy_func},
-	.nbuffers = 1,
-	.modes = {STARPU_RW},
-	.model = &dumb_model,
-	.name = "_starpu_mpi_redux_data_write_cl"
+	.where = STARPU_NOWHERE,
+	.nbuffers = 2,
+	.modes = {STARPU_R, STARPU_W}
 };
 
-static
-void _starpu_mpi_redux_data_detached_callback(void *arg)
-{
-	struct _starpu_mpi_redux_data_args *args = (struct _starpu_mpi_redux_data_args *) arg;
-
-	STARPU_TASK_SET_HANDLE(args->taskB, args->new_handle, 1);
-	int ret = starpu_task_submit(args->taskB);
-	STARPU_ASSERT(ret == 0);
-
-	starpu_data_unregister_submit(args->new_handle);
-	free(args);
-}
-
-static
-void _starpu_mpi_redux_data_recv_callback(void *callback_arg)
+void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 {
-	struct _starpu_mpi_redux_data_args *args = (struct _starpu_mpi_redux_data_args *) callback_arg;
-	starpu_data_register_same(&args->new_handle, args->data_handle);
-
-	starpu_mpi_irecv_detached_sequential_consistency(args->new_handle, args->node, args->data_tag, args->comm, _starpu_mpi_redux_data_detached_callback, args, 0);
+	return starpu_mpi_redux_data_prio(comm, data_handle, 0);
 }
 
-
-void _starpu_mpi_redux_fill_post_sync_jobid(const void * const redux_data_args, long * const post_sync_jobid)
/* Forward declaration, assuming the new entry point is not yet in the public header. */
void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int prio, int arity);

void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
 {
-	*post_sync_jobid = ((const struct _starpu_mpi_redux_data_args *) redux_data_args)->taskC_jobid;
+	int i, nb_nodes, nb_contrib = 0;
+	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
+	starpu_mpi_comm_size(comm, &nb_nodes);
+	for (i=0;i<nb_nodes;i++) 
+	{
+		if (mpi_data->redux_map[i]) nb_contrib++;
+	}
+	return starpu_mpi_redux_data_prio_tree(comm, data_handle, prio, nb_contrib);
 }
 
-
-/* TODO: this should rather be implicitly called by starpu_mpi_task_insert when
- * a data previously accessed in REDUX mode gets accessed in R mode. */
-/* FIXME: In order to prevent simultaneous receive submissions
- * on the same handle, we need to wait that all the starpu_mpi
- * tasks are done before submitting next tasks. The current
- * version of the implementation does not support multiple
- * simultaneous receive requests on the same handle.*/
-void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
+void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int prio, int arity)
 {
 	int me, rank, nb_nodes;
 	starpu_mpi_tag_t data_tag;
@@ -861,102 +822,85 @@ void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle,
 	starpu_mpi_comm_rank(comm, &me);
 	starpu_mpi_comm_size(comm, &nb_nodes);
 
-	_STARPU_MPI_DEBUG(50, "Doing reduction for data %p on node %d with %d nodes ...\n", data_handle, rank, nb_nodes);
-	// need to count how many nodes have the data in redux mode
-	if (me == rank)
	int current_step, nb_contrib = 0, next_nb_contrib;
+	int i, j, substep, node;
+	char root_in_substep, me_in_substep;
+	for (i=0;i<nb_nodes;i++) 
 	{
-		int i,j;
-		_STARPU_MPI_DEBUG(50, "Who is in the map ?\n");
-		for (j = 0; j<nb_nodes; j++)
-		{
-			_STARPU_MPI_DEBUG(50, "%d is in the map ? %d\n", j, mpi_data->redux_map[j]);
-		}
-
-		// taskC depends on all taskBs created
-		// Creating synchronization task and use its jobid for tracing
-		struct starpu_task *taskC = starpu_task_create();
-		const long taskC_jobid = starpu_task_get_job_id(taskC);
-		taskC->cl = &_starpu_mpi_redux_data_readwrite_cl;
-		STARPU_TASK_SET_HANDLE(taskC, data_handle, 0);
-
-		for(i=0 ; i<nb_nodes ; i++)
+		if (mpi_data->redux_map[i]) nb_contrib++;
+	}
+	int contributors[nb_contrib];
+	int reducing_node;
+	j=0;
+	for (i=0;i<nb_nodes;i++)
+	{
+		if (mpi_data->redux_map[i]) contributors[j++] = i;
+	}
	if (mpi_data->redux_map != NULL) free(mpi_data->redux_map);
	mpi_data->redux_map = NULL;
	/* Synchronization with the last access to data_handle is enforced per
	 * received contribution below, by inserting an empty task using
	 * _starpu_mpi_redux_data_synchro_cl and relying on sequential consistency. */
	current_step = 0;
+	while (nb_contrib != 1)
+	{
+		if (nb_contrib%arity == 0) next_nb_contrib = nb_contrib/arity;
+		else next_nb_contrib = nb_contrib/arity + 1;
		for (substep = 0; substep < next_nb_contrib; substep++)
		{
			root_in_substep = 0;
			me_in_substep = 0;
-			if (i != rank && mpi_data->redux_map[i])
+			for (node = substep*arity ; node < nb_contrib && node < (substep+1)*arity ; node++)
 			{
-				_STARPU_MPI_DEBUG(5, "%d takes part in the reduction of %p \n", i, data_handle);
-				/* We need to make sure all is
-				 * executed after data_handle finished
-				 * its last read access, we hence do
-				 * the following:
-				 * - submit an empty task A reading
-				 * data_handle whose callback submits
-				 * the mpi comm with sequential
-				 * consistency set to 0, whose
-				 * callback submits the redux_cl task
-				 * B with sequential consistency set
-				 * to 0,
-				 * - submit an empty task C reading
-				 * and writing data_handle and
-				 * depending on task B, just to replug
-				 * with implicit data dependencies
-				 * with tasks inserted after this
-				 * reduction.
-				 */
-
-				struct _starpu_mpi_redux_data_args *args;
-				_STARPU_MPI_MALLOC(args, sizeof(struct _starpu_mpi_redux_data_args));
-				args->data_handle = data_handle;
-				args->data_tag = data_tag;
-				args->node = i;
-				args->comm = comm;
-
-				args->taskC_jobid = taskC_jobid;
-
-				// We need to create taskB early as
-				// taskC declares a dependancy on it
-				args->taskB = starpu_task_create();
-				args->taskB->cl = args->data_handle->redux_cl;
-				args->taskB->sequential_consistency = 0;
-				STARPU_TASK_SET_HANDLE(args->taskB, args->data_handle, 0);
-
-				starpu_task_declare_deps_array(taskC, 1, &args->taskB);
-
-				// Submit taskA
-				starpu_task_insert(&_starpu_mpi_redux_data_read_cl,
-						   STARPU_R, data_handle,
-						   STARPU_CALLBACK_WITH_ARG_NFREE, _starpu_mpi_redux_data_recv_callback, args,
-						   0);
+				if (contributors[node] == rank) root_in_substep = 1;
+				if (contributors[node] == me) me_in_substep = 1;
 			}
-			else
+			/* FIXME: if the root node is not in the substep, the node with the
+			 * lowest id in the substep does the reduction: we could pick another
+			 * node to balance the load better when several reductions involve
+			 * the same sets of nodes.
+			 */
+			if (root_in_substep) reducing_node = rank;
+			else reducing_node = contributors[substep*arity];
+
+			if (me == reducing_node)
 			{
-				_STARPU_MPI_DEBUG(5, "%d is not in the map or is me\n", i);
+				for (node = substep*arity ; node < nb_contrib && node < (substep+1)*arity ; node++)
+				{
+					if (me != contributors[node])
+					{
+						_STARPU_MPI_DEBUG(5, "%d takes part in the reduction of %p towards %d (step %d, substep %d)\n",
+								contributors[node], data_handle, reducing_node, current_step, substep);
+						/* We need to make sure everything is
+						 * executed after data_handle finished
+						 * its last read access; we hence do
+						 * the following:
+						 * - submit an empty task A reading
+						 * data_handle and writing a temporary
+						 * handle,
+						 * - submit the reducing task B reading
+						 * and writing data_handle and depending
+						 * on task A through sequential
+						 * consistency.
+						 */
+						starpu_data_handle_t new_handle;
+						starpu_data_register_same(&new_handle, data_handle);
+						/* Task A */
+						starpu_task_insert(&_starpu_mpi_redux_data_synchro_cl,
+								STARPU_R, data_handle, STARPU_W, new_handle, 0);
+						starpu_mpi_irecv_detached_prio(new_handle, contributors[node], data_tag, prio, comm, NULL, NULL);
+						/* Task B */
+						starpu_task_insert(data_handle->redux_cl, STARPU_RW|STARPU_COMMUTE, data_handle, STARPU_R, new_handle, 0);
+						/* The temporary handle can be dropped once task B is submitted */
+						starpu_data_unregister_submit(new_handle);
+					}
+				}
 			}
+			else if (me_in_substep)
+			{
+				_STARPU_MPI_DEBUG(5, "Sending redux handle to %d ...\n", reducing_node);
+				starpu_mpi_isend_detached_prio(data_handle, reducing_node, data_tag, prio, comm, NULL, NULL);
+				starpu_data_invalidate_submit(data_handle);
+			}
+			contributors[substep] = reducing_node;
 		}
-
-		int ret = starpu_task_submit(taskC);
-		STARPU_ASSERT(ret == 0);
+		nb_contrib = next_nb_contrib;
+		current_step++;
 	}
-	else if (mpi_data->redux_map)
-	{
-		STARPU_ASSERT(mpi_data->redux_map == REDUX_CONTRIB);
-		_STARPU_MPI_DEBUG(5, "Sending redux handle to %d ...\n", rank);
-		starpu_mpi_isend_detached_prio(data_handle, rank, data_tag, prio, comm, NULL, NULL);
-		starpu_data_invalidate_submit(data_handle);
-	}
-	else
-	{
-		_STARPU_MPI_DEBUG(5, "I am not in the map of %d, I am %d ...\n", rank, me);
-	}
-	if (mpi_data->redux_map != NULL)
-	{
-		_STARPU_MPI_DEBUG(100, "waiting for redux tasks with %d\n", rank);
-		starpu_task_wait_for_all();
-	}
-	if (me == rank)
-		free(mpi_data->redux_map);
-	mpi_data->redux_map = NULL;
-}
-void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
-{
-	return starpu_mpi_redux_data_prio(comm, data_handle, 0);
 }
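To make the new scheduling loop easier to follow, here is a small standalone sketch of the schedule that starpu_mpi_redux_data_prio_tree walks: the contributors are grouped by arity at each step, one node per group performs the partial reduction, and the surviving nodes go on to the next step. The sketch simply picks the first (lowest-indexed) node of each group as the reducer, whereas the draft prefers the data owner whenever it belongs to the group; the function name, the 64-contributor bound and the printed format are illustrative, not StarPU code.

/* Standalone sketch (not StarPU code) of the reduction-tree schedule built by
 * starpu_mpi_redux_data_prio_tree().  Assumes arity >= 2 and at most 64 contributors. */
#include <stdio.h>

static void print_tree_schedule(const int *contrib, int nb_contrib, int arity)
{
	int current[64]; /* sketch assumption: at most 64 contributors */
	int step = 0, i;
	for (i = 0; i < nb_contrib; i++)
		current[i] = contrib[i];
	while (nb_contrib != 1)
	{
		/* ceiling division, same as the nb_contrib%arity test in the draft */
		int next_nb_contrib = (nb_contrib + arity - 1) / arity;
		int substep;
		for (substep = 0; substep < next_nb_contrib; substep++)
		{
			int first = substep * arity;
			int last = (substep + 1) * arity;
			if (last > nb_contrib)
				last = nb_contrib;
			/* the draft picks the data owner if present, else the first node */
			int reducing_node = current[first];
			printf("step %d, group %d: nodes", step, substep);
			for (i = first; i < last; i++)
				printf(" %d", current[i]);
			printf(" -> reduced on node %d\n", reducing_node);
			current[substep] = reducing_node;
		}
		nb_contrib = next_nb_contrib;
		step++;
	}
	printf("result ends up on node %d\n", current[0]);
}

int main(void)
{
	int contributors[] = {0, 1, 2, 3, 4, 5, 6};
	print_tree_schedule(contributors, 7, 3);
	return 0;
}

With 7 contributors and arity 3 this prints two steps (3 groups, then 1), with the result ending on node 0.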