|
@@ -57,18 +57,13 @@ static int posted_requests = 0, newer_requests, barrier_running = 0;
|
|
|
|
|
|
#define _STARPU_MPI_INC_POSTED_REQUESTS(value) { STARPU_PTHREAD_MUTEX_LOCK(&mutex_posted_requests); posted_requests += value; STARPU_PTHREAD_MUTEX_UNLOCK(&mutex_posted_requests); }
|
|
|
|
|
|
-struct _starpu_mpi_envelope
|
|
|
-{
|
|
|
- ssize_t psize;
|
|
|
- int mpi_tag;
|
|
|
-};
|
|
|
-
|
|
|
struct _starpu_mpi_copy_handle
|
|
|
{
|
|
|
starpu_data_handle_t handle;
|
|
|
struct _starpu_mpi_envelope *env;
|
|
|
int mpi_tag;
|
|
|
UT_hash_handle hh;
|
|
|
+ struct _starpu_mpi_req *req;
|
|
|
};
|
|
|
|
|
|
/********************************************************/
|
|
@@ -176,135 +171,170 @@ static void delete_chandle(struct _starpu_mpi_copy_handle *chandle)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/********************************************************/
|
|
|
-/* */
|
|
|
-/* Send/Receive functionalities */
|
|
|
-/* */
|
|
|
-/********************************************************/
|
|
|
-
|
|
|
-static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
|
|
|
- int srcdst, int mpi_tag, MPI_Comm comm,
|
|
|
- unsigned detached, void (*callback)(void *), void *arg,
|
|
|
- enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
|
|
|
- enum starpu_data_access_mode mode)
|
|
|
+static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
+ /* Initialize the request structure */
|
|
|
+ req->data_handle = NULL;
|
|
|
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
- struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
|
|
|
- STARPU_ASSERT_MSG(req, "Invalid request");
|
|
|
+ req->datatype = NULL;
|
|
|
+ req->ptr = NULL;
|
|
|
+ req->count = -1;
|
|
|
+ req->user_datatype = -1;
|
|
|
|
|
|
- _STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
+ req->srcdst = -1;
|
|
|
+ req->mpi_tag = -1;
|
|
|
+ req->comm = 0;
|
|
|
|
|
|
- /* Initialize the request structure */
|
|
|
- req->submitted = 0;
|
|
|
- req->completed = 0;
|
|
|
+ req->func = NULL;
|
|
|
+
|
|
|
+ req->status = NULL;
|
|
|
+ req->request = NULL;
|
|
|
+ req->flag = NULL;
|
|
|
+
|
|
|
+ req->ret = -1;
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
|
|
|
STARPU_PTHREAD_COND_INIT(&req->req_cond, NULL);
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&req->posted_mutex, NULL);
|
|
|
+ STARPU_PTHREAD_COND_INIT(&req->posted_cond, NULL);
|
|
|
|
|
|
- req->request_type = request_type;
|
|
|
- req->user_datatype = -1;
|
|
|
- req->count = -1;
|
|
|
- req->data_handle = data_handle;
|
|
|
- req->srcdst = srcdst;
|
|
|
- req->mpi_tag = mpi_tag;
|
|
|
- req->comm = comm;
|
|
|
+ req->request_type = UNKNOWN_REQ;
|
|
|
|
|
|
- req->detached = detached;
|
|
|
- req->callback = callback;
|
|
|
- req->callback_arg = arg;
|
|
|
+ req->submitted = 0;
|
|
|
+ req->completed = 0;
|
|
|
+ req->posted = 0;
|
|
|
|
|
|
- req->func = func;
|
|
|
+ req->other_request = NULL;
|
|
|
|
|
|
- /* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
- * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
- * the request is actually submitted */
|
|
|
- starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
|
|
|
+ req->detached = -1;
|
|
|
+ req->callback = NULL;
|
|
|
+ req->callback_arg = NULL;
|
|
|
|
|
|
- _STARPU_MPI_LOG_OUT();
|
|
|
- return req;
|
|
|
-}
|
|
|
+ req->size_req = NULL;
|
|
|
+ req->internal_req = NULL;
|
|
|
+ req->is_internal_req = 0;
|
|
|
+ req->envelope = NULL;
|
|
|
+ }
|
|
|
|
|
|
-/********************************************************/
|
|
|
-/* */
|
|
|
-/* Send functionalities */
|
|
|
-/* */
|
|
|
-/********************************************************/
|
|
|
+ /********************************************************/
|
|
|
+ /* */
|
|
|
+ /* Send/Receive functionalities */
|
|
|
+ /* */
|
|
|
+ /********************************************************/
|
|
|
|
|
|
-static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
|
|
|
-{
|
|
|
- _STARPU_MPI_LOG_IN();
|
|
|
+ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
|
|
|
+ int srcdst, int mpi_tag, MPI_Comm comm,
|
|
|
+ unsigned detached, void (*callback)(void *), void *arg,
|
|
|
+ enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
|
|
|
+ enum starpu_data_access_mode mode)
|
|
|
+ {
|
|
|
+
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
+ struct _starpu_mpi_req *req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
+ STARPU_ASSERT_MSG(req, "Invalid request");
|
|
|
+
|
|
|
+ _STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
+
|
|
|
+ /* Initialize the request structure */
|
|
|
+ _starpu_mpi_request_init(req);
|
|
|
+ req->request_type = request_type;
|
|
|
+ req->data_handle = data_handle;
|
|
|
+ req->srcdst = srcdst;
|
|
|
+ req->mpi_tag = mpi_tag;
|
|
|
+ req->comm = comm;
|
|
|
+ req->detached = detached;
|
|
|
+ req->callback = callback;
|
|
|
+ req->callback_arg = arg;
|
|
|
+ req->func = func;
|
|
|
+
|
|
|
+ /* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
+ * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
+ * the request is actually submitted */
|
|
|
+ starpu_data_acquire_cb(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req);
|
|
|
+
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
+ return req;
|
|
|
+ }
|
|
|
|
|
|
- STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
|
|
|
+ /********************************************************/
|
|
|
+ /* */
|
|
|
+ /* Send functionalities */
|
|
|
+ /* */
|
|
|
+ /********************************************************/
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
+ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
|
|
|
+ {
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
|
|
|
+ STARPU_ASSERT_MSG(req->ptr, "Pointer containing data to send is invalid");
|
|
|
|
|
|
- TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
|
|
|
+ _STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
|
|
|
- req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
|
|
|
- STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
|
|
|
+ _starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
|
|
|
|
|
|
- TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
|
|
|
+ TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
|
|
|
|
|
|
- /* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
- req->submitted = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->srcdst, _starpu_mpi_tag, req->comm, &req->request);
|
|
|
+ STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %d", req->ret);
|
|
|
|
|
|
- _starpu_mpi_handle_detached_request(req);
|
|
|
+ TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, 0);
|
|
|
|
|
|
- _STARPU_MPI_LOG_OUT();
|
|
|
-}
|
|
|
+ /* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ req->submitted = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
|
|
|
-static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
-{
|
|
|
- _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
|
|
|
+ _starpu_mpi_handle_detached_request(req);
|
|
|
+
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
+ }
|
|
|
|
|
|
- struct _starpu_mpi_envelope* env = calloc(1,sizeof(struct _starpu_mpi_envelope));
|
|
|
+ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
+ {
|
|
|
+ _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
|
|
|
|
|
|
- env->mpi_tag = req->mpi_tag;
|
|
|
+ req->envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
|
|
|
+ req->envelope->mpi_tag = req->mpi_tag;
|
|
|
|
|
|
if (req->user_datatype == 0)
|
|
|
{
|
|
|
req->count = 1;
|
|
|
req->ptr = starpu_data_get_local_ptr(req->data_handle);
|
|
|
|
|
|
- env->psize = (ssize_t)req->count;
|
|
|
+ req->envelope->psize = (ssize_t)req->count;
|
|
|
|
|
|
_STARPU_MPI_DEBUG(1, "Post MPI isend count (%ld) datatype_size %ld request to %d with tag %d\n",req->count,starpu_data_get_size(req->data_handle),req->srcdst, _starpu_mpi_tag);
|
|
|
- MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
+ MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
int ret;
|
|
|
|
|
|
// Do not pack the data, just try to find out the size
|
|
|
- starpu_data_pack(req->data_handle, NULL, &(env->psize));
|
|
|
+ starpu_data_pack(req->data_handle, NULL, &(req->envelope->psize));
|
|
|
|
|
|
- if (env->psize != -1)
|
|
|
+ if (req->envelope->psize != -1)
|
|
|
{
|
|
|
// We already know the size of the data, let's send it to overlap with the packing of the data
|
|
|
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
- req->count = env->psize;
|
|
|
- ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
+ _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
+ req->count = req->envelope->psize;
|
|
|
+ ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
|
|
|
}
|
|
|
|
|
|
// Pack the data
|
|
|
starpu_data_pack(req->data_handle, &req->ptr, &req->count);
|
|
|
- if (env->psize == -1)
|
|
|
+ if (req->envelope->psize == -1)
|
|
|
{
|
|
|
// We know the size now, let's send it
|
|
|
- _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", env->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
- ret = MPI_Isend(env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
+ _STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->envelope->psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), _starpu_mpi_tag, req->srcdst);
|
|
|
+ ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->srcdst, _starpu_mpi_tag, req->comm, &req->size_req);
|
|
|
STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %d", ret);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
// We check the size returned with the 2 calls to pack is the same
|
|
|
- STARPU_ASSERT_MSG(req->count == env->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, env->psize);
|
|
|
+ STARPU_ASSERT_MSG(req->count == req->envelope->psize, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->psize);
|
|
|
}
|
|
|
// We can send the data now
|
|
|
}
|
|
@@ -400,6 +430,13 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");
|
|
|
|
|
|
+ // We check if a tag is defined for the data handle, if not,
|
|
|
+ // we define the one given for the communication.
|
|
|
+ // A tag is necessary for the internal mpi engine.
|
|
|
+ int tag = starpu_data_get_tag(data_handle);
|
|
|
+ if (tag == -1)
|
|
|
+ starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
+
|
|
|
struct _starpu_mpi_req *req;
|
|
|
req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL);
|
|
|
|
|
@@ -413,7 +450,16 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
|
|
|
int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
+
|
|
|
+ // We check if a tag is defined for the data handle, if not,
|
|
|
+ // we define the one given for the communication.
|
|
|
+ // A tag is necessary for the internal mpi engine.
|
|
|
+ int tag = starpu_data_get_tag(data_handle);
|
|
|
+ if (tag == -1)
|
|
|
+ starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
+
|
|
|
_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg);
|
|
|
+
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
@@ -421,8 +467,15 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
|
|
|
int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
|
|
|
{
|
|
|
starpu_mpi_req req;
|
|
|
-
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
+
|
|
|
+ // We check if a tag is defined for the data handle, if not,
|
|
|
+ // we define the one given for the communication.
|
|
|
+ // A tag is necessary for the internal mpi engine.
|
|
|
+ int tag = starpu_data_get_tag(data_handle);
|
|
|
+ if (tag == -1)
|
|
|
+ starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
+
|
|
|
starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
|
|
|
starpu_mpi_wait(&req, status);
|
|
|
|
|
@@ -457,8 +510,11 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
int ret;
|
|
|
- struct _starpu_mpi_req *waiting_req = calloc(1, sizeof(struct _starpu_mpi_req));
|
|
|
+
|
|
|
+ struct _starpu_mpi_req *waiting_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
STARPU_ASSERT_MSG(waiting_req, "Allocation failed");
|
|
|
+ _starpu_mpi_request_init(waiting_req); /* only initialize once the allocation has been checked */
|
|
|
+
|
|
|
struct _starpu_mpi_req *req = *public_req;
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
@@ -549,9 +605,9 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
if (submitted)
|
|
|
{
|
|
|
- struct _starpu_mpi_req *testing_req = calloc(1, sizeof(struct _starpu_mpi_req));
|
|
|
+ struct _starpu_mpi_req *testing_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
STARPU_ASSERT_MSG(testing_req, "allocation failed");
|
|
|
- // memset(testing_req, 0, sizeof(struct _starpu_mpi_req));
|
|
|
+ _starpu_mpi_request_init(testing_req);
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
|
|
@@ -615,8 +671,9 @@ int starpu_mpi_barrier(MPI_Comm comm)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
int ret;
|
|
|
- struct _starpu_mpi_req *barrier_req = calloc(1, sizeof(struct _starpu_mpi_req));
|
|
|
+ struct _starpu_mpi_req *barrier_req = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
STARPU_ASSERT_MSG(barrier_req, "allocation failed");
|
|
|
+ _starpu_mpi_request_init(barrier_req);
|
|
|
|
|
|
/* First wait for *both* all tasks and MPI requests to finish, in case
|
|
|
* some tasks generate MPI requests, MPI requests generate tasks, etc.
|
|
@@ -681,6 +738,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
|
|
|
case WAIT_REQ: return "WAIT_REQ";
|
|
|
case TEST_REQ: return "TEST_REQ";
|
|
|
case BARRIER_REQ: return "BARRIER_REQ";
|
|
|
+ case UNKNOWN_REQ: return "UNSET_REQ";
|
|
|
default: return "unknown request type";
|
|
|
}
|
|
|
}
|
|
@@ -725,12 +783,25 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ _STARPU_MPI_DEBUG(3, "NOT deleting chandle %p from hashmap (tag %d %d)\n", chandle, req->mpi_tag, starpu_data_get_tag(req->data_handle));
|
|
|
_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
|
|
|
}
|
|
|
}
|
|
|
starpu_data_release(req->data_handle);
|
|
|
}
|
|
|
|
|
|
+ if (req->envelope)
|
|
|
+ {
|
|
|
+ free(req->envelope);
|
|
|
+ req->envelope = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (req->internal_req)
|
|
|
+ {
|
|
|
+ free(req->internal_req);
|
|
|
+ req->internal_req = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
/* Execute the specified callback, if any */
|
|
|
if (req->callback)
|
|
|
req->callback(req->callback_arg);
|
|
@@ -755,6 +826,11 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
{
|
|
|
struct _starpu_mpi_copy_cb_args *args = arg;
|
|
|
|
|
|
+ // We store in the application request the internal MPI
|
|
|
+ // request so that it can be used by starpu_mpi_wait
|
|
|
+ args->req->request = args->req->internal_req->request;
|
|
|
+ args->req->submitted = 1;
|
|
|
+
|
|
|
struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
void* itf_src = starpu_data_get_interface_on_node(args->copy_handle,0);
|
|
|
void* itf_dst = starpu_data_get_interface_on_node(args->data_handle,0);
|
|
@@ -777,7 +853,11 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
starpu_data_unregister_submit(args->copy_handle);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
|
|
|
- _starpu_mpi_handle_request_termination(args->req);
|
|
|
+ if (args->req->detached)
|
|
|
+ _starpu_mpi_handle_request_termination(args->req);
|
|
|
+ // else: If the request is not detached its termination will
|
|
|
+ // be handled when calling starpu_mpi_wait
|
|
|
+
|
|
|
|
|
|
free(args);
|
|
|
}
|
|
@@ -789,6 +869,8 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(-1);
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p tag %d and type %s\n", req, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
|
|
|
+
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
if (req->request_type == RECV_REQ)
|
|
@@ -804,6 +886,8 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
|
|
|
+ req->internal_req = chandle->req;
|
|
|
+
|
|
|
struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
cb_args->data_handle = req->data_handle;
|
|
|
cb_args->copy_handle = chandle->handle;
|
|
@@ -835,9 +919,16 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
|
|
|
}
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
_starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
+ /* inform the starpu mpi thread that the request has been pushed in the new_requests list */
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
|
|
|
+ req->posted = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
/* Case : a classic receive request with no send received earlier than expected.
|
|
|
* We just add the pending receive request to the requests' hashmap. */
|
|
@@ -931,7 +1022,8 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
if (flag)
|
|
|
{
|
|
|
_starpu_mpi_req_list_erase(detached_requests, req);
|
|
|
- free(req);
|
|
|
+ if (!req->is_internal_req)
|
|
|
+ free(req);
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -1041,13 +1133,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
|
|
|
|
|
|
- MPI_Request header_req;
|
|
|
int header_req_submitted = 0;
|
|
|
|
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
{
|
|
|
/* shall we block ? */
|
|
|
- _STARPU_MPI_DEBUG(3, "HASH_COUNT(_starpu_mpi_req_hashmap) = %d\n",HASH_COUNT(_starpu_mpi_req_hashmap));
|
|
|
unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_req_hashmap) == 0);
|
|
|
|
|
|
#ifndef STARPU_MPI_ACTIVITY
|
|
@@ -1085,11 +1175,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
|
|
|
* requests in our side, we resubmit a header request. */
|
|
|
- if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0) && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
+ MPI_Request header_req;
|
|
|
+ if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
{
|
|
|
+ _STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
|
MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
|
-
|
|
|
- _STARPU_MPI_DEBUG(3, "Submit of header_req OK!\n");
|
|
|
header_req_submitted = 1;
|
|
|
}
|
|
|
|
|
@@ -1102,7 +1192,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
{
|
|
|
int flag,res;
|
|
|
MPI_Status status;
|
|
|
- _STARPU_MPI_DEBUG(3, "Test of header_req\n");
|
|
|
+ _STARPU_MPI_DEBUG(4, "Test of header_req\n");
|
|
|
|
|
|
/* test whether an envelope has arrived. */
|
|
|
res = MPI_Test(&header_req, &flag, &status);
|
|
@@ -1110,9 +1200,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
if (flag)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "header_req received !\n");
|
|
|
-
|
|
|
- _STARPU_MPI_DEBUG(3, "Searching for request with tag %d, size %ld ..\n",recv_env->mpi_tag, recv_env->psize);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Searching for request with tag %d (size %ld)\n", recv_env->mpi_tag, recv_env->psize);
|
|
|
|
|
|
struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
|
|
|
|
|
@@ -1127,7 +1215,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
while(!(data_handle))
|
|
|
{
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
STARPU_ASSERT(data_handle);
|
|
|
|
|
@@ -1139,12 +1229,21 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
starpu_data_register_same(&chandle->handle, data_handle);
|
|
|
add_chandle(chandle);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Posting internal starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
-
|
|
|
- res = starpu_mpi_irecv_detached(chandle->handle,status.MPI_SOURCE,chandle->mpi_tag,MPI_COMM_WORLD,NULL,NULL);
|
|
|
- STARPU_ASSERT(res == MPI_SUCCESS);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
+ chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL);
|
|
|
+ chandle->req->is_internal_req = 1;
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Success of starpu_irecv_detached on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
+ // We wait until the request is pushed in the
|
|
|
+ // new_request list, that ensures that the next loop
|
|
|
+ // will call _starpu_mpi_handle_new_request
|
|
|
+ // on the request and post the corresponding mpi_irecv,
|
|
|
+ // otherwise, it may lead to reading data as an envelope
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req->posted_mutex));
|
|
|
+ while (!(chandle->req->posted))
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(chandle->req->posted_cond), &(chandle->req->posted_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req->posted_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|
|
|
* the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
|
|
@@ -1181,7 +1280,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Nothing received, continue ..\n");
|
|
|
+ _STARPU_MPI_DEBUG(4, "Nothing received, continue ..\n");
|
|
|
}
|
|
|
}
|
|
|
}
|