Browse Source

- complete native Fortran support for StarPU-MPI

Olivier Aumage 8 years ago
parent
commit
2a872e26d7
2 changed files with 439 additions and 0 deletions
  1. 26 0
      mpi/include/fstarpu_mpi_mod.f90
  2. 413 0
      mpi/src/starpu_mpi_task_insert.c

+ 26 - 0
mpi/include/fstarpu_mpi_mod.f90

@@ -178,8 +178,34 @@ module fstarpu_mpi_mod
                 end function fstarpu_mpi_shutdown
 
                 ! struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *codelet, ...);
+                function fstarpu_mpi_task_build(mpi_comm,arglist) bind(C)
+                        use iso_c_binding, only: c_ptr,c_int
+                        type(c_ptr) :: fstarpu_mpi_task_build
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), dimension(:), intent(in) :: arglist
+                end function fstarpu_mpi_task_build
+
                 ! int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ...);
+                function fstarpu_mpi_task_post_build(mpi_comm,arglist) bind(C)
+                        use iso_c_binding, only: c_ptr,c_int
+                        integer(c_int) :: fstarpu_mpi_task_post_build
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), dimension(:), intent(in) :: arglist
+                end function fstarpu_mpi_task_post_build
+
                 ! int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...);
+                function fstarpu_mpi_task_insert(mpi_comm,arglist) bind(C)
+                        use iso_c_binding, only: c_ptr,c_int
+                        integer(c_int) :: fstarpu_mpi_task_insert
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), dimension(:), intent(in) :: arglist
+                end function fstarpu_mpi_task_insert
+                function fstarpu_mpi_insert_task(mpi_comm,arglist) bind(C,name="fstarpu_mpi_task_insert")
+                        use iso_c_binding, only: c_ptr,c_int
+                        integer(c_int) :: fstarpu_mpi_insert_task
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), dimension(:), intent(in) :: arglist
+                end function fstarpu_mpi_insert_task
 
                 ! void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle, int node);
                 subroutine fstarpu_mpi_get_data_on_node(mpi_comm,dh,node) bind(C)

+ 413 - 0
mpi/src/starpu_mpi_task_insert.c

@@ -3,6 +3,7 @@
  * Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016  CNRS
  * Copyright (C) 2011-2016  Université de Bordeaux
  * Copyright (C) 2014 INRIA
+ * Copyright (C) 2016 Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -434,6 +435,283 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 }
 
 static
