ソースを参照

Update fstarpu_mpi with priorities

Samuel Thibault 7 年 前
コミット
4cb0733623
共有3 個のファイルを変更した21 個の追加11 個の削除を含む
  1. 2 0
      mpi/src/starpu_mpi_task_insert.c
  2. 2 2
      mpi/src/starpu_mpi_task_insert.h
  3. 17 9
      mpi/src/starpu_mpi_task_insert_fortran.c

+ 2 - 0
mpi/src/starpu_mpi_task_insert.c

@@ -30,6 +30,8 @@
 #include <starpu_mpi_cache.h>
 #include <starpu_mpi_select_node.h>
 
+#include "starpu_mpi_task_insert.h"
+
 #define _SEND_DATA(data, mode, dest, data_tag, prio, comm, callback, arg)     \
 	if (mode & STARPU_SSEND)					\
 		starpu_mpi_issend_detached_prio(data, dest, data_tag, prio, comm, callback, arg); \

+ 2 - 2
mpi/src/starpu_mpi_task_insert.h

@@ -23,8 +23,8 @@ extern "C"
 #endif
 
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *xrank);
-void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, MPI_Comm comm);
-int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data);
+void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm);
+int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data, int prio);
 
 #ifdef __cplusplus
 }

+ 17 - 9
mpi/src/starpu_mpi_task_insert_fortran.c

@@ -25,7 +25,7 @@
 
 #ifdef HAVE_MPI_COMM_F2C
 static
-int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, void **arglist)
+int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, int *prio_p, void **arglist)
 {
 	int arg_i = 0;
 	int inconsistent_execute = 0;
@@ -33,6 +33,7 @@ int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_no
 	int nb_allocated_data = 16;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio = 0;
 	int select_node_policy = STARPU_MPI_NODE_SELECTION_CURRENT_POLICY;
 
 	_STARPU_TRACE_TASK_MPI_DECODE_START();
@@ -194,6 +195,7 @@ int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_no
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
+			prio = *(int *)arglist[arg_i];
 			arg_i++;
 			/* int* */
 		}
@@ -300,19 +302,21 @@ int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_no
 
 	*descrs_p = descrs;
 	*nb_data_p = nb_data;
+	*prio_p = prio;
 
 	_STARPU_TRACE_TASK_MPI_DECODE_END();
 	return 0;
 }
 
 static
-int _fstarpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, void **arglist)
+int _fstarpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, int *prio_p, void **arglist)
 {
 	int me, do_execute, xrank, nb_nodes;
 	int ret;
 	int i;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
 	_STARPU_MPI_LOG_IN();
 
@@ -320,14 +324,14 @@ int _fstarpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, str
 	starpu_mpi_comm_size(comm, &nb_nodes);
 
 	/* Find out whether we are to execute the data because we own the data to be written to. */
-	ret = _fstarpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, arglist);
+	ret = _fstarpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, &prio, arglist);
 	if (ret < 0) return ret;
 
 	_STARPU_TRACE_TASK_MPI_PRE_START();
 	/* Send and receive data as requested */
 	for(i=0 ; i<nb_data ; i++)
 	{
-		_starpu_mpi_exchange_data_before_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, comm);
+		_starpu_mpi_exchange_data_before_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 	}
 
 	if (xrank_p) *xrank_p = xrank;
@@ -336,6 +340,8 @@ int _fstarpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, str
 		*descrs_p = descrs;
 	else
 		free(descrs);
+	if (prio_p)
+		*prio_p = prio;
 	_STARPU_TRACE_TASK_MPI_PRE_END();
 
 	if (do_execute == 0) return 1;
@@ -360,8 +366,9 @@ int _fstarpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, vo
 	int do_execute = 0;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
-	ret = _fstarpu_mpi_task_build_v(comm, codelet, &task, &xrank, &descrs, &nb_data, arglist);
+	ret = _fstarpu_mpi_task_build_v(comm, codelet, &task, &xrank, &descrs, &nb_data, &prio, arglist);
 	if (ret < 0) return ret;
 
 	if (ret == 0)
@@ -381,7 +388,7 @@ int _fstarpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, vo
 			starpu_task_destroy(task);
 		}
 	}
-	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 }
 
 int fstarpu_mpi_task_insert(MPI_Fint comm, void ***_arglist)
@@ -411,7 +418,7 @@ struct starpu_task *fstarpu_mpi_task_build(MPI_Fint comm, void ***_arglist)
 	struct starpu_task *task;
 	int ret;
 
-	ret = _fstarpu_mpi_task_build_v(MPI_Comm_f2c(comm), codelet, &task, NULL, NULL, NULL, arglist+1);
+	ret = _fstarpu_mpi_task_build_v(MPI_Comm_f2c(comm), codelet, &task, NULL, NULL, NULL, NULL, arglist+1);
 	STARPU_ASSERT(ret >= 0);
 	if (ret > 0) return NULL; else return task;
 }
@@ -429,15 +436,16 @@ int fstarpu_mpi_task_post_build(MPI_Fint _comm, void ***_arglist)
 	int ret, me, nb_nodes;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
 	starpu_mpi_comm_rank(comm, &me);
 	starpu_mpi_comm_size(comm, &nb_nodes);
 
 	/* Find out whether we are to execute the data because we own the data to be written to. */
-	ret = _fstarpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, arglist);
+	ret = _fstarpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, &prio, arglist);
 	if (ret < 0) return ret;
 
-	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 }
 
 #endif /* HAVE_MPI_COMM_F2C */