Browse Source

Almost working example: fixed typo and registration of redux_map; added an example for testing purposes; FIXME: segfault with FxT on shutdown

Antoine JEGO 4 years ago
parent
commit
0c63b08843

+ 15 - 5
mpi/examples/Makefile.am

@@ -377,20 +377,30 @@ endif
 
 
 if STARPU_HAVE_MPIFORT
 if STARPU_HAVE_MPIFORT
 if !STARPU_SANITIZE
 if !STARPU_SANITIZE
-examplebin_PROGRAMS +=		\
-	native_fortran/nf_mpi_redux
+examplebin_PROGRAMS +=				\
+	native_fortran/nf_mpi_redux		\
+	native_fortran/nf_mpi_redux_tree
 
 
-native_fortran_nf_mpi_redux_SOURCES	=			\
+native_fortran_nf_mpi_redux_SOURCES =		\
 	native_fortran/fstarpu_mpi_mod.f90	\
 	native_fortran/fstarpu_mpi_mod.f90	\
 	native_fortran/fstarpu_mod.f90		\
 	native_fortran/fstarpu_mod.f90		\
 	native_fortran/nf_mpi_redux.f90	
 	native_fortran/nf_mpi_redux.f90	
 
 
-native_fortran_nf_mpi_redux_LDADD =					\
+native_fortran_nf_mpi_redux_LDADD =		\
+	-lm
+
+native_fortran_nf_mpi_redux_tree_SOURCES =	\
+	native_fortran/fstarpu_mpi_mod.f90	\
+	native_fortran/fstarpu_mod.f90		\
+	native_fortran/nf_mpi_redux_tree.f90	
+
+native_fortran_nf_mpi_redux_tree_LDADD =	\
 	-lm
 	-lm
 
 
 if !STARPU_SIMGRID
 if !STARPU_SIMGRID
 starpu_mpi_EXAMPLES +=				\
 starpu_mpi_EXAMPLES +=				\
-	native_fortran/nf_mpi_redux
+	native_fortran/nf_mpi_redux		\
+	native_fortran/nf_mpi_redux_tree
 endif
 endif
 endif
 endif
 endif
 endif

+ 19 - 0
mpi/include/fstarpu_mpi_mod.f90

@@ -304,6 +304,25 @@ module fstarpu_mpi_mod
                         integer(c_int), value, intent(in) :: prio
                         integer(c_int), value, intent(in) :: prio
                 end subroutine fstarpu_mpi_redux_data_prio
                 end subroutine fstarpu_mpi_redux_data_prio
 
 
+                ! void starpu_mpi_redux_data_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int arity);
+                subroutine fstarpu_mpi_redux_data_tree(mpi_comm,dh, arity) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), value, intent(in) :: dh
+                        integer(c_int), value, intent(in) :: arity
+                end subroutine fstarpu_mpi_redux_data_tree
+
+                ! void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int prio, int arity);
+                subroutine fstarpu_mpi_redux_data_prio_tree(mpi_comm,dh, prio, arity) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), value, intent(in) :: dh
+                        integer(c_int), value, intent(in) :: prio
+                        integer(c_int), value, intent(in) :: arity
+                end subroutine fstarpu_mpi_redux_data_prio_tree
+
                 ! int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg);
                 ! int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg);
                 function fstarpu_mpi_scatter_detached (dhs, cnt, root, mpi_comm, scallback, sarg, rcallback, rarg) bind(C)
                 function fstarpu_mpi_scatter_detached (dhs, cnt, root, mpi_comm, scallback, sarg, rcallback, rarg) bind(C)
                         use iso_c_binding
                         use iso_c_binding

+ 11 - 0
mpi/include/starpu_mpi.h