+int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, void **arglist)
+{
+	int arg_i = 0;
+	int inconsistent_execute = 0;
+	int arg_type, arg_type_nocommute;
+	int node_selected = 0;
+	int nb_allocated_data = 16;
+	struct starpu_data_descr *descrs;
+	int nb_data;
+	int select_node_policy = STARPU_MPI_NODE_SELECTION_CURRENT_POLICY;
+
+	_STARPU_TRACE_TASK_MPI_DECODE_START();
+
+	descrs = (struct starpu_data_descr *)malloc(nb_allocated_data * sizeof(struct starpu_data_descr));
+	nb_data = 0;
+	*do_execute = -1;
+	*xrank = -1;
+
+	while (arglist[arg_i] != NULL)
+	{
+		arg_type = (int)(intptr_t)arglist[arg_i];
+		arg_type_nocommute = arg_type & ~STARPU_COMMUTE;
+
+		if (arg_type==STARPU_EXECUTE_ON_NODE)
+		{
+			arg_i++;
+			*xrank = *(int *)arglist[arg_i];
+			if (node_selected == 0)
+			{
+				_STARPU_MPI_DEBUG(100, "Executing on node %d\n", *xrank);
+				*do_execute = 1;
+				node_selected = 1;
+				inconsistent_execute = 0;
+			}
+		}
+		else if (arg_type==STARPU_EXECUTE_ON_DATA)
+		{
+			arg_i++;
+			starpu_data_handle_t data = arglist[arg_i];
+			if (node_selected == 0)
+			{
+				*xrank = starpu_mpi_data_get_rank(data);
+				STARPU_ASSERT_MSG(*xrank != -1, "Rank of the data must be set using starpu_mpi_data_register() or starpu_data_set_rank()");
+				_STARPU_MPI_DEBUG(100, "Executing on data node %d\n", *xrank);
+				STARPU_ASSERT_MSG(*xrank <= nb_nodes, "Node %d to execute codelet is not a valid node (%d)", *xrank, nb_nodes);
+				*do_execute = 1;
+				node_selected = 1;
+				inconsistent_execute = 0;
+			}
+		}
+		else if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
+		{
+			arg_i++;
+			starpu_data_handle_t data = arglist[arg_i];
+			enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
+			if (node_selected == 0)
+			{
+				int ret = _starpu_mpi_find_executee_node(data, mode, me, do_execute, &inconsistent_execute, xrank);
+				if (ret == -EINVAL)
+				{
+					free(descrs);
+					_STARPU_TRACE_TASK_MPI_DECODE_END();
+					return ret;
+				}
+			}
+			if (nb_data >= nb_allocated_data)
+			{
+				nb_allocated_data *= 2;
+				descrs = (struct starpu_data_descr *)realloc(descrs, nb_allocated_data * sizeof(struct starpu_data_descr));
+			}
+			descrs[nb_data].handle = data;
+			descrs[nb_data].mode = mode;
+			nb_data ++;
+		}
+		else if (arg_type == STARPU_DATA_ARRAY)
+		{
+			arg_i++;
+			starpu_data_handle_t *datas = arglist[arg_i];
+			arg_i++;
+			int nb_handles = *(int *)arglist[arg_i];
+			int i;
+
+			for(i=0 ; i<nb_handles ; i++)
+			{
+				STARPU_ASSERT_MSG(codelet->nbuffers == STARPU_VARIABLE_NBUFFERS || nb_data < codelet->nbuffers, "Too many data passed to starpu_mpi_task_insert");
+				enum starpu_data_access_mode mode = STARPU_CODELET_GET_MODE(codelet, nb_data);
+				if (node_selected == 0)
+				{
+					int ret = _starpu_mpi_find_executee_node(datas[i], mode, me, do_execute, &inconsistent_execute, xrank);
+					if (ret == -EINVAL)
+					{
+						free(descrs);
+						_STARPU_TRACE_TASK_MPI_DECODE_END();
+						return ret;
+					}
+				}
+				if (nb_data >= nb_allocated_data)
+				{
+					nb_allocated_data *= 2;
+					descrs = (struct starpu_data_descr *)realloc(descrs, nb_allocated_data * sizeof(struct starpu_data_descr));
+				}
+				descrs[nb_data].handle = datas[i];
+				descrs[nb_data].mode = mode;
+				nb_data ++;
+			}
+		}
+		else if (arg_type == STARPU_DATA_MODE_ARRAY)
+		{
+			arg_i++;
+			struct starpu_data_descr *_descrs = arglist[arg_i];
+			arg_i++;
+			int nb_handles = *(int *)arglist[arg_i];
+			int i;
+
+			for(i=0 ; i<nb_handles ; i++)
+			{
+				enum starpu_data_access_mode mode = _descrs[i].mode;
+				if (node_selected == 0)
+				{
+					int ret = _starpu_mpi_find_executee_node(_descrs[i].handle, mode, me, do_execute, &inconsistent_execute, xrank);
+					if (ret == -EINVAL)
+					{
+						free(descrs);
+						_STARPU_TRACE_TASK_MPI_DECODE_END();
+						return ret;
+					}
+				}
+				if (nb_data >= nb_allocated_data)
+				{
+					nb_allocated_data *= 2;
+					descrs = (struct starpu_data_descr *)realloc(descrs, nb_allocated_data * sizeof(struct starpu_data_descr));
+				}
+				descrs[nb_data].handle = _descrs[i].handle;
+				descrs[nb_data].mode = mode;
+				nb_data ++;
+			}
+		}
+		else if (arg_type==STARPU_VALUE)
+		{
+			arg_i++;
+			/* void* */
+			arg_i++;
+			/* size_t */
+		}
+		else if (arg_type==STARPU_CL_ARGS)
+		{
+			arg_i++;
+			/* void* */
+			arg_i++;
+			/* size_t */
+		}
+		else if (arg_type==STARPU_CALLBACK)
+		{
+			arg_i++;
+			/* _starpu_callback_func_t */
+		}
+		else if (arg_type==STARPU_CALLBACK_WITH_ARG)
+		{
+			arg_i++;
+			/* _starpu_callback_func_t */
+			arg_i++;
+			/* void* */
+		}
+		else if (arg_type==STARPU_CALLBACK_ARG)
+		{
+			arg_i++;
+			/* void* */
+		}
+		else if (arg_type==STARPU_PRIORITY)
+		{
+			arg_i++;
+			/* int* */
+		}
+		/* STARPU_EXECUTE_ON_NODE handled above */
+		/* STARPU_EXECUTE_ON_DATA handled above */
+		/* STARPU_DATA_ARRAY handled above */
+		/* STARPU_DATA_MODE_ARRAY handled above */
+		else if (arg_type==STARPU_TAG)
+		{
+			arg_i++;
+			/* starpu_tag_t* */
+		}
+		else if (arg_type==STARPU_HYPERVISOR_TAG)
+		{
+			arg_i++;
+			/* int* */
+		}
+		else if (arg_type==STARPU_FLOPS)
+		{
+			arg_i++;
+			/* double* */
+		}
+		else if (arg_type==STARPU_SCHED_CTX)
+		{
+			arg_i++;
+			/* unsigned* */
+		}
+		else if (arg_type==STARPU_PROLOGUE_CALLBACK)
+                {
+			arg_i++;
+			/* _starpu_callback_func_t */
+		}
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
+                {
+			arg_i++;
+			/* void* */
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
+                {
+			arg_i++;
+			/* _starpu_callback_func_t */
+                }
+                else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
+                {
+			arg_i++;
+			/* void* */
+		}
+		else if (arg_type==STARPU_EXECUTE_ON_WORKER)
+		{
+			arg_i++;
+			/* int* */
+		}
+		else if (arg_type==STARPU_TAG_ONLY)
+		{
+			arg_i++;
+			/* starpu_tag_t* */
+		}
+		else if (arg_type==STARPU_NAME)
+		{
+			arg_i++;
+			/* char* */
+		}
+		else if (arg_type==STARPU_POSSIBLY_PARALLEL)
+		{
+			arg_i++;
+			/* unsigned* */
+		}
+		else if (arg_type==STARPU_WORKER_ORDER)
+		{
+			arg_i++;
+			/* unsigned* */
+		}
+		else if (arg_type==STARPU_NODE_SELECTION_POLICY)
+		{
+			arg_i++;
+			/* int* */
+		}
+		else
+		{
+			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
+		}
+
+		arg_i++;
+	}
+
+	if (inconsistent_execute == 1 || *xrank == -1)
+	{
+		// We need to find out which node is going to execute the codelet.
+		_STARPU_MPI_DISP("Different nodes are owning W data. The node to execute the codelet is going to be selected with the current selection node policy. See starpu_mpi_node_selection_set_current_policy() to change the policy, or use STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA to specify the node\n");
+		*xrank = _starpu_mpi_select_node(me, nb_nodes, descrs, nb_data, select_node_policy);
+		*do_execute = (me == *xrank);
+	}
+	else
+	{
+		_STARPU_MPI_DEBUG(100, "Inconsistent=%d - xrank=%d\n", inconsistent_execute, *xrank);
+		*do_execute = (me == *xrank);
+	}
+	_STARPU_MPI_DEBUG(100, "do_execute=%d\n", *do_execute);
+
+	*descrs_p = descrs;
+	*nb_data_p = nb_data;
+
+	_STARPU_TRACE_TASK_MPI_DECODE_END();
+	return 0;
+}
+
+static
 int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, va_list varg_list)
 {
 	va_list varg_list_copy;
@@ -483,6 +761,52 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 }
 
 static
