소스 검색

harmonize a bit with the MPI driver, bringing in priority support

Samuel Thibault 8 년 전
부모
커밋
df4e5d334e

+ 8 - 0
nmad/include/starpu_mpi.h

@@ -33,13 +33,18 @@ extern "C"
 typedef void *starpu_mpi_req;
 
 int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, MPI_Comm comm);
+int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm);
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *req, int source, int mpi_tag, MPI_Comm comm);
 int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm);
+int starpu_mpi_send_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm);
 int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status);
 int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, MPI_Comm comm);
+int starpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *req, int dest, int mpi_tag, int prio, MPI_Comm comm);
 int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg);
+int starpu_mpi_issend_detached_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg);
 int starpu_mpi_wait(starpu_mpi_req *req, MPI_Status *status);
 int starpu_mpi_test(starpu_mpi_req *req, int *flag, MPI_Status *status);
 int starpu_mpi_barrier(MPI_Comm comm);
@@ -61,14 +66,17 @@ int starpu_mpi_insert_task(MPI_Comm comm, struct starpu_codelet *codelet, ...);
 void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle, int node);
 void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg);
 void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle);
+void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio);
 
 int starpu_mpi_scatter_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg);
 int starpu_mpi_gather_detached(starpu_data_handle_t *data_handles, int count, int root, MPI_Comm comm, void (*scallback)(void *), void *sarg, void (*rcallback)(void *), void *rarg);
 
 int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
+int starpu_mpi_isend_detached_unlock_tag_prio(starpu_data_handle_t data_handle, int dest, int mpi_tag, int prio, MPI_Comm comm, starpu_tag_t tag);
 int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag);
 
 int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *dest, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag);
+int starpu_mpi_isend_array_detached_unlock_tag_prio(unsigned array_size, starpu_data_handle_t *data_handle, int *dest, int *mpi_tag, int *prio, MPI_Comm *comm, starpu_tag_t tag);
 int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag);
 
 void starpu_mpi_comm_amounts_retrieve(size_t *comm_amounts);

+ 52 - 27
nmad/src/starpu_mpi.c

@@ -34,10 +34,10 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
 static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
 #endif
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
-							int dest, int mpi_tag, MPI_Comm comm,
-							unsigned detached, unsigned sync, void (*callback)(void *), void *arg);
+							int dest, int data_tag, MPI_Comm comm,
+							unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg);
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
-							int source, int mpi_tag, MPI_Comm comm,
+							int source, int data_tag, MPI_Comm comm,
 							unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency);
 static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
 
@@ -73,8 +73,8 @@ static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
 /********************************************************/
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
-							      int srcdst, int mpi_tag, MPI_Comm comm,
-							      unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
+							      int srcdst, int data_tag, MPI_Comm comm,
+							      unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
 							      enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
 							      enum starpu_data_access_mode mode, 
 							      int sequential_consistency)
@@ -93,11 +93,12 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	piom_cond_init(&req->req_cond, 0);
 
 	req->request_type = request_type;
+	req->prio = prio;
 	req->user_datatype = -1;
 	req->count = -1;
 	req->data_handle = data_handle;
 	req->srcdst = srcdst;
-	req->mpi_tag = mpi_tag;
+	req->mpi_tag = data_tag;
 	req->comm = comm;
 	req->session = nm_mpi_communicator_get_session(p_comm);
 	req->gate = nm_mpi_communicator_get_gate(p_comm,req->srcdst);
@@ -112,7 +113,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
 	/* Asynchronously request StarPU to fetch the data in main memory: when
 	 * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
 	 * the request is actually submitted */
-	starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_handle_new_request, (void *)req,sequential_consistency);
+	starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(data_handle, STARPU_MAIN_RAM, mode, _starpu_mpi_handle_new_request, (void *)req, sequential_consistency, &req->pre_sync_jobid, &req->post_sync_jobid);
 
 	_STARPU_MPI_LOG_OUT();
 	return req;
@@ -209,38 +210,47 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
 }
 
 static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
-							int dest, int mpi_tag, MPI_Comm comm,
-							 unsigned detached, unsigned sync, void (*callback)(void *), void *arg)
+							int dest, int data_tag, MPI_Comm comm,
+							 unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R,1);
+	return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R,1);
 }
 
-int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
+int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, int prio, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
-	STARPU_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
+	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 0, NULL, NULL);
+	TRACE_MPI_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
+	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, prio, NULL, NULL);
+	TRACE_MPI_ISEND_COMPLETE_END(dest, data_tag, 0);
 
-	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
+	STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
-int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
-			      int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
 {
-	_STARPU_MPI_LOG_IN();
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 0, callback, arg);
+	return starpu_mpi_isend_prio(data_handle, public_req, dest, data_tag, 0, comm);
+}
 
+int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	_STARPU_MPI_LOG_IN();
+	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, prio, callback, arg);
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
+int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	return starpu_mpi_isend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
+}
 
