Browse Source

Add support for priorities in StarPU-MPI

Samuel Thibault 7 years ago
parent
commit
587a42efa7

+ 9 - 0
doc/doxygen/chapters/410_mpi_support.doxy

@@ -569,6 +569,15 @@ starpu_mpi_task_post_build(MPI_COMM_WORLD, &cl,
                            0);
 \endcode
 
+\section MPIPriorities Priorities
+
+All send functions have a <c>_prio</c> variant which takes an additional
+priority parameter. StarPU-MPI uses this priority to reorder MPI requests
+before submitting them to MPI. The default priority is 0.
+
+When using the starpu_mpi_task_insert helper, STARPU_PRIORITY defines both the
+task priority and the priority of the MPI requests.
+
 \section MPICache MPI cache support
 
 StarPU-MPI automatically optimizes duplicate data transmissions: if an MPI

+ 28 - 0
doc/doxygen/chapters/api/mpi.doxy

@@ -82,6 +82,10 @@ Perform a standard-mode, blocking send of \p data_handle to the node
 \p dest using the message tag \p mpi_tag within the communicator \p
 comm.
 
+\fn int starpu_mpi_send_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm)
+\ingroup API_MPI_Support
+Similar to starpu_mpi_send, but takes a priority \p prio.
+
 \fn int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
 \ingroup API_MPI_Support
 Perform a standard-mode, blocking receive in \p data_handle from the
@@ -95,6 +99,10 @@ Post a standard-mode, non blocking send of \p data_handle to the node
 comm. After the call, the pointer to the request \p req can be used to
 test or to wait for the completion of the communication.
 
+\fn int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm)
+\ingroup API_MPI_Support
+Similar to starpu_mpi_isend, but takes a priority \p prio.
+
 \fn int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *req, int source, int mpi_tag, MPI_Comm comm)
 \ingroup API_MPI_Support
 Post a nonblocking receive in \p data_handle from the node \p source
@@ -113,6 +121,10 @@ communication completes, its resources are automatically released back
 to the system, there is no need to test or to wait for the completion
 of the request.
 
+\fn int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
+\ingroup API_MPI_Support
+Similar to starpu_mpi_isend_detached, but takes a priority \p prio.
+
 \fn int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 \ingroup API_MPI_Support
 Post a nonblocking receive in \p data_handle from the node \p source
@@ -146,6 +158,10 @@ Perform a synchronous-mode, non-blocking send of \p data_handle to the node
 \p dest using the message tag \p mpi_tag within the communicator \p
 comm.
 
+\fn int starpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm)
+\ingroup API_MPI_Support
+Similar to starpu_mpi_issend, but takes a priority \p prio.
+
 \fn int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
 \ingroup API_MPI_Support
 Perform a synchronous-mode, non-blocking send of \p data_handle to the node
@@ -182,6 +198,10 @@ Post a standard-mode, non blocking send of \p data_handle to the node
 \p dest using the message tag \p mpi_tag within the communicator \p
 comm. On completion, \p tag is unlocked.
 
+\fn int starpu_mpi_isend_detached_unlock_tag_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, starpu_tag_t tag)
+\ingroup API_MPI_Support
+Similar to starpu_mpi_isend_detached_unlock_tag, but takes a priority \p prio.
+
 \fn int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
 \ingroup API_MPI_Support
 Post a nonblocking receive in \p data_handle from the node \p source
@@ -196,6 +216,10 @@ array \p dest using the n-th message tag of the array \p mpi_tag
 within the n-th communicator of the array \p comm. On completion of
 all the requests, \p tag is unlocked.
 
+\fn int starpu_mpi_isend_array_detached_unlock_tag_prio(unsigned array_size, starpu_data_handle_t *data_handle, int *dest, int *mpi_tag, int *prio, MPI_Comm *comm, starpu_tag_t tag)
+\ingroup API_MPI_Support
+Similar to starpu_mpi_isend_array_detached_unlock_tag, but takes an array of priorities \p prio.
+
 \fn int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag)
 \ingroup API_MPI_Support
 Post \p array_size nonblocking receive. Each post receives in the n-th
@@ -481,6 +505,10 @@ Unregister a previously registered policy.
 Perform a reduction on the given data \p handle. All nodes send the data to its
 owner node which will perform a reduction.
 
+\fn void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
+\ingroup API_MPI_Support
+Similar to starpu_mpi_redux_data, but takes a priority \p prio.
+
 \fn int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg)
 \ingroup API_MPI_Support
 Scatter data among processes of the communicator based on the

+ 105 - 0
mpi/include/fstarpu_mpi_mod.f90

@@ -1,6 +1,7 @@
 ! StarPU --- Runtime system for heterogeneous multicore architectures.
 !
 ! Copyright (C) 2016  Inria
+! Copyright (C) 2017  Université de Bordeaux
 !
 ! StarPU is free software; you can redistribute it and/or modify
 ! it under the terms of the GNU Lesser General Public License as published by
