|
@@ -130,49 +130,52 @@ static void delete_app_req(struct _starpu_mpi_req *req)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
+static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
|
|
|
{
|
|
|
+ *req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
+ STARPU_ASSERT_MSG(*req, "Invalid request");
|
|
|
+
|
|
|
/* Initialize the request structure */
|
|
|
- req->data_handle = NULL;
|
|
|
+ (*req)->data_handle = NULL;
|
|
|
|
|
|
- req->datatype = 0;
|
|
|
- req->ptr = NULL;
|
|
|
- req->count = -1;
|
|
|
- req->user_datatype = -1;
|
|
|
+ (*req)->datatype = 0;
|
|
|
+ (*req)->ptr = NULL;
|
|
|
+ (*req)->count = -1;
|
|
|
+ (*req)->user_datatype = -1;
|
|
|
|
|
|
- req->srcdst = -1;
|
|
|
- req->mpi_tag = -1;
|
|
|
- req->comm = 0;
|
|
|
+ (*req)->srcdst = -1;
|
|
|
+ (*req)->mpi_tag = -1;
|
|
|
+ (*req)->comm = 0;
|
|
|
|
|
|
- req->func = NULL;
|
|
|
+ (*req)->func = NULL;
|
|
|
|
|
|
- req->status = NULL;
|
|
|
- req->request = 0;
|
|
|
- req->flag = NULL;
|
|
|
+ (*req)->status = NULL;
|
|
|
+ (*req)->request = 0;
|
|
|
+ (*req)->flag = NULL;
|
|
|
|
|
|
- req->ret = -1;
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
|
|
|
- STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&req->posted_mutex, NULL);
|
|
|
- STARPU_PTHREAD_COND_INIT(&req->posted_cond, NULL);
|
|
|
+ (*req)->ret = -1;
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
|
|
|
+ STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
|
|
|
+ STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
|
|
|
|
|
|
- req->request_type = UNKNOWN_REQ;
|
|
|
+ (*req)->request_type = UNKNOWN_REQ;
|
|
|
|
|
|
- req->submitted = 0;
|
|
|
- req->completed = 0;
|
|
|
- req->posted = 0;
|
|
|
+ (*req)->submitted = 0;
|
|
|
+ (*req)->completed = 0;
|
|
|
+ (*req)->posted = 0;
|
|
|
|
|
|
- req->other_request = NULL;
|
|
|
+ (*req)->other_request = NULL;
|
|
|
|
|
|
- req->detached = -1;
|
|
|
- req->callback = NULL;
|
|
|
- req->callback_arg = NULL;
|
|
|
+ (*req)->detached = -1;
|
|
|
+ (*req)->callback = NULL;
|
|
|
+ (*req)->callback_arg = NULL;
|
|
|
|
|
|
- req->size_req = 0;
|
|
|
- req->internal_req = NULL;
|
|
|
- req->is_internal_req = 0;
|
|
|
- req->envelope = NULL;
|
|
|
- req->sequential_consistency = 1;
|
|
|
+ (*req)->size_req = 0;
|
|
|
+ (*req)->internal_req = NULL;
|
|
|
+ (*req)->is_internal_req = 0;
|
|
|
+ (*req)->envelope = NULL;
|
|
|
+ (*req)->sequential_consistency = 1;
|
|
|
}
|
|
|
|
|
|
/********************************************************/
|
|
@@ -190,35 +193,33 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
int is_internal_req,
|
|
|
ssize_t count)
|
|
|
{
|
|
|
+ struct _starpu_mpi_req *req;
|
|
|
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
- struct _starpu_mpi_req *req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
- STARPU_ASSERT_MSG(req, "Invalid request");
|
|
|
-
|
|
|
- _STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
-
|
|
|
- /* Initialize the request structure */
|
|
|
- _starpu_mpi_request_init(req);
|
|
|
- req->request_type = request_type;
|
|
|
- req->data_handle = data_handle;
|
|
|
- req->srcdst = srcdst;
|
|
|
- req->mpi_tag = mpi_tag;
|
|
|
- req->comm = comm;
|
|
|
- req->detached = detached;
|
|
|
- req->callback = callback;
|
|
|
- req->callback_arg = arg;
|
|
|
- req->func = func;
|
|
|
- req->sequential_consistency = sequential_consistency;
|
|
|
- req->is_internal_req = is_internal_req;
|
|
|
- req->count = count;
|
|
|
-
|
|
|
- /* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
- * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
- * the request is actually submitted */
|
|
|
- starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req, sequential_consistency);
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
+ _STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
|
|
|
- _STARPU_MPI_LOG_OUT();
|
|
|
- return req;
|
|
|
+ /* Initialize the request structure */
|
|
|
+ _starpu_mpi_request_init(&req);
|
|
|
+ req->request_type = request_type;
|
|
|
+ req->data_handle = data_handle;
|
|
|
+ req->srcdst = srcdst;
|
|
|
+ req->mpi_tag = mpi_tag;
|
|
|
+ req->comm = comm;
|
|
|
+ req->detached = detached;
|
|
|
+ req->callback = callback;
|
|
|
+ req->callback_arg = arg;
|
|
|
+ req->func = func;
|
|
|
+ req->sequential_consistency = sequential_consistency;
|
|
|
+ req->is_internal_req = is_internal_req;
|
|
|
+ req->count = count;
|
|
|
+
|
|
|
+ /* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
+ * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
+ * the request is actually submitted */
|
|
|
+ starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req, sequential_consistency);
|
|
|
+
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
+ return req;
|
|
|
}
|
|
|
|
|
|
/********************************************************/
|
|
@@ -488,14 +489,11 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
|
|
|
|
|
|
int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
{
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
int ret;
|
|
|
-
|
|
|
- struct _starpu_mpi_req *waiting_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
- STARPU_ASSERT_MSG(waiting_req, "Allocation failed");
|
|
|
-
|
|
|
struct _starpu_mpi_req *req = *public_req;
|
|
|
+ struct _starpu_mpi_req *waiting_req;
|
|
|
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
|
|
|
/* We cannot try to complete a MPI request that was not actually posted
|
|
@@ -506,7 +504,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
- _starpu_mpi_request_init(waiting_req);
|
|
|
+ _starpu_mpi_request_init(&waiting_req);
|
|
|
waiting_req->status = status;
|
|
|
waiting_req->other_request = req;
|
|
|
waiting_req->func = _starpu_mpi_wait_func;
|
|
@@ -583,9 +581,8 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
if (submitted)
|
|
|
{
|
|
|
- struct _starpu_mpi_req *testing_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
- STARPU_ASSERT_MSG(testing_req, "allocation failed");
|
|
|
- _starpu_mpi_request_init(testing_req);
|
|
|
+ struct _starpu_mpi_req *testing_req;
|
|
|
+ _starpu_mpi_request_init(&testing_req);
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
|
|
@@ -647,11 +644,11 @@ static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
|
|
|
|
|
|
int starpu_mpi_barrier(MPI_Comm comm)
|
|
|
{
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
int ret;
|
|
|
- struct _starpu_mpi_req *barrier_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
- STARPU_ASSERT_MSG(barrier_req, "allocation failed");
|
|
|
- _starpu_mpi_request_init(barrier_req);
|
|
|
+ struct _starpu_mpi_req *barrier_req;
|
|
|
+
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
+ _starpu_mpi_request_init(&barrier_req);
|
|
|
|
|
|
/* First wait for *both* all tasks and MPI requests to finish, in case
|
|
|
* some tasks generate MPI requests, MPI requests generate tasks, etc.
|