-int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm)
+int starpu_mpi_send_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm)
 {
 	starpu_mpi_req req;
 	MPI_Status status;
@@ -248,38 +258,53 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI
 	_STARPU_MPI_LOG_IN();
 	memset(&status, 0, sizeof(MPI_Status));
 
-	starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
+	starpu_mpi_isend_prio(data_handle, &req, dest, data_tag, prio, comm);
 	starpu_mpi_wait(&req, &status);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
-int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
+int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
+{
+	return starpu_mpi_send_prio(data_handle, dest, data_tag, 0, comm);
+}
+
+int starpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, int prio, MPI_Comm comm)
 {
 	_STARPU_MPI_LOG_IN();
-	STARPU_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
+	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
 
 	struct _starpu_mpi_req *req;
-	req = _starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 0, 1, NULL, NULL);
+	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, prio, NULL, NULL);
 
-	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
+	STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
 	*public_req = req;
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
-int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
+{
+	return starpu_mpi_issend_prio(data_handle, public_req, dest, data_tag, 0, comm);
+}
+
+int starpu_mpi_issend_detached_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
 {
 	_STARPU_MPI_LOG_IN();
 
-	_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, 1, callback, arg);
+	_starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, prio, callback, arg);
 
 	_STARPU_MPI_LOG_OUT();
 	return 0;
 }
 
+int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
+{
+	return starpu_mpi_issend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
+}
+
 /********************************************************/
 /*                                                      */
 /*  Receive functionalities                             */
@@ -352,7 +377,7 @@ static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 
 static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency)
 {
-	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W,sequential_consistency);
+	return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_size_func, STARPU_W,sequential_consistency);
 }
 
 int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)

+ 3 - 2
nmad/src/starpu_mpi_cache.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
- * Copyright (C) 2011-2014  Université de Bordeaux
+ * Copyright (C) 2011-2014, 2017  Université de Bordeaux
  * Copyright (C) 2014 INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -24,7 +24,8 @@
 #include <mpi.h>
 
 #ifdef __cplusplus