+int _fstarpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, void **arglist)
+{
+	int me, do_execute, xrank, nb_nodes;
+	int ret;
+	int i;
+	struct starpu_data_descr *descrs;
+	int nb_data;
+
+	_STARPU_MPI_LOG_IN();
+
+	starpu_mpi_comm_rank(comm, &me);
+	starpu_mpi_comm_size(comm, &nb_nodes);
+
+	/* Find out whether we are to execute the data because we own the data to be written to. */
+	ret = _fstarpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, arglist);
+	if (ret < 0) return ret;
+
+	_STARPU_TRACE_TASK_MPI_PRE_START();
+	/* Send and receive data as requested */
+	for(i=0 ; i<nb_data ; i++)
+	{
+		_starpu_mpi_exchange_data_before_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, comm);
+	}
+
+	if (xrank_p) *xrank_p = xrank;
+	if (nb_data_p) *nb_data_p = nb_data;
+	if (descrs_p)
+		*descrs_p = descrs;
+	else
+		free(descrs);
+	_STARPU_TRACE_TASK_MPI_PRE_END();
+
+	if (do_execute == 0) return 1;
+	else
+	{
+		_STARPU_MPI_DEBUG(100, "Execution of the codelet %p (%s)\n", codelet, codelet?codelet->name:NULL);
+
+		*task = starpu_task_create();
+		(*task)->cl_arg_free = 1;
+
+		_fstarpu_task_insert_create(codelet, task, arglist);
+		return 0;
+	}
+}
+
+static
 int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data)
 {
 	int me, i;
@@ -536,6 +860,39 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
 }
 