@@ -32,6 +33,20 @@ module fstarpu_mpi_mod
                         integer(c_int), value, intent(in) :: mpi_comm
                 end function fstarpu_mpi_isend
 
+                ! == mpi/include/starpu_mpi.h ==
+                ! int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm);
+                function fstarpu_mpi_isend_prio (dh, mpi_req, dst, mpi_tag, prio, mpi_comm) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int) :: fstarpu_mpi_isend_prio
+                        type(c_ptr), value, intent(in) :: dh
+                        type(c_ptr), value, intent(in) :: mpi_req
+                        integer(c_int), value, intent(in) :: dst
+                        integer(c_int), value, intent(in) :: mpi_tag
+                        integer(c_int), value, intent(in) :: prio
+                        integer(c_int), value, intent(in) :: mpi_comm
+                end function fstarpu_mpi_isend_prio
+
                 ! int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *req, int source, int mpi_tag, MPI_Comm comm);
                 function fstarpu_mpi_irecv (dh, mpi_req, src, mpi_tag, mpi_comm) bind(C)
                         use iso_c_binding
@@ -55,6 +70,18 @@ module fstarpu_mpi_mod
                         integer(c_int), value, intent(in) :: mpi_comm
                 end function fstarpu_mpi_send
 
+                ! int starpu_mpi_send_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm);
+                function fstarpu_mpi_send_prio (dh, dst, mpi_tag, prio, mpi_comm) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int) :: fstarpu_mpi_send_prio
+                        type(c_ptr), value, intent(in) :: dh
+                        integer(c_int), value, intent(in) :: dst
+                        integer(c_int), value, intent(in) :: mpi_tag
+                        integer(c_int), value, intent(in) :: prio
+                        integer(c_int), value, intent(in) :: mpi_comm
+                end function fstarpu_mpi_send_prio
+
                 ! int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
                 function fstarpu_mpi_recv (dh, src, mpi_tag, mpi_comm, mpi_status) bind(C)
                         use iso_c_binding
@@ -80,6 +107,20 @@ module fstarpu_mpi_mod
                         type(c_ptr), value, intent(in) :: arg
                 end function fstarpu_mpi_isend_detached
 
+                ! int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg);
+                function fstarpu_mpi_isend_detached_prio (dh, dst, mpi_tag, prio, mpi_comm, callback, arg) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int) :: fstarpu_mpi_isend_detached_prio
+                        type(c_ptr), value, intent(in) :: dh
+                        integer(c_int), value, intent(in) :: dst
+                        integer(c_int), value, intent(in) :: mpi_tag
+                        integer(c_int), value, intent(in) :: prio
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_funptr), value, intent(in) :: callback
+                        type(c_ptr), value, intent(in) :: arg
+                end function fstarpu_mpi_isend_detached_prio
+
                 ! int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
                 function fstarpu_mpi_recv_detached (dh, src, mpi_tag, mpi_comm, callback, arg) bind(C)
                         use iso_c_binding
@@ -105,6 +146,19 @@ module fstarpu_mpi_mod
                         integer(c_int), value, intent(in) :: mpi_comm
                 end function fstarpu_mpi_issend
 
+                ! int starpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm);
+                function fstarpu_mpi_issend_prio (dh, mpi_req, dst, mpi_tag, prio, mpi_comm) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int) :: fstarpu_mpi_issend_prio
+                        type(c_ptr), value, intent(in) :: dh
+                        type(c_ptr), value, intent(in) :: mpi_req
+                        integer(c_int), value, intent(in) :: dst
+                        integer(c_int), value, intent(in) :: mpi_tag
+                        integer(c_int), value, intent(in) :: prio
+                        integer(c_int), value, intent(in) :: mpi_comm
+                end function fstarpu_mpi_issend_prio
+
                 ! int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
                 function fstarpu_mpi_issend_detached (dh, dst, mpi_tag, mpi_comm, callback, arg) bind(C)
                         use iso_c_binding
@@ -118,6 +172,20 @@ module fstarpu_mpi_mod
                         type(c_ptr), value, intent(in) :: arg
                 end function fstarpu_mpi_issend_detached
 
+                ! int starpu_mpi_issend_detached_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg);
+                function fstarpu_mpi_issend_detached_prio (dh, dst, mpi_tag, prio, mpi_comm, callback, arg) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int) :: fstarpu_mpi_issend_detached_prio
+                        type(c_ptr), value, intent(in) :: dh
+                        integer(c_int), value, intent(in) :: dst
+                        integer(c_int), value, intent(in) :: mpi_tag
+                        integer(c_int), value, intent(in) :: prio
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_funptr), value, intent(in) :: callback
+                        type(c_ptr), value, intent(in) :: arg
+                end function fstarpu_mpi_issend_detached_prio
+
                 ! int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status);
                 function fstarpu_mpi_wait(req,st) bind(C,name="starpu_mpi_wait")
                         use iso_c_binding
@@ -235,6 +303,15 @@ module fstarpu_mpi_mod
                         type(c_ptr), value, intent(in) :: dh
                 end subroutine fstarpu_mpi_redux_data
 