-extern "C" {
+extern "C"
+{
 #endif
 
 extern int _starpu_cache_enabled;

+ 2 - 1
nmad/src/starpu_mpi_cache_stats.h

@@ -22,7 +22,8 @@
 #include <mpi.h>
 
 #ifdef __cplusplus
-extern "C" {
+extern "C"
+{
 #endif
 
 void _starpu_mpi_cache_stats_init(MPI_Comm comm);

+ 1 - 0
nmad/src/starpu_mpi_collective.c

@@ -27,6 +27,7 @@ struct _callback_arg
 	int count;
 };
 
+static
 void _callback_collective(void *arg)
 {
 	struct _callback_arg *callback_arg = arg;

+ 3 - 2
nmad/src/starpu_mpi_datatype.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2011  Université de Bordeaux
+ * Copyright (C) 2009-2011, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2012, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -21,7 +21,8 @@
 #include <starpu_mpi.h>
 
 #ifdef __cplusplus
-extern "C" {
+extern "C"
+{
 #endif
 
 void _starpu_mpi_datatype_init(void);

+ 52 - 40
nmad/src/starpu_mpi_fxt.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux
+ * Copyright (C) 2010, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -23,68 +23,80 @@
 #include <common/fxt.h>
 
 #ifdef __cplusplus
-extern "C" {
+extern "C"
+{
 #endif
 
-#define FUT_MPI_START				0x5201
-#define FUT_MPI_STOP				0x5202
-#define FUT_MPI_BARRIER				0x5203
-#define FUT_MPI_ISEND_SUBMIT_BEGIN		0x5204
-#define FUT_MPI_ISEND_SUBMIT_END		0x5205
-#define FUT_MPI_IRECV_SUBMIT_BEGIN		0x5206
-#define FUT_MPI_IRECV_SUBMIT_END		0x5207
-#define FUT_MPI_ISEND_COMPLETE_BEGIN		0x5208
-#define FUT_MPI_ISEND_COMPLETE_END		0x5209
-#define FUT_MPI_IRECV_COMPLETE_BEGIN		0x5210
-#define FUT_MPI_IRECV_COMPLETE_END		0x5211
-#define FUT_MPI_SLEEP_BEGIN			0x5212
-#define FUT_MPI_SLEEP_END			0x5213
-#define FUT_MPI_DTESTING_BEGIN			0x5214
-#define FUT_MPI_DTESTING_END			0x5215
-#define FUT_MPI_UTESTING_BEGIN			0x5216
-#define FUT_MPI_UTESTING_END			0x5217
-#define FUT_MPI_UWAIT_BEGIN			0x5218
-#define FUT_MPI_UWAIT_END			0x5219
+#define _STARPU_MPI_FUT_START				0x5201
+#define _STARPU_MPI_FUT_STOP				0x5202
+#define _STARPU_MPI_FUT_BARRIER				0x5203
+#define _STARPU_MPI_FUT_ISEND_SUBMIT_BEGIN		0x5204
+#define _STARPU_MPI_FUT_ISEND_SUBMIT_END		0x5205
+#define _STARPU_MPI_FUT_IRECV_SUBMIT_BEGIN		0x5206
+#define _STARPU_MPI_FUT_IRECV_SUBMIT_END		0x5207
+#define _STARPU_MPI_FUT_ISEND_COMPLETE_BEGIN		0x5208
+#define _STARPU_MPI_FUT_ISEND_COMPLETE_END		0x5209
+#define _STARPU_MPI_FUT_IRECV_COMPLETE_BEGIN		0x5210
+#define _STARPU_MPI_FUT_IRECV_COMPLETE_END		0x5211
+#define _STARPU_MPI_FUT_SLEEP_BEGIN			0x5212
+#define _STARPU_MPI_FUT_SLEEP_END			0x5213
+#define _STARPU_MPI_FUT_DTESTING_BEGIN			0x5214
+#define _STARPU_MPI_FUT_DTESTING_END			0x5215
+#define _STARPU_MPI_FUT_UTESTING_BEGIN			0x5216
+#define _STARPU_MPI_FUT_UTESTING_END			0x5217
+#define _STARPU_MPI_FUT_UWAIT_BEGIN			0x5218
+#define _STARPU_MPI_FUT_UWAIT_END			0x5219
+#define _STARPU_MPI_FUT_DATA_SET_RANK			0x521a
+#define _STARPU_MPI_FUT_IRECV_TERMINATED		0x521b
+#define _STARPU_MPI_FUT_ISEND_TERMINATED		0x521c
+#define _STARPU_MPI_FUT_TESTING_DETACHED_BEGIN		0x521d
+#define _STARPU_MPI_FUT_TESTING_DETACHED_END		0x521e
+#define _STARPU_MPI_FUT_TEST_BEGIN			0x521f
+#define _STARPU_MPI_FUT_TEST_END			0x5220
 
 #ifdef STARPU_USE_FXT
 #define TRACE_MPI_START(rank, worldsize)	\
-	FUT_DO_PROBE3(FUT_MPI_START, (rank), (worldsize), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_START, (rank), (worldsize), _starpu_gettid());
 #define TRACE_MPI_STOP(rank, worldsize)	\
-	FUT_DO_PROBE3(FUT_MPI_STOP, (rank), (worldsize), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_STOP, (rank), (worldsize), _starpu_gettid());
 #define TRACE_MPI_BARRIER(rank, worldsize, key)	\
-	FUT_DO_PROBE4(FUT_MPI_BARRIER, (rank), (worldsize), (key), _starpu_gettid());
+	FUT_DO_PROBE4(_STARPU_MPI_FUT_BARRIER, (rank), (worldsize), (key), _starpu_gettid());
 #define TRACE_MPI_ISEND_SUBMIT_BEGIN(dest, mpi_tag, size)	\
-	FUT_DO_PROBE4(FUT_MPI_ISEND_SUBMIT_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_SUBMIT_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
 #define TRACE_MPI_ISEND_SUBMIT_END(dest, mpi_tag, size)	\
-	FUT_DO_PROBE4(FUT_MPI_ISEND_SUBMIT_END, (dest), (mpi_tag), (size), _starpu_gettid());
+	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_SUBMIT_END, (dest), (mpi_tag), (size), _starpu_gettid());
 #define TRACE_MPI_IRECV_SUBMIT_BEGIN(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_IRECV_SUBMIT_BEGIN, (src), (mpi_tag), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_SUBMIT_BEGIN, (src), (mpi_tag), _starpu_gettid());
 #define TRACE_MPI_IRECV_SUBMIT_END(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_IRECV_SUBMIT_END, (src), (mpi_tag), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_SUBMIT_END, (src), (mpi_tag), _starpu_gettid());
 #define TRACE_MPI_ISEND_COMPLETE_BEGIN(dest, mpi_tag, size)	\
-	FUT_DO_PROBE4(FUT_MPI_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_BEGIN, (dest), (mpi_tag), (size), _starpu_gettid());
+#define TRACE_MPI_COMPLETE_BEGIN(type, rank, mpi_tag)		\
+	if (type == RECV_REQ) { TRACE_MPI_IRECV_COMPLETE_BEGIN((rank), (mpi_tag)); } else if (type == SEND_REQ) { TRACE_MPI_ISEND_COMPLETE_BEGIN((rank), (mpi_tag), 0); }
 #define TRACE_MPI_ISEND_COMPLETE_END(dest, mpi_tag, size)	\
-	FUT_DO_PROBE4(FUT_MPI_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
+	FUT_DO_PROBE4(_STARPU_MPI_FUT_ISEND_COMPLETE_END, (dest), (mpi_tag), (size), _starpu_gettid());
 #define TRACE_MPI_IRECV_COMPLETE_BEGIN(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_IRECV_COMPLETE_BEGIN, (src), (mpi_tag), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_BEGIN, (src), (mpi_tag), _starpu_gettid());
 #define TRACE_MPI_IRECV_COMPLETE_END(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_IRECV_COMPLETE_END, (src), (mpi_tag), _starpu_gettid());
+#define TRACE_MPI_COMPLETE_END(type, rank, mpi_tag)		\
+	if (type == RECV_REQ) { TRACE_MPI_IRECV_COMPLETE_END((rank), (mpi_tag)); } else if (type == SEND_REQ) { TRACE_MPI_ISEND_COMPLETE_END((rank), (mpi_tag), 0); }
 #define TRACE_MPI_SLEEP_BEGIN()	\
-	FUT_DO_PROBE1(FUT_MPI_SLEEP_BEGIN, _starpu_gettid());
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_BEGIN, _starpu_gettid());
 #define TRACE_MPI_SLEEP_END()	\
-	FUT_DO_PROBE1(FUT_MPI_SLEEP_END, _starpu_gettid());
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_SLEEP_END, _starpu_gettid());
 #define TRACE_MPI_DTESTING_BEGIN()	\
-	FUT_DO_PROBE1(FUT_MPI_DTESTING_BEGIN,  _starpu_gettid());
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_DTESTING_BEGIN,  _starpu_gettid());
 #define TRACE_MPI_DTESTING_END()	\
-	FUT_DO_PROBE1(FUT_MPI_DTESTING_END, _starpu_gettid());
+	FUT_DO_PROBE1(_STARPU_MPI_FUT_DTESTING_END, _starpu_gettid());
 #define TRACE_MPI_UTESTING_BEGIN(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_UTESTING_BEGIN, (src), (mpi_tag),  _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_UTESTING_BEGIN, (src), (mpi_tag),  _starpu_gettid());
 #define TRACE_MPI_UTESTING_END(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_UTESTING_END, (src), (mpi_tag), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_UTESTING_END, (src), (mpi_tag), _starpu_gettid());
 #define TRACE_MPI_UWAIT_BEGIN(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_UWAIT_BEGIN, (src), (mpi_tag),  _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_UWAIT_BEGIN, (src), (mpi_tag),  _starpu_gettid());
 #define TRACE_MPI_UWAIT_END(src, mpi_tag)	\
-	FUT_DO_PROBE3(FUT_MPI_UWAIT_END, (src), (mpi_tag), _starpu_gettid());
+	FUT_DO_PROBE3(_STARPU_MPI_FUT_UWAIT_END, (src), (mpi_tag), _starpu_gettid());
 #define TRACE
 #else
 #define TRACE_MPI_START(a, b)				do {} while(0);

+ 36 - 21
nmad/src/starpu_mpi_helper.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010  Université de Bordeaux
+ * Copyright (C) 2010, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2012  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -16,6 +16,7 @@
  */
 
 #include <starpu_mpi.h>
+#include <starpu_mpi_private.h>
 
 static void starpu_mpi_unlock_tag_callback(void *arg)
 {
@@ -26,24 +27,27 @@ static void starpu_mpi_unlock_tag_callback(void *arg)
 	free(tagptr);
 }
 
-int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle,
-				int dest, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
+int starpu_mpi_isend_detached_unlock_tag_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm, starpu_tag_t tag)
 {
-	starpu_tag_t *tagptr = malloc(sizeof(starpu_tag_t));
+	starpu_tag_t *tagptr;
+	_STARPU_MPI_MALLOC(tagptr, sizeof(starpu_tag_t));
 	*tagptr = tag;
 
-	return starpu_mpi_isend_detached(data_handle, dest, mpi_tag, comm,
-						starpu_mpi_unlock_tag_callback, tagptr);
+	return starpu_mpi_isend_detached_prio(data_handle, dest, data_tag, prio, comm, starpu_mpi_unlock_tag_callback, tagptr);
+}
+int starpu_mpi_isend_detached_unlock_tag(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, starpu_tag_t tag)
+{
+	return starpu_mpi_isend_detached_unlock_tag_prio(data_handle, dest, data_tag, 0, comm, tag);
 }
 
 
-int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, starpu_tag_t tag)
+int starpu_mpi_irecv_detached_unlock_tag(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, starpu_tag_t tag)
 {
-	starpu_tag_t *tagptr = malloc(sizeof(starpu_tag_t));
+	starpu_tag_t *tagptr;
+	_STARPU_MPI_MALLOC(tagptr, sizeof(starpu_tag_t));
 	*tagptr = tag;
 
-	return starpu_mpi_irecv_detached(data_handle, source, mpi_tag, comm,
-						starpu_mpi_unlock_tag_callback, tagptr);
+	return starpu_mpi_irecv_detached(data_handle, source, data_tag, comm, starpu_mpi_unlock_tag_callback, tagptr);
 }
 
 struct arg_array
@@ -65,11 +69,14 @@ static void starpu_mpi_array_unlock_callback(void *_arg)
 	}
 }
 
-int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
-		starpu_data_handle_t *data_handle, int *dest, int *mpi_tag,
+int starpu_mpi_isend_array_detached_unlock_tag_prio(unsigned array_size,
+		starpu_data_handle_t *data_handle, int *dest, int *data_tag, int *prio,
 		MPI_Comm *comm, starpu_tag_t tag)
 {
-	struct arg_array *arg = malloc(sizeof(struct arg_array));
+	if (!array_size)
+		return 0;
+	struct arg_array *arg;
+	_STARPU_MPI_MALLOC(arg, sizeof(struct arg_array));
 
 	arg->array_size = array_size;
 	arg->tag = tag;
@@ -77,18 +84,28 @@ int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
 	unsigned elem;
 	for (elem = 0; elem < array_size; elem++)
 	{
-		starpu_mpi_isend_detached(data_handle[elem], dest[elem],
-				mpi_tag[elem], comm[elem],
-				starpu_mpi_array_unlock_callback, arg);
+		int p = 0;
+		if (prio)
+			p = prio[elem];
+		starpu_mpi_isend_detached_prio(data_handle[elem], dest[elem], data_tag[elem], p, comm[elem], starpu_mpi_array_unlock_callback, arg);
 	}
 
 	return 0;
 }
+int starpu_mpi_isend_array_detached_unlock_tag(unsigned array_size,
+		starpu_data_handle_t *data_handle, int *dest, int *data_tag,
+		MPI_Comm *comm, starpu_tag_t tag)
+{
+	return starpu_mpi_isend_array_detached_unlock_tag_prio(array_size, data_handle, dest, data_tag, NULL, comm, tag);
+}
 
 
-int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *mpi_tag, MPI_Comm *comm, starpu_tag_t tag)
+int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_handle_t *data_handle, int *source, int *data_tag, MPI_Comm *comm, starpu_tag_t tag)
 {
-	struct arg_array *arg = malloc(sizeof(struct arg_array));
+	if (!array_size)
+		return 0;
+	struct arg_array *arg;
+	_STARPU_MPI_MALLOC(arg, sizeof(struct arg_array));
 
 	arg->array_size = array_size;
 	arg->tag = tag;
@@ -96,9 +113,7 @@ int starpu_mpi_irecv_array_detached_unlock_tag(unsigned array_size, starpu_data_
 	unsigned elem;
 	for (elem = 0; elem < array_size; elem++)
 	{
-		starpu_mpi_irecv_detached(data_handle[elem], source[elem],
-				mpi_tag[elem], comm[elem],
-				starpu_mpi_array_unlock_callback, arg);
+		starpu_mpi_irecv_detached(data_handle[elem], source[elem], data_tag[elem], comm[elem], starpu_mpi_array_unlock_callback, arg);
 	}
 
 	return 0;

+ 9 - 2
nmad/src/starpu_mpi_private.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2015  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2015, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -27,7 +27,8 @@
 #include <piom_lock.h>
 
 #ifdef __cplusplus
-extern "C" {
+extern "C"
+{
 #endif
 
 extern int _starpu_debug_rank;
@@ -121,6 +122,8 @@ LIST_TYPE(_starpu_mpi_req,
 	/* description of the data at StarPU level */
 	starpu_data_handle_t data_handle;
 
+	int prio;
+
 	/* description of the data to be sent/received */
 	MPI_Datatype datatype;
 	void *ptr;
@@ -158,6 +161,10 @@ LIST_TYPE(_starpu_mpi_req,
         /* in the case of user-defined datatypes, we need to send the size of the data */
 	nm_sr_request_t size_req;
 
+	long pre_sync_jobid;
+	long post_sync_jobid;
+
+
 	int waited;
 );
 

+ 3 - 2
nmad/src/starpu_mpi_select_node.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2014, 2015, 2016  CNRS
+ * Copyright (C) 2014, 2015, 2016, 2017  CNRS
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -57,7 +57,8 @@ int starpu_mpi_node_selection_register_policy(starpu_mpi_select_node_policy_func
 	// Look for a unregistered policy
 	while(i<_STARPU_MPI_NODE_SELECTION_MAX_POLICY)
 	{
-		if (_policies[i] == NULL) break;
+		if (_policies[i] == NULL)
+			break;
 		i++;
 	}
 	STARPU_ASSERT_MSG(i<_STARPU_MPI_NODE_SELECTION_MAX_POLICY, "No unused policy available. Unregister existing policies before registering a new one.");

+ 83 - 37
nmad/src/starpu_mpi_task_insert.c

@@ -30,11 +30,15 @@
 #include <starpu_mpi_cache.h>
 #include <starpu_mpi_select_node.h>
 
-#define _SEND_DATA(data, mode, dest, data_tag, comm, callback, arg)     \
+#include "starpu_mpi_task_insert.h"
+
+#define _SEND_DATA(data, mode, dest, data_tag, prio, comm, callback, arg)     \
+	do {									\
 	if (mode & STARPU_SSEND)					\
-		starpu_mpi_issend_detached(data, dest, data_tag, comm, callback, arg); \
+			starpu_mpi_issend_detached_prio(data, dest, data_tag, prio, comm, callback, arg); 	\
 	else								\
-		starpu_mpi_isend_detached(data, dest, data_tag, comm, callback, arg);
+			starpu_mpi_isend_detached_prio(data, dest, data_tag, prio, comm, callback, arg);	\
+	} while (0)
 
 static void (*pre_submit_hook)(struct starpu_task *task) = NULL;
 
@@ -54,7 +58,7 @@ int starpu_mpi_pre_submit_hook_unregister()
 
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *xrank)
 {
-	if (mode & STARPU_W)
+	if (mode & STARPU_W || mode & STARPU_REDUX)
 	{
 		if (!data)
 		{
@@ -80,11 +84,11 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_a
 			// No node has been selected yet
 			*xrank = mpi_rank;
 			_STARPU_MPI_DEBUG(100, "Codelet is going to be executed by node %d\n", *xrank);
-			*do_execute = (mpi_rank == me);
+			*do_execute = mpi_rank == STARPU_MPI_PER_NODE || (mpi_rank == me);
 		}
 		else if (mpi_rank != *xrank)
 		{
-			_STARPU_MPI_DEBUG(100, "Another node %d had already been selected to execute the codelet\n", *xrank);
+			_STARPU_MPI_DEBUG(100, "Another node %d had already been selected to execute the codelet, can't now set %d\n", *xrank, mpi_rank);
 			*inconsistent_execute = 1;
 		}
 	}
@@ -92,8 +96,12 @@ int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_a
 	return 0;
 }
 
-void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, MPI_Comm comm)
+void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm)
 {
+	if (data && xrank == STARPU_MPI_PER_NODE)
+	{
+		STARPU_ASSERT_MSG(starpu_mpi_data_get_rank(data) == STARPU_MPI_PER_NODE, "If task is replicated, it has to access only per-node data");
+	}
 	if (data && mode & STARPU_R)
 	{
 		int mpi_rank = starpu_mpi_data_get_rank(data);
@@ -103,7 +111,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 			_STARPU_ERROR("StarPU needs to be told the MPI rank of this data, using starpu_mpi_data_register\n");
 		}
 
-		if (do_execute && mpi_rank != me)
+		if (do_execute && mpi_rank != STARPU_MPI_PER_NODE && mpi_rank != me)
 		{
 			/* The node is going to execute the codelet, but it does not own the data, it needs to receive the data from the owner node */
 			int already_received = _starpu_mpi_cache_received_data_set(data);
@@ -126,7 +134,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 				if (data_tag == -1)
 					_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
 				_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data, xrank);
-				_SEND_DATA(data, mode, xrank, data_tag, comm, NULL, NULL);
+				_SEND_DATA(data, mode, xrank, data_tag, prio, comm, NULL, NULL);
 			}
 			// Else the data has already been sent
 		}
@@ -134,7 +142,7 @@ void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum
 }
 
 static
-void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, MPI_Comm comm)
+void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm)
 {
 	if (mode & STARPU_W)
 	{
@@ -144,9 +152,13 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 		{
 			_STARPU_ERROR("StarPU needs to be told the MPI rank of this data, using starpu_mpi_data_register\n");
 		}
+		if (mpi_rank == STARPU_MPI_PER_NODE)
+		{
+			mpi_rank = me;
+		}
 		if (mpi_rank == me)
 		{
-			if (xrank != -1 && me != xrank)
+			if (xrank != -1 && (xrank != STARPU_MPI_PER_NODE && me != xrank))
 			{
 				_STARPU_MPI_DEBUG(1, "Receive data %p back from the task %d which executed the codelet ...\n", data, xrank);
 				if(data_tag == -1)
@@ -159,7 +171,7 @@ void _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum s
 			if(data_tag == -1)
 				_STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
 			_STARPU_MPI_DEBUG(1, "Send data %p back to its owner %d...\n", data, mpi_rank);
-			_SEND_DATA(data, mode, mpi_rank, data_tag, comm, NULL, NULL);
+			_SEND_DATA(data, mode, mpi_rank, data_tag, prio, comm, NULL, NULL);
 		}
 	}
 }
@@ -182,6 +194,10 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 		if ((mode & STARPU_R) && do_execute)
 		{
 			int mpi_rank = starpu_mpi_data_get_rank(data);
+			if (mpi_rank == STARPU_MPI_PER_NODE)
+			{
+				mpi_rank = me;
+			}
 			if (mpi_rank != me && mpi_rank != -1)
 			{
 				starpu_data_invalidate_submit(data);
@@ -191,8 +207,9 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
 }
 
 static
-int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, va_list varg_list)
+int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nodes, int *xrank, int *do_execute, struct starpu_data_descr **descrs_p, int *nb_data_p, int *prio_p, va_list varg_list)
 {
+	/* XXX: _fstarpu_mpi_task_decode_v needs to be updated at the same time */
 	va_list varg_list_copy;
 	int inconsistent_execute = 0;
 	int arg_type;
@@ -200,6 +217,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 	int nb_allocated_data = 16;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio = 0;
 	int select_node_policy = STARPU_MPI_NODE_SELECTION_CURRENT_POLICY;
 
 	_STARPU_TRACE_TASK_MPI_DECODE_START();
@@ -348,7 +366,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 		}
 		else if (arg_type==STARPU_PRIORITY)
 		{
-			(void)va_arg(varg_list_copy, int);
+			prio = va_arg(varg_list_copy, int);
 		}
 		/* STARPU_EXECUTE_ON_NODE handled above */
 		/* STARPU_EXECUTE_ON_DATA handled above */
