|
@@ -30,7 +30,7 @@
|
|
|
#include <datawizard/coherency.h>
|
|
|
|
|
|
static void _starpu_mpi_add_sync_point_in_fxt(void);
|
|
|
-static void _starpu_mpi_submit_new_mpi_request(void *arg);
|
|
|
+static void _starpu_mpi_submit_ready_request(void *arg);
|
|
|
static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req);
|
|
|
#ifdef STARPU_VERBOSE
|
|
|
static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
|
|
@@ -46,8 +46,8 @@ static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t dat
|
|
|
ssize_t count);
|
|
|
static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
|
|
|
|
|
|
-/* The list of requests that have been newly submitted by the application */
|
|
|
-static struct _starpu_mpi_req_list *new_requests;
|
|
|
+/* The list of ready requests */
|
|
|
+static struct _starpu_mpi_req_list *ready_requests;
|
|
|
|
|
|
/* The list of detached requests that have already been submitted to MPI */
|
|
|
static struct _starpu_mpi_req_list *detached_requests;
|
|
@@ -61,7 +61,7 @@ static starpu_pthread_mutex_t mutex;
|
|
|
static starpu_pthread_t progress_thread;
|
|
|
static int running = 0;
|
|
|
|
|
|
-/* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
|
|
|
+/* Count requests posted by the application and not yet submitted to MPI */
|
|
|
static starpu_pthread_mutex_t mutex_posted_requests;
|
|
|
static int posted_requests = 0, newer_requests, barrier_running = 0;
|
|
|
|
|
@@ -151,9 +151,9 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
|
|
|
req->count = count;
|
|
|
|
|
|
/* Asynchronously request StarPU to fetch the data in main memory: when
|
|
|
- * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
|
|
|
+ * it is available in main memory, _starpu_mpi_submit_ready_request(req) is called and
|
|
|
* the request is actually submitted */
|
|
|
- starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_new_mpi_request, (void *)req, sequential_consistency);
|
|
|
+ starpu_data_acquire_cb_sequential_consistency(data_handle, mode, _starpu_mpi_submit_ready_request, (void *)req, sequential_consistency);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return req;
|
|
@@ -447,7 +447,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
waiting_req->func = _starpu_mpi_wait_func;
|
|
|
waiting_req->request_type = WAIT_REQ;
|
|
|
|
|
|
- _starpu_mpi_submit_new_mpi_request(waiting_req);
|
|
|
+ _starpu_mpi_submit_ready_request(waiting_req);
|
|
|
|
|
|
/* We wait for the MPI request to finish */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
@@ -532,7 +532,7 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
testing_req->request_type = TEST_REQ;
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
- _starpu_mpi_submit_new_mpi_request(testing_req);
|
|
|
+ _starpu_mpi_submit_ready_request(testing_req);
|
|
|
|
|
|
/* We wait for the test request to finish */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
|
|
@@ -619,7 +619,7 @@ int starpu_mpi_barrier(MPI_Comm comm)
|
|
|
barrier_req->comm = comm;
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
- _starpu_mpi_submit_new_mpi_request(barrier_req);
|
|
|
+ _starpu_mpi_submit_ready_request(barrier_req);
|
|
|
|
|
|
/* We wait for the MPI request to finish */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
|
|
@@ -785,24 +785,25 @@ static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
free(args);
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
+static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
struct _starpu_mpi_req *req = arg;
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(-1);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "calling _starpu_mpi_submit_new_mpi_request with req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
|
|
|
+ _STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s\n", req, req->srcdst, req->mpi_tag, _starpu_mpi_request_type(req->request_type));
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
if (req->request_type == RECV_REQ)
|
|
|
{
|
|
|
- /* Case : the request is the internal receive request submitted by StarPU-MPI to receive
|
|
|
- * incoming data without a matching pending receive already submitted by the application.
|
|
|
- * We immediately allocate the pointer associated to the data_handle, and pushing it into
|
|
|
- * the list of new_requests, so as the real MPI request can be submitted before the next
|
|
|
- * submission of the envelope-catching request. */
|
|
|
+ /* Case : the request is the internal receive request submitted
|
|
|
+ * by StarPU-MPI to receive incoming data without a matching
|
|
|
+ * early_request from the application. We immediately allocate the
|
|
|
+ * pointer associated to the data_handle, and push it into the
|
|
|
+ * ready_requests list, so that the real MPI request can be submitted
|
|
|
+ * before the next submission of the envelope-catching request. */
|
|
|
if (req->is_internal_req)
|
|
|
{
|
|
|
_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
|
|
@@ -818,10 +819,12 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld\n", req->count);
|
|
|
}
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
- _starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
|
|
|
+ req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr,
|
|
|
+ _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
+ _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
|
|
|
- /* inform the starpu mpi thread that the request has beenbe pushed in the new_requests list */
|
|
|
+ /* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
|
|
|
req->posted = 1;
|
|
@@ -834,10 +837,10 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
/* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
|
struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
|
|
|
|
|
|
- /* Case : the request has already been submitted internally by StarPU.
|
|
|
- * We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
|
- * the internal receive will be over, the _starpu_mpi_early_data_cb function will be called to
|
|
|
- * bring the data back to the original data handle associated to the request.*/
|
|
|
+ /* Case: a receive request for a data with the given tag and source has already been
|
|
|
+ * posted by StarPU. Asynchronously request a Read permission over the temporary handle,
|
|
|
+ * so that when the internal receive is completed, the _starpu_mpi_early_data_cb function
|
|
|
+ * will be called to bring the data back to the original data handle associated to the request.*/
|
|
|
if (early_data_handle)
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
@@ -861,8 +864,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
|
|
|
}
|
|
|
- /* Case : a classic receive request with no send received earlier than expected.
|
|
|
- * We just add the pending receive request to the requests' hashmap. */
|
|
|
+ /* Case: no matching data has been received. Store the receive request as an early_request. */
|
|
|
else
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %d) into the request hashmap\n", req, req->srcdst, req->mpi_tag);
|
|
@@ -872,7 +874,7 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
+ _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
|
|
|
req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
}
|
|
@@ -986,7 +988,7 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-static void _starpu_mpi_handle_new_request(struct _starpu_mpi_req *req)
|
|
|
+static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
STARPU_ASSERT_MSG(req, "Invalid request");
|
|
@@ -1080,10 +1082,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
int header_req_submitted = 0;
|
|
|
|
|
|
- while (running || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
+ while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
{
|
|
|
/* shall we block ? */
|
|
|
- unsigned block = _starpu_mpi_req_list_empty(new_requests) && _starpu_mpi_early_request_count() == 0;
|
|
|
+ unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0;
|
|
|
|
|
|
#ifndef STARPU_MPI_ACTIVITY
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
@@ -1107,21 +1109,22 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
/* get one request */
|
|
|
struct _starpu_mpi_req *req;
|
|
|
- while (!_starpu_mpi_req_list_empty(new_requests))
|
|
|
+ while (!_starpu_mpi_req_list_empty(ready_requests))
|
|
|
{
|
|
|
- req = _starpu_mpi_req_list_pop_back(new_requests);
|
|
|
+ req = _starpu_mpi_req_list_pop_back(ready_requests);
|
|
|
|
|
|
/* handling a request is likely to block for a while
|
|
|
* (on a sync_data_with_mem call), we want to let the
|
|
|
* application submit requests in the meantime, so we
|
|
|
* release the lock. */
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- _starpu_mpi_handle_new_request(req);
|
|
|
+ _starpu_mpi_handle_ready_request(req);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
|
|
|
- /* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
|
|
|
- * requests in our side, we resubmit a header request. */
|
|
|
+ /* If there is currently no header_req submitted to
|
|
|
+ * catch envelopes from senders, and there are some pending
|
|
|
+ * receive requests on our side, we resubmit a header request. */
|
|
|
MPI_Request header_req;
|
|
|
if ((_starpu_mpi_early_request_count() > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
|
|
|
{
|
|
@@ -1151,11 +1154,14 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
struct _starpu_mpi_req *found_req = _starpu_mpi_early_request_find(recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
- /* Case : a data will arrive before the matching receive has been submitted in our side of the application.
|
|
|
- * We will allow a temporary handle to store the incoming data, by submitting a starpu_mpi_irecv_detached
|
|
|
- * on this handle, and register this so as the StarPU-MPI layer can remember it.*/
|
|
|
+ /* Case: data will arrive before a matching receive is
|
|
|
+ * posted by the application. Create a temporary handle to
|
|
|
+ * store the incoming data, submit a starpu_mpi_irecv_detached
|
|
|
+ * on this handle, and store it as an early_data
|
|
|
+ */
|
|
|
if (!found_req)
|
|
|
{
|
|
|
+
|
|
|
_STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
starpu_data_handle_t data_handle = NULL;
|
|
@@ -1198,8 +1204,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
// We wait until the request is pushed in the
|
|
|
- // new_request list, that ensures that the next loop
|
|
|
- // will call _starpu_mpi_handle_new_request
|
|
|
+ // ready_requests list, which ensures that the next loop
|
|
|
+ // will call _starpu_mpi_handle_ready_request
|
|
|
// on the request and post the corresponding mpi_irecv,
|
|
|
// otherwise, it may lead to read data as envelop
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
@@ -1214,8 +1220,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
- /* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|
|
|
- * the data handle, then submit the corresponding receive with _starpu_mpi_handle_new_request. */
|
|
|
+ /* Case: a matching application request has been found for
|
|
|
+ * the incoming data, we handle the correct allocation
|
|
|
+ * of the pointer associated to the data handle, then
|
|
|
+ * submit the corresponding receive with
|
|
|
+ * _starpu_mpi_handle_ready_request. */
|
|
|
else
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
|
|
@@ -1242,7 +1251,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
* application submit requests in the meantime, so we
|
|
|
* release the lock. */
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- _starpu_mpi_handle_new_request(found_req);
|
|
|
+ _starpu_mpi_handle_ready_request(found_req);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
header_req_submitted = 0;
|
|
@@ -1255,7 +1264,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
|
|
|
STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
- STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
|
|
|
+ STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
|
|
|
STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
|
_starpu_mpi_early_request_check_termination();
|
|
|
_starpu_mpi_early_data_check_termination();
|
|
@@ -1326,7 +1335,7 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
|
|
|
STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
|
|
|
STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
|
|
|
- new_requests = _starpu_mpi_req_list_new();
|
|
|
+ ready_requests = _starpu_mpi_req_list_new();
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
|
|
|
detached_requests = _starpu_mpi_req_list_new();
|
|
@@ -1402,7 +1411,7 @@ int starpu_mpi_shutdown(void)
|
|
|
|
|
|
/* free the request queues */
|
|
|
_starpu_mpi_req_list_delete(detached_requests);
|
|
|
- _starpu_mpi_req_list_delete(new_requests);
|
|
|
+ _starpu_mpi_req_list_delete(ready_requests);
|
|
|
|
|
|
_starpu_mpi_comm_amounts_display(rank);
|
|
|
_starpu_mpi_comm_amounts_free();
|
|
@@ -1423,3 +1432,10 @@ void starpu_mpi_data_register(starpu_data_handle_t data_handle, int tag, int ran
|
|
|
_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+int starpu_mpi_world_rank(void)
|
|
|
+{
|
|
|
+ int rank;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
|
|
+ return rank;
|
|
|
+}
|