|
@@ -39,7 +39,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
|
|
|
static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
|
|
|
int source, int data_tag, MPI_Comm comm,
|
|
|
unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency);
|
|
|
-static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req);
|
|
|
+static void _starpu_mpi_handle_new_request(void *arg);
|
|
|
|
|
|
static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
|
|
|
|
|
@@ -76,7 +76,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
|
|
|
int srcdst, int data_tag, MPI_Comm comm,
|
|
|
unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
|
|
|
enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
|
|
|
- enum starpu_data_access_mode mode,
|
|
|
+ enum starpu_data_access_mode mode,
|
|
|
int sequential_consistency)
|
|
|
{
|
|
|
|
|
@@ -245,6 +245,7 @@ int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, i
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
+
|
|
|
int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
{
|
|
|
return starpu_mpi_isend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
|
|
@@ -324,13 +325,10 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
|
|
|
nm_mpi_data_build(&data, (void*)req->ptr, nm_mpi_datatype_get(req->datatype), req->count);
|
|
|
nm_sr_recv_init(req->session, &(req->request));
|
|
|
nm_sr_recv_unpack_data(req->session, &(req->request), &data);
|
|
|
- req->ret = nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->mpi_tag,NM_TAG_MASK_FULL);
|
|
|
-
|
|
|
- STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
|
|
|
+ nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->mpi_tag,NM_TAG_MASK_FULL);
|
|
|
|
|
|
TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
|
|
|
|
|
|
-
|
|
|
_starpu_mpi_handle_pending_request(req);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
@@ -448,10 +446,11 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Wait cannot be called on a detached request");
|
|
|
|
|
|
|
|
|
-/* we must do a test_locked to avoid race condition :
|
|
|
+/* we must do a test_locked to avoid race condition :
|
|
|
* without req_cond could still be used and couldn't be freed)*/
|
|
|
|
|
|
- while (!req->completed || ! piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED)){
|
|
|
+ while (!req->completed || ! piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED))
|
|
|
+ {
|
|
|
piom_cond_wait(&(req->req_cond),REQ_FINALIZED);
|
|
|
}
|
|
|
|
|
@@ -483,14 +482,15 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
|
|
|
|
|
|
-/* we must do a test_locked to avoid race condition :
|
|
|
+/* we must do a test_locked to avoid race condition :
|
|
|
* without req_cond could still be used and couldn't be freed)*/
|
|
|
|
|
|
*flag = req->completed && piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED);
|
|
|
if (*flag && status!=MPI_STATUS_IGNORE)
|
|
|
_starpu_mpi_req_status(req,status);
|
|
|
TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
|
|
|
- if(*flag){
|
|
|
+ if(*flag)
|
|
|
+ {
|
|
|
_starpu_mpi_request_destroy(req);
|
|
|
*public_req = NULL;
|
|
|
}
|
|
@@ -527,14 +527,14 @@ int starpu_mpi_barrier(MPI_Comm comm)
|
|
|
static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
|
|
|
{
|
|
|
switch (request_type)
|
|
|
- {
|
|
|
+ {
|
|
|
case SEND_REQ: return "SEND_REQ";
|
|
|
case RECV_REQ: return "RECV_REQ";
|
|
|
case WAIT_REQ: return "WAIT_REQ";
|
|
|
case TEST_REQ: return "TEST_REQ";
|
|
|
case BARRIER_REQ: return "BARRIER_REQ";
|
|
|
default: return "unknown request type";
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
#endif
|
|
|
|
|
@@ -560,7 +560,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
|
|
|
STARPU_ASSERT_MSG(event == NM_SR_EVENT_FINALIZED, "Callback with event %d", event);
|
|
|
if(req->waited>0)
|
|
|
return;
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
if (req->request_type == RECV_REQ)
|
|
|
// req->ptr is freed by starpu_data_unpack
|
|
@@ -576,7 +576,8 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
|
|
|
}
|
|
|
|
|
|
/* Execute the specified callback, if any */
|
|
|
- if (req->callback){
|
|
|
+ if (req->callback)
|
|
|
+ {
|
|
|
struct callback_lfstack_cell_s* c = padico_malloc(sizeof(struct callback_lfstack_cell_s));
|
|
|
c->req = req;
|
|
|
/* The main thread can exit without waiting
|
|
@@ -593,10 +594,10 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
|
|
|
_starpu_mpi_request_destroy(req);
|
|
|
// a detached request wont be wait/test (and freed inside).
|
|
|
}
|
|
|
- else
|
|
|
+ else
|
|
|
{
|
|
|
- /* tell anyone potentially waiting on the request that it is
|
|
|
- * terminated now (should be done after the callback)*/
|
|
|
+ /* tell anyone potentially waiting on the request that it is
|
|
|
+ * terminated now (should be done after the callback)*/
|
|
|
req->completed = 1;
|
|
|
piom_cond_signal(&req->req_cond, REQ_FINALIZED);
|
|
|
}
|
|
@@ -607,28 +608,31 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
-void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const nm_sr_event_info_t*event_info, void*ref){
|
|
|
+void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const nm_sr_event_info_t*event_info, void*ref)
|
|
|
+{
|
|
|
_starpu_mpi_handle_request_termination(ref,event);
|
|
|
}
|
|
|
|
|
|
static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
|
|
|
- if(req->request_type == SEND_REQ && req->waited>1){
|
|
|
+ if(req->request_type == SEND_REQ && req->waited>1)
|
|
|
+ {
|
|
|
nm_sr_request_set_ref(&(req->size_req), req);
|
|
|
|
|
|
nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
|
|
|
}
|
|
|
- /* the if must be before, because the first callback can directly free
|
|
|
+ /* the if must be before, because the first callback can directly free
|
|
|
* a detached request (the second callback free if req->waited>1). */
|
|
|
nm_sr_request_set_ref(&(req->request), req);
|
|
|
|
|
|
nm_sr_request_monitor(req->session, &(req->request), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req)
|
|
|
+static void _starpu_mpi_handle_new_request(void *arg)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
+ struct _starpu_mpi_req *req = arg;
|
|
|
STARPU_ASSERT_MSG(req, "Invalid request");
|
|
|
|
|
|
/* submit the request to MPI */
|
|
@@ -703,8 +707,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
c = callback_lfstack_pop(&callback_stack);
|
|
|
if (c == NULL)
|
|
|
{
|
|
|
- if(running && pending_request>0){
|
|
|
- STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);
|
|
|
+ if(running && pending_request>0)
|
|
|
+ {
|
|
|
+ STARPU_ASSERT_MSG(c!=NULL, "Callback thread awakened without callback ready with error %d.",err);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -717,10 +722,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
|
|
|
c->req->callback(c->req->callback_arg);
|
|
|
- if (c->req->detached){
|
|
|
+ if (c->req->detached)
|
|
|
+ {
|
|
|
_starpu_mpi_request_destroy(c->req);
|
|
|
}
|
|
|
- else{
|
|
|
+ else
|
|
|
+ {
|
|
|
c->req->completed=1;
|
|
|
piom_cond_signal(&(c->req->req_cond), REQ_FINALIZED);
|
|
|
}
|
|
@@ -817,7 +824,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
|
|
|
|
|
|
_starpu_mpi_add_sync_point_in_fxt();
|
|
|
_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
|
|
|
- _starpu_mpi_cache_init(MPI_COMM_WORLD);
|
|
|
+ _starpu_mpi_cache_init(MPI_COMM_WORLD);
|
|
|
_starpu_mpi_select_node_init();
|
|
|
_starpu_mpi_datatype_init();
|
|
|
return 0;
|