@@ -386,6 +404,12 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
                 {
                         (void)va_arg(varg_list_copy, void *);
 		}
+		else if (arg_type==STARPU_EXECUTE_WHERE)
+		{
+			// the flag is decoded and set later when
+			// calling function _starpu_task_insert_create()
+			(void)va_arg(varg_list_copy, unsigned long long);
+		}
 		else if (arg_type==STARPU_EXECUTE_ON_WORKER)
 		{
 			// the flag is decoded and set later when
@@ -425,32 +449,34 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
 	if (inconsistent_execute == 1 || *xrank == -1)
 	{
 		// We need to find out which node is going to execute the codelet.
-		_STARPU_MPI_DISP("Different nodes are owning W data. The node to execute the codelet is going to be selected with the current selection node policy. See starpu_mpi_node_selection_set_current_policy() to change the policy, or use STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA to specify the node\n");
+		_STARPU_MPI_DEBUG(100, "Different nodes are owning W data. The node to execute the codelet is going to be selected with the current selection node policy. See starpu_mpi_node_selection_set_current_policy() to change the policy, or use STARPU_EXECUTE_ON_NODE or STARPU_EXECUTE_ON_DATA to specify the node\n");
 		*xrank = _starpu_mpi_select_node(me, nb_nodes, descrs, nb_data, select_node_policy);
-		*do_execute = (me == *xrank);
+		*do_execute = *xrank == STARPU_MPI_PER_NODE || (me == *xrank);
 	}
 	else
 	{
 		_STARPU_MPI_DEBUG(100, "Inconsistent=%d - xrank=%d\n", inconsistent_execute, *xrank);
-		*do_execute = (me == *xrank);
+		*do_execute = *xrank == STARPU_MPI_PER_NODE || (me == *xrank);
 	}
 	_STARPU_MPI_DEBUG(100, "do_execute=%d\n", *do_execute);
 
 	*descrs_p = descrs;
 	*nb_data_p = nb_data;
+	*prio_p = prio;
 
 	_STARPU_TRACE_TASK_MPI_DECODE_END();
 	return 0;
 }
 
 static
