|
@@ -37,7 +37,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
|
|
|
/* Forward declaration of the common receive-submission helper, whose
 * definition appears further down in this file. This change extends the
 * parameter list with is_internal_req and psize. */
static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
|
|
|
int source, int mpi_tag, MPI_Comm comm,
|
|
|
unsigned detached, void (*callback)(void *), void *arg,
|
|
|
- int sequential_consistency);
|
|
|
+ int sequential_consistency, int is_internal_req,
|
|
|
+ ssize_t psize);
|
|
|
static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
|
|
|
|
|
|
/* The list of requests that have been newly submitted by the application */
|
|
@@ -76,59 +77,60 @@ struct _starpu_mpi_copy_handle
|
|
|
/* */
|
|
|
/********************************************************/
|
|
|
|
|
|
-static struct _starpu_mpi_req *_starpu_mpi_req_hashmap = NULL;
|
|
|
+/** stores application requests for which data have not been received yet */
|
|
|
+static struct _starpu_mpi_req *_starpu_mpi_app_req_hashmap = NULL;
|
|
|
/** stores data which have been received by MPI but have not been requested by the application */
|
|
|
static struct _starpu_mpi_copy_handle *_starpu_mpi_copy_handle_hashmap = NULL;
|
|
|
|
|
|
/* Look up the application request stored under mpi_tag in
 * _starpu_mpi_app_req_hashmap and return it.
 * Callers in this file compare the result against NULL to detect "no entry";
 * presumably HASH_FIND_INT (uthash) sets req to NULL when the key is absent
 * -- confirm against the uthash documentation. */
-static struct _starpu_mpi_req* find_req(int mpi_tag)
|
|
|
+static struct _starpu_mpi_req* find_app_req(int mpi_tag)
|
|
|
{
|
|
|
- struct _starpu_mpi_req* req; // = malloc(sizeof(struct _starpu_mpi_req));
|
|
|
+ struct _starpu_mpi_req* req;
|
|
|
|
|
|
- HASH_FIND_INT(_starpu_mpi_req_hashmap, &mpi_tag, req);
|
|
|
+ HASH_FIND_INT(_starpu_mpi_app_req_hashmap, &mpi_tag, req);
|
|
|
|
|
|
return req;
|
|
|
}
|
|
|
|
|
|
/* Register req in the application request hashmap, keyed by req->mpi_tag.
 * If no request with that tag is present, req is inserted and a debug line
 * is emitted. If one is already present, a debug line is emitted and an
 * assertion is raised whose message depends on whether sequential
 * consistency is enabled for both the data handle and the request. */