+                ! void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio);
+                subroutine fstarpu_mpi_redux_data_prio(mpi_comm,dh, prio) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), value, intent(in) :: dh
+                        integer(c_int), value, intent(in) :: prio
+                end subroutine fstarpu_mpi_redux_data_prio
+
                 ! int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg);
                 function fstarpu_mpi_scatter_detached (dhs, cnt, root, mpi_comm, scallback, sarg, rcallback, rarg) bind(C)
                         use iso_c_binding
@@ -278,6 +355,19 @@ module fstarpu_mpi_mod
                         type(c_ptr), value, intent(in) :: starpu_tag
                 end function fstarpu_mpi_isend_detached_unlock_tag
 
+                ! int starpu_mpi_isend_detached_unlock_tag_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, starpu_tag_t tag);
+                function fstarpu_mpi_isend_detached_unlock_tag_prio (dh, dst, mpi_tag, prio, mpi_comm, starpu_tag) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int) :: fstarpu_mpi_isend_detached_unlock_tag_prio
+                        type(c_ptr), value, intent(in) :: dh
+                        integer(c_int), value, intent(in) :: dst
+                        integer(c_int), value, intent(in) :: mpi_tag
+                        integer(c_int), value, intent(in) :: prio
+                        integer(c_int), value, intent(in) :: mpi_comm
+                        type(c_ptr), value, intent(in) :: starpu_tag
+                end function fstarpu_mpi_isend_detached_unlock_tag_prio
+
                 ! int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
                 function fstarpu_mpi_recv_detached_unlock_tag (dh, src, mpi_tag, mpi_comm, starpu_tag) bind(C)
                         use iso_c_binding
@@ -304,6 +394,21 @@ module fstarpu_mpi_mod
                         type(c_ptr), value, intent(in) :: starpu_tag
                 end function fstarpu_mpi_isend_array_detached_unlock_tag
 
+                ! int starpu_mpi_isend_array_detached_unlock_tag_prio(unsigned array_size, starpu_data_handle_t *data_handle, int *dest, int *mpi_tag, int *prio, MPI_Comm *comm, starpu_tag_t tag);
+                function fstarpu_mpi_isend_array_detached_unlock_tag_prio (array_size, dhs, dsts, mpi_tags, prio, mpi_comms, &
+                                starpu_tag) bind(C)
+                        use iso_c_binding
+                        implicit none
+                        integer(c_int) :: fstarpu_mpi_isend_array_detached_unlock_tag_prio
+                        integer(c_int), value, intent(in) :: array_size
+                        type(c_ptr), intent(in) :: dhs(*)
+                        integer(c_int), intent(in) :: dsts(*)
+                        integer(c_int), intent(in) :: mpi_tags(*)
+                        integer(c_int), intent(in) :: prio(*)
+                        integer(c_int), intent(in) :: mpi_comms(*)
+                        type(c_ptr), value, intent(in) :: starpu_tag
+                end function fstarpu_mpi_isend_array_detached_unlock_tag_prio
+
                 ! int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag);
                 function fstarpu_mpi_recv_array_detached_unlock_tag (array_size, dhs, srcs, mpi_tags, mpi_comms, starpu_tag) &
                                 bind(C)

+ 8 - 0
mpi/include/starpu_mpi.h

@@ -33,13 +33,18 @@ extern "C"
 typedef void *starpu_mpi_req;
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, MPI_Comm comm);
+int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm);
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *req, int source, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm);
+int starpu_mpi_send_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm);
 int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
 int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, MPI_Comm comm);
+int starpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm);
 int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_issend_detached_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status);
 int starpu_mpi_test(starpu_mpi_req *req, int *flag, MPI_Status *status);
 int starpu_mpi_barrier(MPI_Comm comm);
@@ -61,14 +66,17 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...);
 void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle, int node);
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg);
 void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle);
+void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio);
 
 int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg);
 int starpu_mpi_gather_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg);
 
 int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
+int starpu_mpi_isend_detached_unlock_tag_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, starpu_tag_t tag);
 int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
 
 int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *dest, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag);
+int starpu_mpi_isend_array_detached_unlock_tag_prio(unsigned array_size, starpu_data_handle_t *data_handle, int *dest, int *mpi_tag, int *prio, MPI_Comm *comm, starpu_tag_t tag);
 int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag);
 
 void starpu_mpi_comm_amounts_retrieve(size_t *comm_amounts);

+ 54 - 27
mpi/src/starpu_mpi.c

@@ -17,6 +17,7 @@
  */
 
 #include <stdlib.h>
+#include <limits.h>
 #include <starpu_mpi.h>
 #include <starpu_mpi_datatype.h>
 #include <starpu_mpi_private.h>
@@ -50,7 +51,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int data_tag, MPI_Comm comm,
-							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
+							unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
 							int sequential_consistency);
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
 							int source, int data_tag, MPI_Comm comm,
@@ -61,7 +62,7 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
 static void _starpu_mpi_early_data_cb(void* arg);
 
 /* The list of ready requests */