-int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, va_list varg_list)
+int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, struct starpu_task **task, int *xrank_p, struct starpu_data_descr **descrs_p, int *nb_data_p, int *prio_p, va_list varg_list)
 {
 	int me, do_execute, xrank, nb_nodes;
 	int ret;
 	int i;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
 	_STARPU_MPI_LOG_IN();
 
@@ -458,25 +484,36 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 	starpu_mpi_comm_size(comm, &nb_nodes);
 
 	/* Find out whether we are to execute the data because we own the data to be written to. */
-	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, varg_list);
-	if (ret < 0) return ret;
+	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, &prio, varg_list);
+	if (ret < 0)
+		return ret;
 
 	_STARPU_TRACE_TASK_MPI_PRE_START();
 	/* Send and receive data as requested */
 	for(i=0 ; i<nb_data ; i++)
 	{
-		_starpu_mpi_exchange_data_before_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, comm);
+		_starpu_mpi_exchange_data_before_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 	}
 
-	if (xrank_p) *xrank_p = xrank;
-	if (nb_data_p) *nb_data_p = nb_data;
+	if (xrank_p)
+		*xrank_p = xrank;
+	if (nb_data_p)
+		*nb_data_p = nb_data;
+	if (prio_p)
+		*prio_p = prio;
+
 	if (descrs_p)
 		*descrs_p = descrs;
 	else
 		free(descrs);