+static
+int _fstarpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, void **arglist)
+{
+	struct starpu_task *task;
+	int ret;
+	int xrank;
+	int do_execute = 0;
+	struct starpu_data_descr *descrs;
+	int nb_data;
+
+	ret = _fstarpu_mpi_task_build_v(comm, codelet, &task, &xrank, &descrs, &nb_data, arglist);
+	if (ret < 0) return ret;
+
+	if (ret == 0)
+	{
+		do_execute = 1;
+		ret = starpu_task_submit(task);
+
+		if (STARPU_UNLIKELY(ret == -ENODEV))
+		{
+			fprintf(stderr, "submission of task %p wih codelet %p failed (symbol `%s') (err: ENODEV)\n",
+				task, task->cl,
+				(codelet == NULL) ? "none" :
+				task->cl->name ? task->cl->name :
+				(task->cl->model && task->cl->model->symbol)?task->cl->model->symbol:"none");
+
+			task->destroy = 0;
+			starpu_task_destroy(task);
+		}
+	}
+	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+}
+
 int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
 	va_list varg_list;
@@ -547,6 +904,20 @@ int starpu_mpi_task_insert(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	return ret;
 }
 
+int fstarpu_mpi_task_insert(MPI_Fint comm, void ***_arglist)
+{
+	void **arglist = *_arglist;
+	struct starpu_codelet *codelet = arglist[0];
+	if (codelet == NULL)
+	{
+		STARPU_ABORT_MSG("task without codelet");
+	}
+	int ret;
+
+	ret = _fstarpu_mpi_task_insert_v(MPI_Comm_f2c(comm), codelet, arglist+1);
+	return ret;
+}
+
 int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
 	va_list varg_list;
@@ -558,6 +929,8 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 	return ret;
 }
 
+/* fstarpu_mpi_insert_task: aliased to fstarpu_mpi_task_insert in fstarpu_mpi_mod.f90 */
+
 struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
 	va_list varg_list;
@@ -571,6 +944,22 @@ struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *
 	if (ret > 0) return NULL; else return task;
 }
 
+struct starpu_task *fstarpu_mpi_task_build(MPI_Fint comm, void ***_arglist)
+{
+	void **arglist = *_arglist;
+	struct starpu_codelet *codelet = arglist[0];
+	if (codelet == NULL)
+	{
+		STARPU_ABORT_MSG("task without codelet");
+	}
+	struct starpu_task *task;
+	int ret;
+
+	ret = _fstarpu_mpi_task_build_v(MPI_Comm_f2c(comm), codelet, &task, NULL, NULL, NULL, arglist+1);
+	STARPU_ASSERT(ret >= 0);
+	if (ret > 0) return NULL; else return task;
+}
+
 int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ...)
 {
 	int xrank, do_execute;
@@ -591,6 +980,30 @@ int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ..
 	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
 }
 
+int fstarpu_mpi_task_post_build(MPI_Fint _comm, void ***_arglist)
+{
+	void **arglist = *_arglist;
+	struct starpu_codelet *codelet = arglist[0];
+	if (codelet == NULL)
+	{
+		STARPU_ABORT_MSG("task without codelet");
+	}
+	MPI_Comm comm = MPI_Comm_f2c(_comm);
+	int xrank, do_execute;
+	int ret, me, nb_nodes;
+	struct starpu_data_descr *descrs;
+	int nb_data;
+
+	starpu_mpi_comm_rank(comm, &me);
+	starpu_mpi_comm_size(comm, &nb_nodes);
+
+	/* Find out whether we are to execute the data because we own the data to be written to. */
+	ret = _fstarpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, arglist);
+	if (ret < 0) return ret;
+
+	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+}
+
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg)
 {
 	int me, rank, tag;