@@ -796,6 +796,17 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle);
 void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio);
 void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio);
 
 
 /**
 /**
+   Perform a reduction on the given data \p data_handle.
+   Nodes perform the reduction in a tree-based fashion.
+   The tree used is an \p arity -ary tree.
+*/
+void starpu_mpi_redux_data_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int arity);
+
+/**
+   Similar to starpu_mpi_redux_data_tree(), but takes a priority \p prio.
+*/
+void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int prio, int arity);
+/**
    Scatter data among processes of the communicator based on the
    Scatter data among processes of the communicator based on the
    ownership of the data. For each data of the array \p data_handles,
    ownership of the data. For each data of the array \p data_handles,
    the process \p root sends the data to the process owning this data.
    the process \p root sends the data to the process owning this data.

+ 2 - 1
mpi/src/starpu_mpi.c

@@ -393,7 +393,8 @@ void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
 	_mpi_backend._starpu_mpi_backend_data_clear(data_handle);
 	_mpi_backend._starpu_mpi_backend_data_clear(data_handle);
 	_starpu_mpi_cache_data_clear(data_handle);
 	_starpu_mpi_cache_data_clear(data_handle);
 	_starpu_spin_destroy(&data->coop_lock);
 	_starpu_spin_destroy(&data->coop_lock);
-	free(data->redux_map);
+	if (data->redux_map != NULL)
+		free(data->redux_map);
 	free(data);
 	free(data);
 	data_handle->mpi_data = NULL;
 	data_handle->mpi_data = NULL;
 }
 }

+ 8 - 0
mpi/src/starpu_mpi_fortran.c

@@ -123,6 +123,14 @@ void fstarpu_mpi_redux_data_prio(MPI_Fint comm, starpu_data_handle_t data_handle
 {
 {
 	starpu_mpi_redux_data_prio(MPI_Comm_f2c(comm), data_handle, prio);
 	starpu_mpi_redux_data_prio(MPI_Comm_f2c(comm), data_handle, prio);
 }
 }
+void fstarpu_mpi_redux_data_tree(MPI_Fint comm, starpu_data_handle_t data_handle, int arity)
+{
+	starpu_mpi_redux_data_tree(MPI_Comm_f2c(comm), data_handle, arity);
+}
+void fstarpu_mpi_redux_data_prio_tree(MPI_Fint comm, starpu_data_handle_t data_handle, int prio, int arity)
+{
+	starpu_mpi_redux_data_prio_tree(MPI_Comm_f2c(comm), data_handle, prio, arity);
+}
 
 
 /* scatter/gather */
 /* scatter/gather */
 int fstarpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int cnt, int root, MPI_Fint comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg)
 int fstarpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int cnt, int root, MPI_Fint comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg)

+ 65 - 53
mpi/src/starpu_mpi_task_insert.c

@@ -618,7 +618,7 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 
 
 int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data, int prio)
 int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data, int prio)
 {
 {
-	int me, i;
+	int me, i, rrank;
 
 
 	_STARPU_TRACE_TASK_MPI_POST_START();
 	_STARPU_TRACE_TASK_MPI_POST_START();
 	starpu_mpi_comm_rank(comm, &me);
 	starpu_mpi_comm_rank(comm, &me);
@@ -628,14 +628,13 @@ int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struc
 		if ((descrs[i].mode & STARPU_REDUX || descrs[i].mode & STARPU_MPI_REDUX) && descrs[i].handle)
 		if ((descrs[i].mode & STARPU_REDUX || descrs[i].mode & STARPU_MPI_REDUX) && descrs[i].handle)
 		{
 		{
 			struct _starpu_mpi_data *mpi_data = (struct _starpu_mpi_data *) descrs[i].handle->mpi_data;
 			struct _starpu_mpi_data *mpi_data = (struct _starpu_mpi_data *) descrs[i].handle->mpi_data;
-			if (me == starpu_mpi_data_get_rank(descrs[i].handle) || me == xrank)
-			{
-				int size;
-				starpu_mpi_comm_size(comm, &size);
-				if (mpi_data->redux_map == NULL)
-					_STARPU_CALLOC(mpi_data->redux_map, size, sizeof(mpi_data->redux_map[0]));
-				mpi_data->redux_map [xrank] = 1;
-			}
+			int rrank = starpu_mpi_data_get_rank(descrs[i].handle);
+			int size;
+			starpu_mpi_comm_size(comm, &size);
+			if (mpi_data->redux_map == NULL)
+				_STARPU_CALLOC(mpi_data->redux_map, size, sizeof(mpi_data->redux_map[0]));
+			mpi_data->redux_map [xrank] = 1;
+			mpi_data->redux_map [rrank] = 1;
 		}
 		}
 		_starpu_mpi_exchange_data_after_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 		_starpu_mpi_exchange_data_after_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 		_starpu_mpi_clear_data_after_execution(descrs[i].handle, descrs[i].mode, me, do_execute);
 		_starpu_mpi_clear_data_after_execution(descrs[i].handle, descrs[i].mode, me, do_execute);
@@ -749,6 +748,13 @@ int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ..
 	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 }
 }
 
 
+struct starpu_codelet _starpu_mpi_redux_data_synchro_cl = 
+{
+	.where = STARPU_NOWHERE,
+	.modes = {STARPU_R, STARPU_W},
+	.nbuffers = 2
+};
+
 struct _starpu_mpi_redux_data_args
 struct _starpu_mpi_redux_data_args
 {
 {
 	starpu_data_handle_t data_handle;
 	starpu_data_handle_t data_handle;
@@ -760,47 +766,12 @@ struct _starpu_mpi_redux_data_args
 	long taskC_jobid;
 	long taskC_jobid;
 };
 };
 
 