+
+
 	_STARPU_TRACE_TASK_MPI_PRE_END();
 
-	if (do_execute == 0) return 1;
+	if (do_execute == 0)
+	{
+		return 1;
+	}
 	else
 	{
 		va_list varg_list_copy;
@@ -493,7 +530,7 @@ int _starpu_mpi_task_build_v(MPI_Comm comm, struct starpu_codelet *codelet, stru
 	}
 }
 
-int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data)
+int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data, int prio)
 {
 	int me, i;
 
@@ -502,7 +539,7 @@ int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struc
 
 	for(i=0 ; i<nb_data ; i++)
 	{
-		_starpu_mpi_exchange_data_after_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, comm);
+		_starpu_mpi_exchange_data_after_execution(descrs[i].handle, descrs[i].mode, me, xrank, do_execute, prio, comm);
 		_starpu_mpi_clear_data_after_execution(descrs[i].handle, descrs[i].mode, me, do_execute);
 	}
 
@@ -522,9 +559,11 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 	int do_execute = 0;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
-	ret = _starpu_mpi_task_build_v(comm, codelet, &task, &xrank, &descrs, &nb_data, varg_list);
-	if (ret < 0) return ret;
+	ret = _starpu_mpi_task_build_v(comm, codelet, &task, &xrank, &descrs, &nb_data, &prio, varg_list);
+	if (ret < 0)
+		return ret;
 
 	if (ret == 0)
 	{
@@ -544,7 +583,7 @@ int _starpu_mpi_task_insert_v(MPI_Comm comm, struct starpu_codelet *codelet, va_
 		}
 	}
 