-static void add_req(struct _starpu_mpi_req *req)
|
|
|
+static void add_app_req(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
struct _starpu_mpi_req *test_req;
|
|
|
|
|
|
- test_req = find_req(req->mpi_tag);
|
|
|
+ test_req = find_app_req(req->mpi_tag);
|
|
|
|
|
|
if (test_req == NULL)
|
|
|
{
|
|
|
- HASH_ADD_INT(_starpu_mpi_req_hashmap, mpi_tag, req);
|
|
|
- _STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the hashmap. \n", req, req->mpi_tag);
|
|
|
+ HASH_ADD_INT(_starpu_mpi_app_req_hashmap, mpi_tag, req);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Adding request %p with tag %d in the application request hashmap. \n", req, req->mpi_tag);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Error add_req : request %p with tag %d already in the hashmap. \n", req, req->mpi_tag);
|
|
|
+ _STARPU_MPI_DEBUG(3, "[Error] request %p with tag %d already in the application request hashmap. \n", req, req->mpi_tag);
|
|
|
/* test_req is non-NULL in this branch, so the !test_req condition of
 * both assertions below is always false: they serve only to report the
 * duplicate tag (presumably aborting when assertions are enabled). */
int seq_const = starpu_data_get_sequential_consistency_flag(req->data_handle);
|
|
|
if (seq_const && req->sequential_consistency)
|
|
|
{
|
|
|
- STARPU_ASSERT_MSG(!test_req, "Error add_req : request %p with tag %d wanted to be added to the hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, test_req);
|
|
|
+ STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency is activated : this is not supported by StarPU.", req, req->mpi_tag, test_req);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- STARPU_ASSERT_MSG(!test_req, "Error add_req : request %p with tag %d wanted to be added to the hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, test_req);
|
|
|
+ STARPU_ASSERT_MSG(!test_req, "[Error] request %p with tag %d wanted to be added to the application request hashmap, while another request %p with the same tag is already in it. \n Sequential consistency isn't activated for this handle : you should want to add dependencies between requests for which the sequential consistency is deactivated.", req, req->mpi_tag, test_req);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/* Remove req from the application request hashmap.
 * The hashmap is first searched for req->mpi_tag; if an entry exists, req is
 * passed to HASH_DEL and a debug line is emitted, otherwise only a warning
 * debug line is emitted.
 * NOTE(review): the lookup is by tag but the deletion is applied to req, not
 * to the entry that was found -- this assumes both are the same object;
 * confirm against callers. */
-static void delete_req(struct _starpu_mpi_req *req)
|
|
|
+static void delete_app_req(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
struct _starpu_mpi_req *test_req;
|
|
|
|
|
|
- test_req = find_req(req->mpi_tag);
|
|
|
+ test_req = find_app_req(req->mpi_tag);
|
|
|
|
|
|
if (test_req != NULL)
|
|
|
{
|
|
|
- HASH_DEL(_starpu_mpi_req_hashmap, req);
|
|
|
- _STARPU_MPI_DEBUG(3, "Deleting request %p with tag %d from the hashmap. \n", req, req->mpi_tag);
|
|
|
+ HASH_DEL(_starpu_mpi_app_req_hashmap, req);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Deleting application request %p with tag %d from the application request hashmap. \n", req, req->mpi_tag);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Warning delete_req : request %p with tag %d isn't in the hashmap. \n", req, req->mpi_tag);
|
|
|
+ _STARPU_MPI_DEBUG(3, "[Warning] request %p with tag %d is NOT in the application request hashmap. \n", req, req->mpi_tag);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -219,7 +221,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
req->is_internal_req = 0;
|
|
|
req->envelope = NULL;
|
|
|
req->sequential_consistency = 1;
|
|
|
- }
|
|
|
+}
|
|
|
|
|
|
/********************************************************/
|
|
|
/* */
|
|
@@ -232,8 +234,10 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
unsigned detached, void (*callback)(void *), void *arg,
|
|
|
enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
|
|
|
enum starpu_data_access_mode mode,
|
|
|
- int sequential_consistency)
|
|
|
- {
|
|
|
+ int sequential_consistency,
|
|
|
+ int is_internal_req,
|
|
|
+ ssize_t psize)
|
|
|
+{
|
|
|
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
struct _starpu_mpi_req *req = malloc(sizeof(struct _starpu_mpi_req));
|
|
@@ -253,6 +257,8 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req *req)
|
|
|
req->callback_arg = arg;
|
|
|
req->func = func;
|
|
|
req->sequential_consistency = sequential_consistency;
|
|
|
+ req->is_internal_req = is_internal_req;
|
|
|
+ req->count = psize;
|
|
|
|
|
|
/* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
* it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
@@ -354,7 +360,7 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
|
|
|
unsigned detached, void (*callback)(void *), void *arg,
|
|
|
int sequential_consistency)
|
|
|
{
|
|
|
- return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency);
|
|
|
+ return _starpu_mpi_isend_irecv_common(data_handle, dest, mpi_tag, comm, detached, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func, STARPU_R, sequential_consistency, 0, 0);
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
|
|
@@ -429,9 +435,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
/* Submit a receive request: forwards every argument to
 * _starpu_mpi_isend_irecv_common with the request type RECV_REQ, the
 * _starpu_mpi_irecv_data_func handler and the STARPU_W access mode, and
 * returns the request that call produces. After this change it also
 * forwards is_internal_req and psize. */