-void _starpu_mpi_redux_data_dummy_func(void *buffers[], void *cl_arg)
+void _starpu_mpi_redux_fill_post_sync_jobid(const void * const redux_data_args, long * const post_sync_jobid)
 {
 {
-	(void)buffers;
-	(void)cl_arg;
+	*post_sync_jobid = ((const struct _starpu_mpi_redux_data_args *) redux_data_args)->taskC_jobid;
 }
 }
 
 
-/* Dummy cost function for simgrid */
-static double cost_function(struct starpu_task *task, unsigned nimpl)
-{
-	(void)task;
-	(void)nimpl;
-	return 0.000001;
-}
-static struct starpu_perfmodel dumb_model =
-{
-	.type		= STARPU_COMMON,
-	.cost_function	= cost_function
-};
 
 
-struct starpu_codelet _starpu_mpi_redux_data_synchro_cl = 
-{
-	.where = STARPU_NOWHERE,
-	.modes = {STARPU_R, STARPU_W}
-};
-
-void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
-{
-	return starpu_mpi_redux_data_prio(comm, data_handle, 0);
-}
-
-void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
-{
-	int nb_nodes, nb_contrib;
-	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
-	starpu_mpi_comm_size(comm, &nb_nodes);
-	for (i=0;i<nb_nodes;i++) 
-	{
-		if (mpi_data->redux_map[i]) nb_contrib++;
-	}
-	return starpu_mpi_redux_data_prio_tree(comm, data_handle, prio, nb_contrib);
-}
 
 
 void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int prio, int arity)
 void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int prio, int arity)
 {
 {
@@ -825,33 +796,47 @@ void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_ha
 	int current_level, nb_contrib, next_nb_contrib;
 	int current_level, nb_contrib, next_nb_contrib;
 	int i, j, step, node;
 	int i, j, step, node;
 	char root_in_step, me_in_step;
 	char root_in_step, me_in_step;
+	// https://stackoverflow.com/questions/109023/how-to-count-the-number-of-set-bits-in-a-32-bit-integer
+	// https://stackoverflow.com/a/109025
+	// see hamming weight
+	//nb_contrib = std::popcount(mpi_data->redux_map); // most preferable
+	nb_contrib=0;
 	for (i=0;i<nb_nodes;i++) 
 	for (i=0;i<nb_nodes;i++) 
 	{
 	{
+		_STARPU_MPI_DEBUG(5, "mpi_data->redux_map[%d] = %d\n", i, mpi_data->redux_map[i]);
 		if (mpi_data->redux_map[i]) nb_contrib++;
 		if (mpi_data->redux_map[i]) nb_contrib++;
 	}
 	}
+	_STARPU_MPI_DEBUG(5, "There is %d contributors\n", nb_contrib);
 	int contributors[nb_contrib];
 	int contributors[nb_contrib];
 	int reducing_node;
 	int reducing_node;
 	j=0;
 	j=0;
 	for (i=0;i<nb_nodes;i++)
 	for (i=0;i<nb_nodes;i++)
 	{
 	{
+		_STARPU_MPI_DEBUG(5, "%d in reduction ? %d\n", i, mpi_data->redux_map[i]); 
 		if (mpi_data->redux_map[i]) contributors[j++] = i;
 		if (mpi_data->redux_map[i]) contributors[j++] = i;
 	}
 	}
-	if (mpi_data->redux_map != NULL) free(mpi_data->redux_map);
+	for (i=0;i<nb_contrib;i++)
+	{
+		_STARPU_MPI_DEBUG(5, "%dth contributor = %d\n", i, contributors[i]); 
+	}
 	// Creating synchronization task and use its jobid for tracing
 	// Creating synchronization task and use its jobid for tracing
 	struct starpu_task *synchro = starpu_task_create();
 	struct starpu_task *synchro = starpu_task_create();
-	const long synchro_jobid = starpu_task_get_job_id(synchro);
+//	const long synchro_jobid = starpu_task_get_job_id(synchro);
 	synchro->cl = &_starpu_mpi_redux_data_synchro_cl;
 	synchro->cl = &_starpu_mpi_redux_data_synchro_cl;
-	STARPU_TASK_SET_HANDLE(synchro, data_handle, 0);
+//	STARPU_TASK_SET_HANDLE(synchro, data_handle, 0);
+
+	_STARPU_MPI_DEBUG(15, "mpi_redux _ STARTING with %d-ary tree \n", arity);
 	current_level = 0;
 	current_level = 0;
 	while (nb_contrib != 1)
 	while (nb_contrib != 1)
 	{
 	{
+		_STARPU_MPI_DEBUG(5, "%dth level in the reduction \n", current_level); 
 		if (nb_contrib%arity == 0) next_nb_contrib = nb_contrib/arity;
 		if (nb_contrib%arity == 0) next_nb_contrib = nb_contrib/arity;
 		else next_nb_contrib = nb_contrib/arity + 1;
 		else next_nb_contrib = nb_contrib/arity + 1;
 		for (step = 0; step < next_nb_contrib; step++) 
 		for (step = 0; step < next_nb_contrib; step++) 
 		{
 		{
 			root_in_step = 0;
 			root_in_step = 0;
 			me_in_step = 0;
 			me_in_step = 0;
-			for (node = step*arity ; node < nb_nodes && node < (step+1)*arity ; node++)
+			for (node = step*arity ; node < nb_contrib && node < (step+1)*arity ; node++)
 			{
 			{
 				if (contributors[node] == rank) root_in_step = 1;
 				if (contributors[node] == rank) root_in_step = 1;
 				if (contributors[node] == me) me_in_step = 1;
 				if (contributors[node] == me) me_in_step = 1;
@@ -866,6 +851,7 @@ void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_ha
 
 
 			if (me == reducing_node)
 			if (me == reducing_node)
 			{
 			{
+				_STARPU_MPI_DEBUG(5, "mpi_redux _ %dth level, %dth step ; chose %d node\n", current_level, step, reducing_node); 
 				for (node = step*arity ; node < nb_contrib && node < (step+1)*arity ; node++)
 				for (node = step*arity ; node < nb_contrib && node < (step+1)*arity ; node++)
 				{
 				{
 					if (me != contributors[node]) {
 					if (me != contributors[node]) {
@@ -876,16 +862,18 @@ void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_ha
 						 * its last read access, we hence do
 						 * its last read access, we hence do
 						 * the following:
 						 * the following:
 						 * - submit an empty task A reading
 						 * - submit an empty task A reading
-							 * data_handle
+						 * data_handle
 						 * - submit the reducing task B
 						 * - submit the reducing task B
 						 * reading and writing data_handle and
 						 * reading and writing data_handle and
 						 * depending on task A through sequencial
 						 * depending on task A through sequential
 						 * depending on task A through sequential
 						 * consistency
 						 */
 						 */
 						starpu_data_handle_t new_handle;
 						starpu_data_handle_t new_handle;
-						starpu_data_register_same($new_handle, handle);
+						starpu_data_register_same(&new_handle, data_handle);
 						/* Task A */
 						/* Task A */
-				       	        starpu_task_insert(synchro, STARPU_R, handle, STARPU_W, new_handle, 0);
+				       	        starpu_task_insert(&_starpu_mpi_redux_data_synchro_cl, 
+									STARPU_R, data_handle, 
+									STARPU_W, new_handle, 0);
 				       	        starpu_mpi_irecv_detached_prio(new_handle, contributors[node], data_tag, prio, comm, NULL, NULL);
 				       	        starpu_mpi_irecv_detached_prio(new_handle, contributors[node], data_tag, prio, comm, NULL, NULL);
 					        /* Task B */
 					        /* Task B */
 				       		starpu_task_insert(data_handle->redux_cl, STARPU_RW|STARPU_COMMUTE, data_handle, STARPU_R, new_handle, 0);
 				       		starpu_task_insert(data_handle->redux_cl, STARPU_RW|STARPU_COMMUTE, data_handle, STARPU_R, new_handle, 0);
@@ -904,4 +892,28 @@ void starpu_mpi_redux_data_prio_tree(MPI_Comm comm, starpu_data_handle_t data_ha
 		nb_contrib = next_nb_contrib;
 		nb_contrib = next_nb_contrib;
 		current_level++;
 		current_level++;
 	}
 	}
+	if (mpi_data->redux_map != NULL) free(mpi_data->redux_map);
 }
 }
+
+void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
+{
+	return starpu_mpi_redux_data_prio(comm, data_handle, 0);
+}
+
+void starpu_mpi_redux_data_tree(MPI_Comm comm, starpu_data_handle_t data_handle, int arity)
+{
+	return starpu_mpi_redux_data_prio_tree(comm, data_handle, 0, arity);
+}
+
+void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
+{
+	int nb_nodes, nb_contrib, i;
+	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
+	starpu_mpi_comm_size(comm, &nb_nodes);
+	for (i=0;i<nb_nodes;i++) 
+	{
+		if (mpi_data->redux_map[i]) nb_contrib++;
+	}
+	return starpu_mpi_redux_data_prio_tree(comm, data_handle, prio, nb_contrib);
+}
+