-	int val = _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+	int val = _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 
 	if (ret == 0 && pre_submit_hook)
 		pre_submit_hook(task);
@@ -581,10 +620,10 @@ struct starpu_task *starpu_mpi_task_build(MPI_Comm comm, struct starpu_codelet *
 	int ret;
 
 	va_start(varg_list, codelet);
-	ret = _starpu_mpi_task_build_v(comm, codelet, &task, NULL, NULL, NULL, varg_list);
+	ret = _starpu_mpi_task_build_v(comm, codelet, &task, NULL, NULL, NULL, NULL, varg_list);
 	va_end(varg_list);
 	STARPU_ASSERT(ret >= 0);
-	if (ret > 0) return NULL; else return task;
+	return (ret > 0) ? NULL : task;
 }
 
 int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ...)
@@ -594,17 +633,19 @@ int starpu_mpi_task_post_build(MPI_Comm comm, struct starpu_codelet *codelet, ..
 	va_list varg_list;
 	struct starpu_data_descr *descrs;
 	int nb_data;
+	int prio;
 
 	starpu_mpi_comm_rank(comm, &me);
 	starpu_mpi_comm_size(comm, &nb_nodes);
 
 	va_start(varg_list, codelet);
 	/* Find out whether we are to execute the data because we own the data to be written to. */
-	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, varg_list);
+	ret = _starpu_mpi_task_decode_v(codelet, me, nb_nodes, &xrank, &do_execute, &descrs, &nb_data, &prio, varg_list);
 	va_end(varg_list);
