|
@@ -73,7 +73,8 @@ static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
|
|
|
|
|
|
starpu_mpi_handle_to_datatype(req->data_handle, &req->datatype);
|
|
|
|
|
|
- MPI_Isend(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
|
|
|
+ req->ret = MPI_Isend(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
|
|
|
+ STARPU_ASSERT(req->ret == MPI_SUCCESS);
|
|
|
|
|
|
TRACE_MPI_ISEND(req->srcdst, req->mpi_tag, 0);
|
|
|
|
|
@@ -114,8 +115,7 @@ static struct starpu_mpi_req_s *_starpu_mpi_isend_common(starpu_data_handle data
|
|
|
/* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
* it is available in main memory, submit_mpi_req(req) is called and
|
|
|
* the request is actually submitted */
|
|
|
- starpu_data_acquire_cb(data_handle, STARPU_R,
|
|
|
- submit_mpi_req, (void *)req);
|
|
|
+ starpu_data_acquire_cb(data_handle, STARPU_R, submit_mpi_req, (void *)req);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return req;
|
|
@@ -164,7 +164,8 @@ static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
|
|
|
|
|
|
_STARPU_MPI_DEBUG("post MPI irecv tag %x src %d ptr %p req %p datatype %d\n", req->mpi_tag, req->srcdst, ptr, &req->request, req->datatype);
|
|
|
|
|
|
- MPI_Irecv(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
|
|
|
+ req->ret = MPI_Irecv(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
|
|
|
+ STARPU_ASSERT(req->ret == MPI_SUCCESS);
|
|
|
|
|
|
/* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
@@ -201,8 +202,7 @@ static struct starpu_mpi_req_s *_starpu_mpi_irecv_common(starpu_data_handle data
|
|
|
/* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
* it is available in main memory, submit_mpi_req(req) is called and
|
|
|
* the request is actually submitted */
|
|
|
- starpu_data_acquire_cb(data_handle, STARPU_W,
|
|
|
- submit_mpi_req, (void *)req);
|
|
|
+ starpu_data_acquire_cb(data_handle, STARPU_W, submit_mpi_req, (void *)req);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return req;
|
|
@@ -283,6 +283,8 @@ static void starpu_mpi_wait_func(struct starpu_mpi_req_s *waiting_req)
|
|
|
struct starpu_mpi_req_s *req = waiting_req->other_request;
|
|
|
|
|
|
req->ret = MPI_Wait(&req->request, waiting_req->status);
|
|
|
+ STARPU_ASSERT(req->ret == MPI_SUCCESS);
|
|
|
+
|
|
|
handle_request_termination(req);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
@@ -340,11 +342,12 @@ static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
|
|
|
struct starpu_mpi_req_s *req = testing_req->other_request;
|
|
|
|
|
|
_STARPU_MPI_DEBUG("Test request %p - mpitag %x - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
|
|
|
- int ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
|
|
|
+ req->ret = MPI_Test(&req->request, testing_req->flag, testing_req->status);
|
|
|
+ STARPU_ASSERT(req->ret == MPI_SUCCESS);
|
|
|
|
|
|
if (*testing_req->flag)
|
|
|
{
|
|
|
- testing_req->ret = ret;
|
|
|
+ testing_req->ret = req->ret;
|
|
|
handle_request_termination(req);
|
|
|
}
|
|
|
|
|
@@ -498,8 +501,8 @@ static void test_detached_requests(void)
|
|
|
PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
|
|
|
_STARPU_MPI_DEBUG("Test detached request %p - mpitag %x - TYPE %s %d\n", &req->request, req->mpi_tag, (req->request_type == RECV_REQ)?"recv : source":"send : dest", req->srcdst);
|
|
|
- int ret = MPI_Test(&req->request, &flag, &status);
|
|
|
- STARPU_ASSERT(ret == MPI_SUCCESS);
|
|
|
+ req->ret = MPI_Test(&req->request, &flag, &status);
|
|
|
+ STARPU_ASSERT(req->ret == MPI_SUCCESS);
|
|
|
|
|
|
if (flag)
|
|
|
{
|
|
@@ -555,7 +558,7 @@ static void *progress_thread_func(void *arg __attribute__((unused)))
|
|
|
PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
- while (running) {
|
|
|
+ while (running || !(starpu_mpi_req_list_empty(new_requests)) || !(starpu_mpi_req_list_empty(detached_requests))) {
|
|
|
/* shall we block ? */
|
|
|
unsigned block = starpu_mpi_req_list_empty(new_requests);
|
|
|
|
|
@@ -569,9 +572,6 @@ static void *progress_thread_func(void *arg __attribute__((unused)))
|
|
|
PTHREAD_COND_WAIT(&cond, &mutex);
|
|
|
}
|
|
|
|
|
|
- if (!running)
|
|
|
- break;
|
|
|
-
|
|
|
/* test whether there are some terminated "detached request" */
|
|
|
PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
test_detached_requests();
|