-static struct _starpu_mpi_req_list ready_requests;
+static struct _starpu_mpi_req_prio_list ready_requests;
 
 /* The list of detached requests that have already been submitted to MPI */
 static struct _starpu_mpi_req_list detached_requests;
@@ -105,6 +106,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
 
 	/* Initialize the request structure */
 	(*req)->data_handle = NULL;
+	(*req)->prio = 0;
 
 	(*req)->datatype = 0;
 	(*req)->datatype_name = NULL;
@@ -222,7 +224,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
 					  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
 					  req->datatype_name, (int)req->count, req->registered_datatype);
-			_starpu_mpi_req_list_push_front(&ready_requests, req);
+			_starpu_mpi_req_prio_list_push_front(&ready_requests, req);
 
 			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
 			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
@@ -288,7 +290,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 						STARPU_ASSERT(req->count);
 						_STARPU_MPI_MALLOC(req->ptr, req->count);
 					}
-					_starpu_mpi_req_list_push_front(&ready_requests, req);
+					_starpu_mpi_req_prio_list_push_front(&ready_requests, req);
 					_starpu_mpi_request_destroy(sync_req);
 				}
 				else
@@ -301,7 +303,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 	}
 	else
 	{
-		_starpu_mpi_req_list_push_front(&ready_requests, req);
+		_starpu_mpi_req_prio_list_push_front(&ready_requests, req);
 		_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
 				  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
 				  req->datatype_name, (int)req->count, req->registered_datatype);
@@ -323,7 +325,7 @@ static void nop_acquire_cb(void *arg)
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
 							      int srcdst, int data_tag, MPI_Comm comm,
-							      unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
+							      unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
 							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							      enum starpu_data_access_mode mode,
 							      int sequential_consistency,
@@ -347,6 +349,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	/* Initialize the request structure */
 	_starpu_mpi_request_init(&req);
 	req->request_type = request_type;
+	req->prio = prio;
 	req->data_handle = data_handle;
 	req->node_tag.rank = srcdst;
 	req->node_tag.data_tag = data_tag;
@@ -534,10 +537,10 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
 							int dest, int data_tag, MPI_Comm comm,
-							unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
+							unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
 							int sequential_consistency)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func,
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func,
 #ifdef STARPU_MPI_PEDANTIC_ISEND
 					      STARPU_RW,
 #else
@@ -546,14 +549,14 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
 					      sequential_consistency, 0, 0);
 }
 
-int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
+int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, int prio, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
 	_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
-	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, NULL, NULL, 1);
+	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, prio, NULL, NULL, 1);
 	_STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, data_tag, 0);
 
 	STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
@@ -562,17 +565,26 @@ int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
+int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
+{
+	return starpu_mpi_isend_prio(data_handle, public_req, dest, data_tag, 0, comm);
+}
 
-int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
-			      int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle,
+			      int dest, int data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, callback, arg, 1);
+	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, prio, callback, arg, 1);
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
+int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
+			      int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	return starpu_mpi_isend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
+}
 
-int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
+int starpu_mpi_send_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm)
 {
 	starpu_mpi_req req;
 	MPI_Status status;
@@ -580,20 +592,24 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MP
 	_STARPU_MPI_LOG_IN();
 	memset(&status, 0, sizeof(MPI_Status));
 
-	starpu_mpi_isend(data_handle, &req, dest, data_tag, comm);
+	starpu_mpi_isend_prio(data_handle, &req, dest, data_tag, prio, comm);
 	starpu_mpi_wait(&req, &status);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
+int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
+{
+	return starpu_mpi_send_prio(data_handle, dest, data_tag, 0, comm);
+}
 
-int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
+int starpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, int prio, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
 	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, NULL, NULL, 1);
+	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, prio, NULL, NULL, 1);
 
 	STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
@@ -601,16 +617,24 @@ int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_r
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
+int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
+{
+	return starpu_mpi_issend_prio(data_handle, public_req, dest, data_tag, 0, comm);
+}
 
-int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+int starpu_mpi_issend_detached_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, callback, arg, 1);
+	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, prio, callback, arg, 1);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
+int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	return starpu_mpi_issend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
+}
 
 /********************************************************/
 /*                                                      */
@@ -670,7 +694,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
 
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int data_tag, MPI_Comm comm)
@@ -795,6 +819,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 
 	/* Initialize the request structure */
 	 _starpu_mpi_request_init(&waiting_req);
+	waiting_req->prio = INT_MAX;
 	waiting_req->status = status;
 	waiting_req->other_request = req;
 	waiting_req->func = _starpu_mpi_wait_func;
@@ -886,6 +911,7 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 		/* Initialize the request structure */
 		_starpu_mpi_request_init(&testing_req);
+		testing_req->prio = INT_MAX;
 		testing_req->flag = flag;
 		testing_req->status = status;
 		testing_req->other_request = req;
@@ -978,6 +1004,7 @@ int _starpu_mpi_barrier(MPI_Comm comm)
 
 	/* Initialize the request structure */
 	_starpu_mpi_request_init(&barrier_req);