-	if (ret < 0) return ret;
+	if (ret < 0)
+		return ret;
 
-	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data);
+	return _starpu_mpi_task_postbuild_v(comm, xrank, do_execute, descrs, nb_data, prio);
 }
 
 struct _starpu_mpi_redux_data_args
@@ -679,7 +720,7 @@ void _starpu_mpi_redux_data_recv_callback(void *callback_arg)
 
 /* TODO: this should rather be implicitly called by starpu_mpi_task_insert when
  * a data previously accessed in REDUX mode gets accessed in R mode. */
-void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
+void starpu_mpi_redux_data_prio(MPI_Comm comm, starpu_data_handle_t data_handle, int prio)
 {
 	int me, rank, tag, nb_nodes;
 
@@ -741,7 +782,8 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 				args->taskB->cl = args->data_handle->redux_cl;
 				args->taskB->sequential_consistency = 0;
 				STARPU_TASK_SET_HANDLE(args->taskB, args->data_handle, 0);
-				taskBs[j] = args->taskB; j++;
+				taskBs[j] = args->taskB;
+				j++;
 
 				// Submit taskA
 				starpu_task_insert(&_starpu_mpi_redux_data_read_cl,
@@ -762,7 +804,7 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 	else
 	{
 		_STARPU_MPI_DEBUG(1, "Sending redux handle to %d ...\n", rank);
-		starpu_mpi_isend_detached(data_handle, rank, tag, comm, NULL, NULL);
+		starpu_mpi_isend_detached_prio(data_handle, rank, tag, prio, comm, NULL, NULL);
 		starpu_task_insert(data_handle->init_cl, STARPU_W, data_handle, 0);
 	}
 	/* FIXME: In order to prevent simultaneous receive submissions
@@ -773,3 +815,7 @@ void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
 	starpu_task_wait_for_all();
 
 }
+/* Backward-compatible entry point that takes no priority argument.
+ * Delegates to starpu_mpi_redux_data_prio() with a priority of 0. */
+void starpu_mpi_redux_data(MPI_Comm comm, starpu_data_handle_t data_handle)
+{
+	/* Plain call rather than `return <expr>;`: this function returns void,
+	 * and ISO C forbids a return statement with an expression here. */
+	starpu_mpi_redux_data_prio(comm, data_handle, 0);
+}

+ 2 - 2
nmad/src/starpu_mpi_task_insert.h

@@ -23,8 +23,8 @@ extern "C"
 #endif
 
 int _starpu_mpi_find_executee_node(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int *do_execute, int *inconsistent_execute, int *xrank);
-void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, MPI_Comm comm);
-int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data);
+void _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm);
+int _starpu_mpi_task_postbuild_v(MPI_Comm comm, int xrank, int do_execute, struct starpu_data_descr *descrs, int nb_data, int prio);
 
 #ifdef __cplusplus
 }