|
@@ -61,10 +61,10 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
|
|
|
static void _starpu_mpi_early_data_cb(void* arg);
|
|
|
|
|
|
/* The list of ready requests */
|
|
|
-static struct _starpu_mpi_req_list *ready_requests;
|
|
|
+static struct _starpu_mpi_req_list ready_requests;
|
|
|
|
|
|
/* The list of detached requests that have already been submitted to MPI */
|
|
|
-static struct _starpu_mpi_req_list *detached_requests;
|
|
|
+static struct _starpu_mpi_req_list detached_requests;
|
|
|
static starpu_pthread_mutex_t detached_requests_mutex;
|
|
|
|
|
|
/* Condition to wake up progression thread */
|
|
@@ -222,7 +222,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
- _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
+ _starpu_mpi_req_list_push_front(&ready_requests, req);
|
|
|
|
|
|
/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
@@ -288,7 +288,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
STARPU_ASSERT(req->count);
|
|
|
_STARPU_MPI_MALLOC(req->ptr, req->count);
|
|
|
}
|
|
|
- _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
+ _starpu_mpi_req_list_push_front(&ready_requests, req);
|
|
|
_starpu_mpi_request_destroy(sync_req);
|
|
|
}
|
|
|
else
|
|
@@ -301,7 +301,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
+ _starpu_mpi_req_list_push_front(&ready_requests, req);
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
req->datatype_name, (int)req->count, req->registered_datatype);
|
|
@@ -1166,7 +1166,7 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
|
|
|
- if (_starpu_mpi_req_list_empty(detached_requests))
|
|
|
+ if (_starpu_mpi_req_list_empty(&detached_requests))
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
//_STARPU_MPI_LOG_OUT();
|
|
@@ -1174,8 +1174,8 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
}
|
|
|
|
|
|
_STARPU_MPI_TRACE_TESTING_DETACHED_BEGIN();
|
|
|
- req = _starpu_mpi_req_list_begin(detached_requests);
|
|
|
- while (req != _starpu_mpi_req_list_end(detached_requests))
|
|
|
+ req = _starpu_mpi_req_list_begin(&detached_requests);
|
|
|
+ while (req != _starpu_mpi_req_list_end(&detached_requests))
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
|
|
@@ -1203,7 +1203,7 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
- _starpu_mpi_req_list_erase(detached_requests, req);
|
|
|
+ _starpu_mpi_req_list_erase(&detached_requests, req);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
_starpu_mpi_handle_request_termination(req);
|
|
|
|
|
@@ -1232,7 +1232,7 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
|
|
|
/* put the submitted request into the list of pending requests
|
|
|
* so that it can be handled by the progression mechanisms */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
- _starpu_mpi_req_list_push_back(detached_requests, req);
|
|
|
+ _starpu_mpi_req_list_push_back(&detached_requests, req);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
|
|
|
starpu_wake_all_blocked_workers();
|
|
@@ -1316,7 +1316,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
|
|
|
// Handle the request immediatly to make sure the mpi_irecv is
|
|
|
// posted before receiving an other envelope
|
|
|
- _starpu_mpi_req_list_erase(ready_requests, early_data_handle->req);
|
|
|
+ _starpu_mpi_req_list_erase(&ready_requests, early_data_handle->req);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
_starpu_mpi_handle_ready_request(early_data_handle->req);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
@@ -1392,13 +1392,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
int envelope_request_submitted = 0;
|
|
|
|
|
|
- while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
|
|
|
+ while (running || posted_requests || !(_starpu_mpi_req_list_empty(&ready_requests)) || !(_starpu_mpi_req_list_empty(&detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
|
|
|
{
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
starpu_pthread_wait_reset(&wait);
|
|
|
#endif
|
|
|
/* shall we block ? */
|
|
|
- unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(detached_requests);
|
|
|
+ unsigned block = _starpu_mpi_req_list_empty(&ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(&detached_requests);
|
|
|
|
|
|
if (block)
|
|
|
{
|
|
@@ -1415,7 +1415,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
/* get one request */
|
|
|
int n = 0;
|
|
|
- while (!_starpu_mpi_req_list_empty(ready_requests))
|
|
|
+ while (!_starpu_mpi_req_list_empty(&ready_requests))
|
|
|
{
|
|
|
struct _starpu_mpi_req *req;
|
|
|
|
|
@@ -1423,7 +1423,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
/* Already spent some time on submitting ready requests, poll before processing more ready requests */
|
|
|
break;
|
|
|
|
|
|
- req = _starpu_mpi_req_list_pop_back(ready_requests);
|
|
|
+ req = _starpu_mpi_req_list_pop_back(&ready_requests);
|
|
|
|
|
|
/* handling a request is likely to block for a while
|
|
|
* (on a sync_data_with_mem call), we want to let the
|
|
@@ -1584,8 +1584,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
starpu_pthread_wait_destroy(&wait);
|
|
|
#endif
|
|
|
|
|
|
- STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
- STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
|
|
|
+ STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&detached_requests), "List of detached requests not empty");
|
|
|
+ STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&ready_requests), "List of ready requests not empty");
|
|
|
STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
|
_starpu_mpi_early_request_check_termination();
|
|
|
_starpu_mpi_early_data_check_termination();
|
|
@@ -1647,10 +1647,10 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
|
|
|
STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
|
|
|
STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
|
|
|
- ready_requests = _starpu_mpi_req_list_new();
|
|
|
+ _starpu_mpi_req_list_init(&ready_requests);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
|
|
|
- detached_requests = _starpu_mpi_req_list_new();
|
|
|
+ _starpu_mpi_req_list_init(&detached_requests);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
|
|
|
_starpu_mpi_comm_debug = starpu_getenv("STARPU_MPI_COMM") != NULL;
|
|
@@ -1705,10 +1705,6 @@ void _starpu_mpi_progress_shutdown(int *value)
|
|
|
STARPU_PTHREAD_JOIN(progress_thread, (void *)value);
|
|
|
#endif
|
|
|
|
|
|
- /* free the request queues */
|
|
|
- _starpu_mpi_req_list_delete(detached_requests);
|
|
|
- _starpu_mpi_req_list_delete(ready_requests);
|
|
|
-
|
|
|
STARPU_PTHREAD_MUTEX_DESTROY(&mutex_posted_requests);
|
|
|
STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
|
|
|
STARPU_PTHREAD_COND_DESTROY(&barrier_cond);
|