+	barrier_req->prio = INT_MAX;
 	barrier_req->func = _starpu_mpi_barrier_func;
 	barrier_req->request_type = BARRIER_REQ;
 	barrier_req->node_tag.comm = comm;
@@ -1316,7 +1343,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 
 	// Handle the request immediately to make sure the mpi_irecv is
 	// posted before receiving another envelope
-	_starpu_mpi_req_list_erase(&ready_requests, early_data_handle->req);
+	_starpu_mpi_req_prio_list_erase(&ready_requests, early_data_handle->req);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	_starpu_mpi_handle_ready_request(early_data_handle->req);
 	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
@@ -1392,13 +1419,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
  	int envelope_request_submitted = 0;
 
-	while (running || posted_requests || !(_starpu_mpi_req_list_empty(&ready_requests)) || !(_starpu_mpi_req_list_empty(&detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
+	while (running || posted_requests || !(_starpu_mpi_req_prio_list_empty(&ready_requests)) || !(_starpu_mpi_req_list_empty(&detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
 	{
 #ifdef STARPU_SIMGRID
 		starpu_pthread_wait_reset(&wait);
 #endif
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(&ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(&detached_requests);
+		unsigned block = _starpu_mpi_req_prio_list_empty(&ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(&detached_requests);
 
 		if (block)
 		{
@@ -1415,7 +1442,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 		/* get one request */
 		int n = 0;
-		while (!_starpu_mpi_req_list_empty(&ready_requests))
+		while (!_starpu_mpi_req_prio_list_empty(&ready_requests))
 		{
 			struct _starpu_mpi_req *req;
 
@@ -1423,7 +1450,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 				/* Already spent some time on submitting ready requests, poll before processing more ready requests */
 				break;
 
-			req = _starpu_mpi_req_list_pop_back(&ready_requests);
+			req = _starpu_mpi_req_prio_list_pop_back(&ready_requests);
 
 			/* handling a request is likely to block for a while
 			 * (on a sync_data_with_mem call), we want to let the
@@ -1585,7 +1612,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 #endif
 
 	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&detached_requests), "List of detached requests not empty");
-	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&ready_requests), "List of ready requests not empty");
+	STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_prio_list_empty(&ready_requests), "List of ready requests not empty");
 	STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
 	_starpu_mpi_early_request_check_termination();
 	_starpu_mpi_early_data_check_termination();
@@ -1647,7 +1674,7 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
         STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
         STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
         STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
-	_starpu_mpi_req_list_init(&ready_requests);
+	_starpu_mpi_req_prio_list_init(&ready_requests);
 
         STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
 	_starpu_mpi_req_list_init(&detached_requests);

+ 35 - 2
mpi/src/starpu_mpi_fortran.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2016, 2017  CNRS
  * Copyright (C) 2016  Inria
+ * Copyright (C) 2017  Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -120,6 +121,10 @@ void fstarpu_mpi_redux_data(MPI_Fint comm, starpu_data_handle_t data_handle)
 {
 	starpu_mpi_redux_data(MPI_Comm_f2c(comm), data_handle);
 }
+void fstarpu_mpi_redux_data_prio(MPI_Fint comm, starpu_data_handle_t data_handle, int prio)
+{
+	starpu_mpi_redux_data_prio(MPI_Comm_f2c(comm), data_handle, prio);
+}
 
 /* scatter/gather */
 int fstarpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int cnt, int root, MPI_Fint comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg)
@@ -137,6 +142,10 @@ int fstarpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int
 {
 	return starpu_mpi_isend_detached_unlock_tag(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm), *starpu_tag);
 }
+int fstarpu_mpi_isend_detached_unlock_tag_prio(starpu_data_handle_t data_handle, int dst, int mpi_tag, int prio, MPI_Fint comm, starpu_tag_t *starpu_tag)
+{
+	return starpu_mpi_isend_detached_unlock_tag_prio(data_handle, dst, mpi_tag, prio, MPI_Comm_f2c(comm), *starpu_tag);
+}
 
 int fstarpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int src, int mpi_tag, MPI_Fint comm, starpu_tag_t *starpu_tag)
 {
@@ -144,7 +153,7 @@ int fstarpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int
 }
 
 /* isend/irecv array detached unlock tag */
-int fstarpu_mpi_isend_array_detached_unlock_tag(int array_size, starpu_data_handle_t *data_handles, int *dsts, int *mpi_tags, MPI_Fint *_comms, starpu_tag_t *starpu_tag)
+int fstarpu_mpi_isend_array_detached_unlock_tag_prio(int array_size, starpu_data_handle_t *data_handles, int *dsts, int *mpi_tags, int *prio, MPI_Fint *_comms, starpu_tag_t *starpu_tag)
 {
 	MPI_Comm comms[array_size];
 	int i;
@@ -152,9 +161,13 @@ int fstarpu_mpi_isend_array_detached_unlock_tag(int array_size, starpu_data_hand
 	{
 		comms[i] = MPI_Comm_f2c(_comms[i]);
 	}
-	int ret = starpu_mpi_isend_array_detached_unlock_tag((unsigned)array_size, data_handles, dsts, mpi_tags, comms, *starpu_tag);
+	int ret = starpu_mpi_isend_array_detached_unlock_tag_prio((unsigned)array_size, data_handles, dsts, mpi_tags, prio, comms, *starpu_tag);
 	return ret;
 }
+int fstarpu_mpi_isend_array_detached_unlock_tag(int array_size, starpu_data_handle_t *data_handles, int *dsts, int *mpi_tags, MPI_Fint *_comms, starpu_tag_t *starpu_tag)
+{
+	return fstarpu_mpi_isend_array_detached_unlock_tag_prio(array_size, data_handles, dsts, mpi_tags, NULL, _comms, starpu_tag);
+}
 
 int fstarpu_mpi_irecv_array_detached_unlock_tag(int array_size, starpu_data_handle_t *data_handles, int *srcs, int *mpi_tags, MPI_Fint *_comms, starpu_tag_t *starpu_tag)
 {
@@ -173,6 +186,10 @@ int fstarpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int
 {
 	return starpu_mpi_isend(data_handle, req, dst, mpi_tag, MPI_Comm_f2c(comm));
 }
+int fstarpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dst, int mpi_tag, int prio, MPI_Fint comm)
+{
+	return starpu_mpi_isend_prio(data_handle, req, dst, mpi_tag, prio, MPI_Comm_f2c(comm));
+}
 
 int fstarpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *req, int src, int mpi_tag, MPI_Fint comm)
 {
@@ -184,6 +201,10 @@ int fstarpu_mpi_send(starpu_data_handle_t data_handle, int dst, int mpi_tag, MPI
 {
 	return starpu_mpi_send(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm));
 }
+int fstarpu_mpi_send_prio(starpu_data_handle_t data_handle, int dst, int mpi_tag, int prio, MPI_Fint comm)
+{
+	return starpu_mpi_send_prio(data_handle, dst, mpi_tag, prio, MPI_Comm_f2c(comm));
+}
 
 int fstarpu_mpi_recv(starpu_data_handle_t data_handle, int src, int mpi_tag, MPI_Fint comm, MPI_Status *status)
 {
@@ -195,6 +216,10 @@ int fstarpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dst, int mp
 {
 	return starpu_mpi_isend_detached(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm), callback, arg);
 }
+int fstarpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dst, int mpi_tag, int prio, MPI_Fint comm, void (*callback)(void *), void *arg)
+{
+	return starpu_mpi_isend_detached_prio(data_handle, dst, mpi_tag, prio, MPI_Comm_f2c(comm), callback, arg);
+}
 
 int fstarpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int src, int mpi_tag, MPI_Fint comm, void (*callback)(void *), void *arg)
 {
@@ -206,11 +231,19 @@ int fstarpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *req, in
 {
 	return starpu_mpi_issend(data_handle, req, dst, mpi_tag, MPI_Comm_f2c(comm));
 }
+int fstarpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dst, int mpi_tag, int prio, MPI_Fint comm)
+{
+	return starpu_mpi_issend_prio(data_handle, req, dst, mpi_tag, prio, MPI_Comm_f2c(comm));
+}
 
 int fstarpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dst, int mpi_tag, MPI_Fint comm, void (*callback)(void *), void *arg)
 {
 	return starpu_mpi_issend_detached(data_handle, dst, mpi_tag, MPI_Comm_f2c(comm), callback, arg);
 }
+int fstarpu_mpi_issend_detached_prio(starpu_data_handle_t data_handle, int dst, int mpi_tag, int prio, MPI_Fint comm, void (*callback)(void *), void *arg)
+{
+	return starpu_mpi_issend_detached_prio(data_handle, dst, mpi_tag, prio, MPI_Comm_f2c(comm), callback, arg);
+}
 
 /* cache */
 void fstarpu_mpi_cache_flush(MPI_Fint comm, starpu_data_handle_t data_handle)

+ 19 - 6
mpi/src/starpu_mpi_helper.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2015  Université de Bordeaux
+ * Copyright (C) 2010, 2015, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2012, 2014, 2016  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -27,13 +27,17 @@ static void starpu_mpi_unlock_tag_callback(void *arg)
 	free(tagptr);
 }
 
-int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, starpu_tag_t tag)
+int starpu_mpi_isend_detached_unlock_tag_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm, starpu_tag_t tag)
 {
 	starpu_tag_t *tagptr;
 	_STARPU_MPI_MALLOC(tagptr, sizeof(starpu_tag_t));
 	*tagptr = tag;
 
-	return starpu_mpi_isend_detached(data_handle, dest, data_tag, comm, starpu_mpi_unlock_tag_callback, tagptr);
+	return starpu_mpi_isend_detached_prio(data_handle, dest, data_tag, prio, comm, starpu_mpi_unlock_tag_callback, tagptr);
+}
+int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, starpu_tag_t tag)
+{
+	return starpu_mpi_isend_detached_unlock_tag_prio(data_handle, dest, data_tag, 0, comm, tag);
 }
 
 
@@ -65,8 +69,8 @@ static void starpu_mpi_array_unlock_callback(void *_arg)
 	}
 }
 