-static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency)
|
|
|
+static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, ssize_t psize)
|
|
|
{
|
|
|
- return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency);
|
|
|
+ return _starpu_mpi_isend_irecv_common(data_handle, source, mpi_tag, comm, detached, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, psize);
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
|
|
@@ -447,7 +453,7 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
|
|
|
starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
|
|
|
struct _starpu_mpi_req *req;
|
|
|
- req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1);
|
|
|
+ req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, NULL, NULL, 1, 0, 0);
|
|
|
|
|
|
STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
|
|
|
*public_req = req;
|
|
@@ -467,7 +473,7 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
|
|
|
if (tag == -1)
|
|
|
starpu_data_set_tag(data_handle, mpi_tag);
|
|
|
|
|
|
- _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1);
|
|
|
+ _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, 1, 0, 0);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
@@ -475,7 +481,8 @@ int starpu_mpi_irecv_detached(starpu_data_handle_t data_handle, int source, int
|
|
|
int starpu_mpi_irecv_detached_sequential_consistency(starpu_data_handle_t data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg, int sequential_consistency)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
- _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency);
|
|
|
+
|
|
|
+ _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg, sequential_consistency, 0, 0);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
@@ -766,58 +773,54 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
|
|
|
- req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
+ _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d internal_req %p\n",
|
|
|
+ req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
|
|
|
+ _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype, req->internal_req);
|
|
|
|
|
|
- if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
|
|
|
+ if (req->internal_req)
|
|
|
{
|
|
|
- if (req->user_datatype == 1)
|
|
|
- {
|
|
|
- if (req->request_type == SEND_REQ)
|
|
|
- {
|
|
|
- // We need to make sure the communication for sending the size
|
|
|
- // has completed, as MPI can re-order messages, let's call
|
|
|
- // MPI_Wait to make sure data have been sent
|
|
|
- ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
|
|
|
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %d", ret);
|
|
|
-
|
|
|
- }
|
|
|
- if (req->request_type == RECV_REQ)
|
|
|
- // req->ptr is freed by starpu_data_unpack
|
|
|
- starpu_data_unpack(req->data_handle, req->ptr, req->count);
|
|
|
- else
|
|
|
- free(req->ptr);
|
|
|
- }
|
|
|
- else
|
|
|
+ struct _starpu_mpi_copy_handle *chandle = find_chandle(starpu_data_get_tag(req->data_handle));
|
|
|
+ _STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
|
|
|
+ delete_chandle(chandle);
|
|
|
+ free(chandle);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
|
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(starpu_data_get_tag(req->data_handle));
|
|
|
- if (chandle && (req->data_handle != chandle->handle))
|
|
|
+ if (req->user_datatype == 1)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
|
|
|
- delete_chandle(chandle);
|
|
|
- free(chandle);
|
|
|
+ if (req->request_type == SEND_REQ)
|
|
|
+ {
|
|
|
+ // We need to make sure the communication for sending the size
|
|
|
+ // has completed, as MPI can re-order messages, let's call
|
|
|
+ // MPI_Wait to make sure data have been sent
|
|
|
+ ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
|
|
|
+ STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %d", ret);
|
|
|
+ free(req->ptr);
|
|
|
+ }
|
|
|
+ if (req->request_type == RECV_REQ)
|
|
|
+ {
|
|
|
+ // req->ptr is freed by starpu_data_unpack
|
|
|
+ starpu_data_unpack(req->data_handle, req->ptr, req->count);
|
|
|
+ }
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "NOT deleting chandle %p from hashmap (tag %d %d)\n", chandle, req->mpi_tag, starpu_data_get_tag(req->data_handle));
|
|
|
_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
|
|
|
}
|
|
|
}
|
|
|
- starpu_data_release(req->data_handle);
|
|
|
}
|
|
|
|
|
|
+ if (req->data_handle)
|
|
|
+ starpu_data_release(req->data_handle);
|
|
|
+
|
|
|
if (req->envelope)
|
|
|
{
|
|
|
free(req->envelope);
|
|
|
req->envelope = NULL;
|
|
|
}
|
|
|
|
|
|
- if (req->internal_req)
|
|
|
- {
|
|
|
- free(req->internal_req);
|
|
|
- req->internal_req = NULL;
|
|
|
- }
|
|
|
-
|
|
|
/* Execute the specified callback, if any */
|
|
|
if (req->callback)
|
|
|
req->callback(req->callback_arg);
|
|
@@ -869,12 +872,13 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
starpu_data_unregister_submit(args->copy_handle);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
|
|
|
+ // If the request is detached, we need to call _starpu_mpi_handle_request_termination
|
|
|
+ // as it will not be called automatically as the request is not in the list detached_requests
|
|
|
if (args->req->detached)
|
|
|
_starpu_mpi_handle_request_termination(args->req);
|
|
|
// else: If the request is not detached its termination will
|
|
|
// be handled when calling starpu_mpi_wait
|
|
|
|
|
|
-
|
|
|
free(args);
|
|
|
}
|
|
|
|
|
@@ -891,82 +895,78 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
|
|
|
if (req->request_type == RECV_REQ)
|
|
|
{
|
|
|
- /* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
|
|
|
-
|
|
|
- /* Case : the request has already been submitted internally by StarPU.
|
|
|
- * We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
|
- * the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
|
|
|
- * bring the data back to the original data handle associated to the request.*/
|
|
|
- if (chandle && (req->data_handle != chandle->handle))
|
|
|
+ /* Case : the request is the internal receive request submitted by StarPU-MPI to receive
|
|
|
+ * incoming data without a matching pending receive already submitted by the application.
|
|
|
+ * We immediately allocate the pointer associated to the data_handle, and push it into
|
|
|
+ * the list of new_requests, so that the real MPI request can be submitted before the next
|
|
|
+ * submission of the envelope-catching request. */
|
|
|
+ if (req->is_internal_req)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
-
|
|
|
- req->internal_req = chandle->req;
|
|
|
+ _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
|
|
|
+ if (req->user_datatype == 0)
|
|
|
+ {
|
|
|
+ req->count = 1;
|
|
|
+ req->ptr = starpu_data_get_local_ptr(req->data_handle);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ STARPU_ASSERT(req->count);
|
|
|
+ req->ptr = malloc(req->count);
|
|
|
+ STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
|
|
|
+ }
|
|
|
|
|
|
- struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
- cb_args->data_handle = req->data_handle;
|
|
|
- cb_args->copy_handle = chandle->handle;
|
|
|
- cb_args->req = req;
|
|
|
+ _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
+ _starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
- starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
|
|
|
+ /* inform the starpu mpi thread that the request has been pushed in the new_requests list */
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
|
|
|
+ req->posted = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- /* Case : the request is the internal receive request submitted by StarPU-MPI to receive
|
|
|
- * incoming data without a matching pending receive already submitted by the application.
|
|
|
- * We immediately allocate the pointer associated to the data_handle, and pushing it into
|
|
|
- * the list of new_requests, so as the real MPI request can be submitted before the next
|
|
|
- * submission of the envelope-catching request. */
|
|
|
- if (chandle && (req->data_handle == chandle->handle))
|
|
|
+ /* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
|
+ struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag);
|
|
|
+
|
|
|
+ /* Case : the request has already been submitted internally by StarPU.
|
|
|
+ * We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
|
+ * the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
|
|
|
+ * bring the data back to the original data handle associated to the request.*/
|
|
|
+ if (chandle)
|
|
|
{
|
|
|
- _starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
|
|
|
- if (req->user_datatype == 0)
|
|
|
- {
|
|
|
- req->count = 1;
|
|
|
- req->ptr = starpu_data_get_local_ptr(req->data_handle);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- req->count = chandle->env->psize;
|
|
|
- req->ptr = malloc(req->count);
|
|
|
+ _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
+ STARPU_ASSERT(req->data_handle != chandle->handle);
|
|
|
|
|
|
- STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
|
|
|
- }
|
|
|
+ req->internal_req = chandle->req;
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
- _starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
+ struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
+ cb_args->data_handle = req->data_handle;
|
|
|
+ cb_args->copy_handle = chandle->handle;
|
|
|
+ cb_args->req = req;
|
|
|
|
|
|
- /* inform the starpu mpi thread that the request has beenbe pushed in the new_requests list */
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
|
|
|
- req->posted = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
+ starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
|
|
|
}
|
|
|
/* Case : a classic receive request with no send received earlier than expected.
|
|
|
* We just add the pending receive request to the requests' hashmap. */
|
|
|
else
|
|
|
{
|
|
|
- add_req(req);
|
|
|
+ add_app_req(req);
|
|
|
}
|
|
|
-
|
|
|
- newer_requests = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
_starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
-
|
|
|
- newer_requests = 1;
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
|
|
|
req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
}
|
|
|
|
|
|
+ newer_requests = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
@@ -1043,6 +1043,9 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
if (flag)
|
|
|
{
|
|
|
_starpu_mpi_req_list_erase(detached_requests, req);
|
|
|
+#ifdef STARPU_DEVEL
|
|
|
+#warning FIXME: when do we free internal requests
|
|
|
+#endif
|
|
|
if (!req->is_internal_req)
|
|
|
free(req);
|
|
|
}
|
|
@@ -1135,12 +1138,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
|
|
|
{
|
|
|
- int rank, worldsize;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
|
|
- MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
|
|
|
- TRACE_MPI_START(rank, worldsize);
|
|
|
+ int rank, worldsize;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
|
|
+ MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
|
|
|
+ TRACE_MPI_START(rank, worldsize);
|
|
|
#ifdef STARPU_USE_FXT
|
|
|
- starpu_profiling_set_id(rank);
|
|
|
+ starpu_profiling_set_id(rank);
|
|
|
#endif //STARPU_USE_FXT
|
|
|
}
|
|
|
|
|
@@ -1159,7 +1162,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
{
|
|
|
/* shall we block ? */
|
|
|
- unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_req_hashmap) == 0);
|
|
|
+ unsigned block = _starpu_mpi_req_list_empty(new_requests) && (HASH_COUNT(_starpu_mpi_app_req_hashmap) == 0);
|
|
|
|
|
|
#ifndef STARPU_MPI_ACTIVITY
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
@@ -1199,7 +1202,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
|
|
|
* requests in our side, we resubmit a header request. */
|
|
|
MPI_Request header_req;
|
|
|
- if ((HASH_COUNT(_starpu_mpi_req_hashmap) > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
+ if ((HASH_COUNT(_starpu_mpi_app_req_hashmap) > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
|
MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
@@ -1223,9 +1226,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
if (flag)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Searching for request with tag %d (size %ld)\n", recv_env->mpi_tag, recv_env->psize);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d (size %ld)\n", recv_env->mpi_tag, recv_env->psize);
|
|
|
|
|
|
- struct _starpu_mpi_req *found_req = find_req(recv_env->mpi_tag);
|
|
|
+ struct _starpu_mpi_req *found_req = find_app_req(recv_env->mpi_tag);
|
|
|
|
|
|
/* Case : a data will arrive before the matching receive has been submitted in our side of the application.
|
|
|
* We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
|
|
@@ -1253,8 +1256,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
add_chandle(chandle);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
- chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1);
|
|
|
- chandle->req->is_internal_req = 1;
|
|
|
+ chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->psize);
|
|
|
|
|
|
// We wait until the request is pushed in the
|
|
|
// new_request list, that ensures that the next loop
|
|
@@ -1272,9 +1274,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
* the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Found !\n");
|
|
|
+ _STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
|
|
|
|
|
|
- delete_req(found_req);
|
|
|
+ delete_app_req(found_req);
|
|
|
|
|
|
_starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
|
|
|
if (found_req->user_datatype == 0)
|
|
@@ -1311,8 +1313,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
|
|
|
STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
|
- STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_req_hashmap) == 0, "Number of receive requests left is not zero");
|
|
|
-
|
|
|
+ STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_app_req_hashmap) == 0, "Number of receive requests left is not zero");
|
|
|
+ STARPU_ASSERT_MSG(HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0, "Number of copy requests left is not zero");
|
|
|
if (argc_argv->initialize_mpi)
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Calling MPI_Finalize()\n");
|