|
@@ -76,7 +76,7 @@ struct _starpu_mpi_copy_handle
|
|
|
/* Hashmap's requests functionalities */
|
|
|
/* */
|
|
|
/********************************************************/
|
|
|
-
|
|
|
+
|
|
|
static struct _starpu_mpi_req *_starpu_mpi_req_hashmap = NULL;
|
|
|
static struct _starpu_mpi_copy_handle *_starpu_mpi_copy_handle_hashmap = NULL;
|
|
|
|
|
@@ -93,7 +93,7 @@ static void add_req(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
struct _starpu_mpi_req *test_req;
|
|
|
|
|
|
- test_req = find_req(req->mpi_tag);
|
|
|
+ test_req = find_req(req->mpi_tag);
|
|
|
|
|
|
if (test_req == NULL)
|
|
|
{
|
|
@@ -119,7 +119,7 @@ static void delete_req(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
struct _starpu_mpi_req *test_req;
|
|
|
|
|
|
- test_req = find_req(req->mpi_tag);
|
|
|
+ test_req = find_req(req->mpi_tag);
|
|
|
|
|
|
if (test_req != NULL)
|
|
|
{
|
|
@@ -145,7 +145,7 @@ static void add_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
{
|
|
|
struct _starpu_mpi_copy_handle *test_chandle;
|
|
|
|
|
|
- test_chandle = find_chandle(chandle->mpi_tag);
|
|
|
+ test_chandle = find_chandle(chandle->mpi_tag);
|
|
|
|
|
|
if (test_chandle == NULL)
|
|
|
{
|
|
@@ -163,7 +163,7 @@ static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
{
|
|
|
struct _starpu_mpi_copy_handle *test_chandle;
|
|
|
|
|
|
- test_chandle = find_chandle(chandle->mpi_tag);
|
|
|
+ test_chandle = find_chandle(chandle->mpi_tag);
|
|
|
|
|
|
if (test_chandle != NULL)
|
|
|
{
|
|
@@ -242,7 +242,7 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
|
|
|
|
|
|
TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
|
|
|
|
|
|
- req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, starpu_mpi_tag, req->comm, &req->request);
|
|
|
+ req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
|
|
|
STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
|
|
|
|
|
|
TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
|
|
@@ -270,35 +270,35 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
req->count = 1;
|
|
|
req->ptr = starpu_handle_get_local_ptr(req->data_handle);
|
|
|
-
|
|
|
+
|
|
|
env->psize = (ssize_t)req->count;
|
|
|
-
|
|
|
- _STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %d request to %d with tag %d\n",req->count,starpu_handle_get_size(req->data_handle),req->srcdst,starpu_mpi_tag);
|
|
|
- MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
+
|
|
|
+ _STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %d request to %d with tag %d\n",req->count,starpu_handle_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
|
|
|
+ MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
int ret;
|
|
|
-
|
|
|
+
|
|
|
// Do not pack the data, just try to find out the size
|
|
|
starpu_handle_pack_data(req->data_handle, NULL, &(env->psize));
|
|
|
-
|
|
|
+
|
|
|
if (env->psize != -1)
|
|
|
{
|
|
|
// We already know the size of the data, let's send it to overlap with the packing of the data
|
|
|
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), starpu_mpi_tag, req->srcdst);
|
|
|
+ _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
req->count = env->psize;
|
|
|
- ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
+ ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Pack the data
|
|
|
starpu_handle_pack_data(req->data_handle, &req->ptr, &req->count);
|
|
|
if (env->psize == -1)
|
|
|
{
|
|
|
// We know the size now, let's send it
|
|
|
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), starpu_mpi_tag, req->srcdst);
|
|
|
- ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
+ _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
+ ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
|
|
|
}
|
|
|
else
|
|
@@ -374,7 +374,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
|
|
|
|
|
|
TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
|
|
|
|
|
|
- req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, starpu_mpi_tag, req->comm, &req->request);
|
|
|
+ req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
|
|
|
STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %d", req->ret);
|
|
|
|
|
|
TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
|
|
@@ -795,7 +795,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
{
|
|
|
/* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
|
struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
|
|
|
-
|
|
|
+
|
|
|
/* Case : the request has already been submitted internally by StarPU.
|
|
|
* We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
|
* the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
|
|
@@ -804,15 +804,15 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
|
|
|
- struct _starpu_mpi_copy_cb_args *arg = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
- arg->data_handle = req->data_handle;
|
|
|
- arg->copy_handle = chandle->handle;
|
|
|
- arg->req = req;
|
|
|
+ struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
+ cb_args->data_handle = req->data_handle;
|
|
|
+ cb_args->copy_handle = chandle->handle;
|
|
|
+ cb_args->req = req;
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
- starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) arg);
|
|
|
+ starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
|
|
|
}
|
|
|
- else
|
|
|
+ else
|
|
|
{
|
|
|
/* Case : the request is the internal receive request submitted by StarPU-MPI to receive
|
|
|
* incoming data without a matching pending receive already submitted by the application.
|
|
@@ -1038,12 +1038,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
_STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
_STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
-
|
|
|
+
|
|
|
struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
|
|
|
-
|
|
|
+
|
|
|
MPI_Request header_req;
|
|
|
int header_req_submitted = 0;
|
|
|
-
|
|
|
+
|
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
{
|
|
|
/* shall we block ? */
|
|
@@ -1087,7 +1087,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
* requests in our side, we resubmit a header request. */
|
|
|
if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0) && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
{
|
|
|
- MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
|
+ MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Submit of header_req OK!\n");
|
|
|
header_req_submitted = 1;
|
|
@@ -1116,7 +1116,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
|
|
|
|
|
|
- /* Case : a data will arrive before the matching receive has been submitted in our side of the application.
|
|
|
+ /* Case : a data will arrive before the matching receive has been submitted in our side of the application.
|
|
|
* We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
|
|
|
* on this handle, and register this so as the StarPU-MPI layer can remember it.*/
|
|
|
if (!found_req)
|
|
@@ -1139,12 +1139,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
starpu_data_register_same(&chandle->handle, data_handle);
|
|
|
add_chandle(chandle);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Posting internal starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Posting internal starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
res = starpu_mpi_irecv_detached(chandle->handle,status.MPI_SOURCE,chandle->mpi_tag,MPI_COMM_WORLD,NULL,NULL);
|
|
|
STARPU_ASSERT(res == MPI_SUCCESS);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
}
|
|
|
/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|
|
|
* the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
|