-int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
-		starpu_data_handle_t *data_handle, int *dest, int *data_tag,
+int starpu_mpi_isend_array_detached_unlock_tag_prio(unsigned array_size,
+		starpu_data_handle_t *data_handle, int *dest, int *data_tag, int *prio,
 		MPI_Comm *comm, starpu_tag_t tag)
 {
 	if (!array_size)
@@ -80,11 +84,20 @@ int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
 	unsigned elem;
 	for (elem = 0; elem < array_size; elem++)
 	{
-		starpu_mpi_isend_detached(data_handle[elem], dest[elem], data_tag[elem], comm[elem], starpu_mpi_array_unlock_callback, arg);
+		int p = 0;
+		if (prio)
+			p = prio[elem];
+		starpu_mpi_isend_detached_prio(data_handle[elem], dest[elem], data_tag[elem], p, comm[elem], starpu_mpi_array_unlock_callback, arg);
 	}
 
 	return 0;
 }
+int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
+		starpu_data_handle_t *data_handle, int *dest, int *data_tag,
+		MPI_Comm *comm, starpu_tag_t tag)
+{
+	return starpu_mpi_isend_array_detached_unlock_tag_prio(array_size, data_handle, dest, data_tag, NULL, comm, tag);
+}
 
 
 int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *data_tag, MPI_Comm *comm, starpu_tag_t tag)

+ 4 - 0
mpi/src/starpu_mpi_private.h

@@ -24,6 +24,7 @@
 #include <starpu_mpi.h>
 #include <starpu_mpi_fxt.h>
 #include <common/list.h>
+#include <common/prio_list.h>
 #include <core/simgrid.h>
 
 #ifdef __cplusplus
@@ -199,6 +200,8 @@ LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
 
+	int prio;
+
 	/* description of the data to be sent/received */
 	MPI_Datatype datatype;
 	char *datatype_name;
@@ -261,6 +264,7 @@ LIST_TYPE(_starpu_mpi_req,
 #endif
 
 );
+PRIO_LIST_TYPE(_starpu_mpi_req, prio)
 
 struct _starpu_mpi_argc_argv
 {

+ 33 - 21
mpi/src/starpu_mpi_task_insert.c

@@ -30,11 +30,11 @@
 #include <starpu_mpi_cache.h>
 #include <starpu_mpi_select_node.h>
 
-#define _SEND_DATA(data, mode, dest, data_tag, comm, callback, arg)     \
+#define _SEND_DATA(data, mode, dest, data_tag, prio, comm, callback, arg)     \
 	if (mode & STARPU_SSEND)					\
-		starpu_mpi_issend_detached(data, dest, data_tag, comm, callback, arg); \
+		starpu_mpi_issend_detached_prio(data, dest, data_tag, prio, comm, callback, arg); \
 	else								\
-		starpu_mpi_isend_detached(data, dest, data_tag, comm, callback, arg);
+		starpu_mpi_isend_detached_prio(data, dest, data_tag, prio, comm, callback, arg);
 
 static void (*pre_submit_hook)(struct starpu_task *task) = NULL;
 
@@ -92,7 +92,7 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_a
 	return 0;
 }
 
-void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, MPI_Comm comm)
+void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm)
 {
 	if (data && mode & STARPU_R)
 	{
@@ -126,7 +126,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 				if (data_tag == -1)
 					_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
 				_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data, xrank);
-				_SEND_DATA(data, mode, xrank, data_tag, comm, NULL, NULL);
+				_SEND_DATA(data, mode, xrank, data_tag, prio, comm, NULL, NULL);
 			}
 			// Else the data has already been sent
 		}
@@ -134,7 +134,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 }
 
 static
-void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, MPI_Comm comm)
+void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm)
 {
 	if (mode & STARPU_W)
 	{
@@ -159,7 +159,7 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 			if(data_tag == -1)
 				_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
 			_STARPU_MPI_DEBUG(1, "Send data %p back to its owner %d...\n", data, mpi_rank);
-			_SEND_DATA(data, mode, mpi_rank, data_tag, comm, NULL, NULL);
+			_SEND_DATA(data, mode, mpi_rank, data_tag, prio, comm, NULL, NULL);
 		}
 	}
 }
@@ -191,7 +191,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 }
 
 static
-int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, va_list varg_list)
+int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, int *prio_p, va_list varg_list)
 {
 	va_list varg_list_copy;
 	int inconsistent_execute = 0;
@@ -200,6 +200,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 	int nb_allocated_data = 16;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio = 0;
 	int select_node_policy = STARPU_MPI_NODE_SELECTION_CURRENT_POLICY;
 
 	_STARPU_TRACE_TASK_MPI_DECODE_START();
@@ -348,7 +349,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
-			(void)va_arg(varg_list_copy, int);
+			prio = va_arg(varg_list_copy, int);
 		}
 		/* STARPU_EXECUTE_ON_NODE handled above */
 		/* STARPU_EXECUTE_ON_DATA handled above */
@@ -444,19 +445,21 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 
 	*descrs_p = descrs;
 	*nb_data_p = nb_data;
+	*prio_p = prio;
 
 	_STARPU_TRACE_TASK_MPI_DECODE_END();
 	return 0;
 }
 
 static
-int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, va_list varg_list)
+int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, int *prio_p, va_list varg_list)
 {
 	int me, do_execute, xrank, nb_nodes;
 	int ret;
 	int i;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
 	_STARPU_MPI_LOG_IN();
 
@@ -464,14 +467,14 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 	starpu_mpi_comm_size(comm, &nb_nodes);
 
 	/* Find out whether we are to execute the task because we own the data to be written to. */
-	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, varg_list);
+	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, &prio, varg_list);
 	if (ret < 0) return ret;
 
 	_STARPU_TRACE_TASK_MPI_PRE_START();
 	/* Send and receive data as requested */
 	for(i=0 ; i<nb_data ; i++)
 	{
-		_starpu_mpi_exchange_data_before_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, comm);
+		_starpu_mpi_exchange_data_before_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 	}
 
 	if (xrank_p) *xrank_p = xrank;
@@ -480,6 +483,9 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 		*descrs_p = descrs;
 	else
 		free(descrs);
+	if (prio_p)
+		*prio_p = prio;
+
 	_STARPU_TRACE_TASK_MPI_PRE_END();
 
 	if (do_execute == 0) return 1;
@@ -499,7 +505,7 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 	}
 }
 
-int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data)
+int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data, int prio)
 {
 	int me, i;
 
@@ -508,7 +514,7 @@ int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struc
 
 	for(i=0 ; i<nb_data ; i++)
 	{
-		_starpu_mpi_exchange_data_after_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, comm);
+		_starpu_mpi_exchange_data_after_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 		_starpu_mpi_clear_data_after_execution(descrs[i].handle, descrs[i].mode, me, do_execute);
 	}
 
@@ -528,8 +534,9 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 	int do_execute = 0;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
-	ret = _starpu_mpi_task_build_v(comm, codelet, &task, &xrank, &descrs, &nb_data, varg_list);
+	ret = _starpu_mpi_task_build_v(comm, codelet, &task, &xrank, &descrs, &nb_data, &prio, varg_list);
 	if (ret < 0) return ret;
 
 	if (ret == 0)
@@ -550,7 +557,7 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 		}
 	}
 
-	int val = _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+	int val = _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 
 	if (ret == 0 && pre_submit_hook)
 		pre_submit_hook(task);
@@ -587,7 +594,7 @@ struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *
 	int ret;
 
 	va_start(varg_list, codelet);
-	ret = _starpu_mpi_task_build_v(comm, codelet, &task, NULL, NULL, NULL, varg_list);
+	ret = _starpu_mpi_task_build_v(comm, codelet, &task, NULL, NULL, NULL, NULL, varg_list);
 	va_end(varg_list);
 	STARPU_ASSERT(ret >= 0);
 	if (ret > 0) return NULL; else return task;
@@ -600,17 +607,18 @@ int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ..
 	va_list varg_list;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
 	starpu_mpi_comm_rank(comm, &me);
 	starpu_mpi_comm_size(comm, &nb_nodes);
 
 	va_start(varg_list, codelet);
 	/* Find out whether we are to execute the task because we own the data to be written to. */
-	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, varg_list);
+	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, &prio, varg_list);
 	va_end(varg_list);
 	if (ret < 0) return ret;
 
-	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 }
 
 struct _starpu_mpi_redux_data_args
@@ -685,7 +693,7 @@ void _starpu_mpi_redux_data_recv_callback(void *callback_arg)
 
 /* TODO: this should rather be implicitly called by starpu_mpi_task_insert when
  * a data previously accessed in REDUX mode gets accessed in R mode. */
-void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
+void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
 {
 	int me, rank, tag, nb_nodes;
 
@@ -768,7 +776,7 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 	else
 	{
 		_STARPU_MPI_DEBUG(1, "Sending redux handle to %d ...\n", rank);
-		starpu_mpi_isend_detached(data_handle, rank, tag, comm, NULL, NULL);
+		starpu_mpi_isend_detached_prio(data_handle, rank, tag, prio, comm, NULL, NULL);
 		starpu_task_insert(data_handle->init_cl, STARPU_W, data_handle, 0);
 	}
 	/* FIXME: In order to prevent simultaneous receive submissions
@@ -779,3 +787,7 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 	starpu_task_wait_for_all();
 
 }
+void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
+{	/* Forwards to starpu_mpi_redux_data_prio with priority 0. */
+	starpu_mpi_redux_data_prio(comm, data_handle, 0); /* plain call: ISO C forbids 'return expr;' in a void function */
+}