|
@@ -63,10 +63,10 @@ static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
|
|
|
TRACE_MPI_ISEND(req->srcdst, req->mpi_tag, 0);
|
|
|
|
|
|
/* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
- pthread_mutex_lock(&req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
req->submitted = 1;
|
|
|
- pthread_cond_broadcast(&req->req_cond);
|
|
|
- pthread_mutex_unlock(&req->req_mutex);
|
|
|
+ PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
|
|
@@ -83,7 +83,7 @@ int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req,
|
|
|
req->submitted = 0;
|
|
|
req->completed = 0;
|
|
|
pthread_mutex_init(&req->req_mutex, NULL);
|
|
|
- pthread_cond_init(&req->req_cond, NULL);
|
|
|
+ PTHREAD_COND_INIT(&req->req_cond, NULL);
|
|
|
|
|
|
req->request_type = SEND_REQ;
|
|
|
|
|
@@ -116,8 +116,8 @@ int starpu_mpi_isend_detached(starpu_data_handle data_handle,
|
|
|
/* Initialize the request structure */
|
|
|
req->submitted = 0;
|
|
|
req->completed = 0;
|
|
|
- pthread_mutex_init(&req->req_mutex, NULL);
|
|
|
- pthread_cond_init(&req->req_cond, NULL);
|
|
|
+ PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
|
|
|
+ PTHREAD_COND_INIT(&req->req_cond, NULL);
|
|
|
|
|
|
req->request_type = SEND_REQ;
|
|
|
|
|
@@ -161,10 +161,10 @@ static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
|
|
|
MPI_Irecv(ptr, 1, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
|
|
|
|
|
|
/* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
- pthread_mutex_lock(&req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
req->submitted = 1;
|
|
|
- pthread_cond_broadcast(&req->req_cond);
|
|
|
- pthread_mutex_unlock(&req->req_mutex);
|
|
|
+ PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
|
|
@@ -181,8 +181,8 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req,
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
req->submitted = 0;
|
|
|
- pthread_mutex_init(&req->req_mutex, NULL);
|
|
|
- pthread_cond_init(&req->req_cond, NULL);
|
|
|
+ PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
|
|
|
+ PTHREAD_COND_INIT(&req->req_cond, NULL);
|
|
|
|
|
|
req->request_type = RECV_REQ;
|
|
|
|
|
@@ -214,8 +214,8 @@ int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mp
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
req->submitted = 0;
|
|
|
- pthread_mutex_init(&req->req_mutex, NULL);
|
|
|
- pthread_cond_init(&req->req_cond, NULL);
|
|
|
+ PTHREAD_MUTEX_INIT(&req->req_mutex, NULL);
|
|
|
+ PTHREAD_COND_INIT(&req->req_cond, NULL);
|
|
|
|
|
|
req->request_type = RECV_REQ;
|
|
|
|
|
@@ -296,14 +296,14 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
|
|
|
/* We cannot try to complete a MPI request that was not actually posted
|
|
|
* to MPI yet. */
|
|
|
- pthread_mutex_lock(&req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
while (!req->submitted)
|
|
|
- pthread_cond_wait(&req->req_cond, &req->req_mutex);
|
|
|
- pthread_mutex_unlock(&req->req_mutex);
|
|
|
+ PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
- pthread_mutex_init(&waiting_req.req_mutex, NULL);
|
|
|
- pthread_cond_init(&waiting_req.req_cond, NULL);
|
|
|
+ PTHREAD_MUTEX_INIT(&waiting_req.req_mutex, NULL);
|
|
|
+ PTHREAD_COND_INIT(&waiting_req.req_cond, NULL);
|
|
|
waiting_req.status = status;
|
|
|
waiting_req.other_request = req;
|
|
|
waiting_req.func = starpu_mpi_wait_func;
|
|
@@ -311,10 +311,10 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
submit_mpi_req(&waiting_req);
|
|
|
|
|
|
/* We wait for the MPI request to finish */
|
|
|
- pthread_mutex_lock(&req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
while (!req->completed)
|
|
|
- pthread_cond_wait(&req->req_cond, &req->req_mutex);
|
|
|
- pthread_mutex_unlock(&req->req_mutex);
|
|
|
+ PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
|
|
|
ret = req->ret;
|
|
|
|
|
@@ -343,10 +343,10 @@ static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
|
|
|
handle_request_termination(req);
|
|
|
}
|
|
|
|
|
|
- pthread_mutex_lock(&testing_req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
|
|
|
testing_req->completed = 1;
|
|
|
pthread_cond_signal(&testing_req->req_cond);
|
|
|
- pthread_mutex_unlock(&testing_req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
@@ -359,9 +359,9 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
STARPU_ASSERT(!req->detached);
|
|
|
|
|
|
- pthread_mutex_lock(&req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
unsigned submitted = req->submitted;
|
|
|
- pthread_mutex_unlock(&req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
|
|
|
if (submitted)
|
|
|
{
|
|
@@ -369,8 +369,8 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
memset(&testing_req, 0, sizeof(struct starpu_mpi_req_s));
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
- pthread_mutex_init(&testing_req.req_mutex, NULL);
|
|
|
- pthread_cond_init(&testing_req.req_cond, NULL);
|
|
|
+ PTHREAD_MUTEX_INIT(&testing_req.req_mutex, NULL);
|
|
|
+ PTHREAD_COND_INIT(&testing_req.req_cond, NULL);
|
|
|
testing_req.flag = flag;
|
|
|
testing_req.status = status;
|
|
|
testing_req.other_request = req;
|
|
@@ -380,10 +380,10 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
submit_mpi_req(&testing_req);
|
|
|
|
|
|
/* We wait for the test request to finish */
|
|
|
- pthread_mutex_lock(&testing_req.req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&testing_req.req_mutex);
|
|
|
while (!testing_req.completed)
|
|
|
- pthread_cond_wait(&testing_req.req_cond, &testing_req.req_mutex);
|
|
|
- pthread_mutex_unlock(&testing_req.req_mutex);
|
|
|
+ PTHREAD_COND_WAIT(&testing_req.req_cond, &testing_req.req_mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&testing_req.req_mutex);
|
|
|
|
|
|
ret = testing_req.ret;
|
|
|
|
|
@@ -430,20 +430,20 @@ static void handle_request_termination(struct starpu_mpi_req_s *req)
|
|
|
|
|
|
/* tell anyone potentiallly waiting on the request that it is
|
|
|
* terminated now */
|
|
|
- pthread_mutex_lock(&req->req_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
req->completed = 1;
|
|
|
- pthread_cond_broadcast(&req->req_cond);
|
|
|
- pthread_mutex_unlock(&req->req_mutex);
|
|
|
+ PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
}
|
|
|
|
|
|
static void submit_mpi_req(void *arg)
|
|
|
{
|
|
|
struct starpu_mpi_req_s *req = arg;
|
|
|
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
- pthread_cond_broadcast(&cond);
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_COND_BROADCAST(&cond);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -454,13 +454,13 @@ static unsigned progression_hook_func(void *arg __attribute__((unused)))
|
|
|
{
|
|
|
unsigned may_block = 1;
|
|
|
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
if (!starpu_mpi_req_list_empty(detached_requests))
|
|
|
{
|
|
|
pthread_cond_signal(&cond);
|
|
|
may_block = 0;
|
|
|
}
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
return may_block;
|
|
|
}
|
|
@@ -475,7 +475,7 @@ static void test_detached_requests(void)
|
|
|
MPI_Status status;
|
|
|
struct starpu_mpi_req_s *req, *next_req;
|
|
|
|
|
|
- pthread_mutex_lock(&detached_requests_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
|
|
|
for (req = starpu_mpi_req_list_begin(detached_requests);
|
|
|
req != starpu_mpi_req_list_end(detached_requests);
|
|
@@ -483,7 +483,7 @@ static void test_detached_requests(void)
|
|
|
{
|
|
|
next_req = starpu_mpi_req_list_next(req);
|
|
|
|
|
|
- pthread_mutex_unlock(&detached_requests_mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
|
|
|
int ret = MPI_Test(&req->request, &flag, &status);
|
|
|
STARPU_ASSERT(ret == MPI_SUCCESS);
|
|
@@ -507,7 +507,7 @@ static void test_detached_requests(void)
|
|
|
handle_request_termination(req);
|
|
|
}
|
|
|
|
|
|
- pthread_mutex_lock(&detached_requests_mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
|
|
|
if (flag)
|
|
|
starpu_mpi_req_list_erase(detached_requests, req);
|
|
@@ -518,7 +518,7 @@ static void test_detached_requests(void)
|
|
|
// free(req);
|
|
|
}
|
|
|
|
|
|
- pthread_mutex_unlock(&detached_requests_mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
}
|
|
|
|
|
|
static void handle_new_request(struct starpu_mpi_req_s *req)
|
|
@@ -530,29 +530,29 @@ static void handle_new_request(struct starpu_mpi_req_s *req)
|
|
|
|
|
|
if (req->detached)
|
|
|
{
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
starpu_mpi_req_list_push_front(detached_requests, req);
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
starpu_wake_all_blocked_workers();
|
|
|
|
|
|
/* put the submitted request into the list of pending requests
|
|
|
* so that it can be handled by the progression mechanisms */
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
pthread_cond_signal(&cond);
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
static void *progress_thread_func(void *arg __attribute__((unused)))
|
|
|
{
|
|
|
/* notify the main thread that the progression thread is ready */
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
running = 1;
|
|
|
pthread_cond_signal(&cond);
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
while (running) {
|
|
|
/* shall we block ? */
|
|
|
unsigned block = starpu_mpi_req_list_empty(new_requests);
|
|
@@ -568,16 +568,16 @@ static void *progress_thread_func(void *arg __attribute__((unused)))
|
|
|
// if (rank == 3)
|
|
|
// fprintf(stderr, "<<<< STARPU MPI >>>> Rank %d NO MORE REQUESTS TO HANDLE\n", rank);
|
|
|
|
|
|
- pthread_cond_wait(&cond, &mutex);
|
|
|
+ PTHREAD_COND_WAIT(&cond, &mutex);
|
|
|
}
|
|
|
|
|
|
if (!running)
|
|
|
break;
|
|
|
|
|
|
/* test whether there are some terminated "detached request" */
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
test_detached_requests();
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
/* get one request */
|
|
|
struct starpu_mpi_req_s *req;
|
|
@@ -589,13 +589,13 @@ static void *progress_thread_func(void *arg __attribute__((unused)))
|
|
|
* (on a sync_data_with_mem call), we want to let the
|
|
|
* application submit requests in the meantime, so we
|
|
|
* release the lock. */
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
handle_new_request(req);
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
return NULL;
|
|
|
}
|
|
@@ -645,19 +645,19 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
|
|
|
|
|
|
int starpu_mpi_initialize(void)
|
|
|
{
|
|
|
- pthread_mutex_init(&mutex, NULL);
|
|
|
- pthread_cond_init(&cond, NULL);
|
|
|
+ PTHREAD_MUTEX_INIT(&mutex, NULL);
|
|
|
+ PTHREAD_COND_INIT(&cond, NULL);
|
|
|
new_requests = starpu_mpi_req_list_new();
|
|
|
|
|
|
- pthread_mutex_init(&detached_requests_mutex, NULL);
|
|
|
+ PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
|
|
|
detached_requests = starpu_mpi_req_list_new();
|
|
|
|
|
|
int ret = pthread_create(&progress_thread, NULL, progress_thread_func, NULL);
|
|
|
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
while (!running)
|
|
|
- pthread_cond_wait(&cond, &mutex);
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_COND_WAIT(&cond, &mutex);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
#ifdef USE_STARPU_ACTIVITY
|
|
|
hookid = starpu_register_progression_hook(progression_hook_func, NULL);
|
|
@@ -674,10 +674,10 @@ int starpu_mpi_shutdown(void)
|
|
|
void *value;
|
|
|
|
|
|
/* kill the progression thread */
|
|
|
- pthread_mutex_lock(&mutex);
|
|
|
+ PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
running = 0;
|
|
|
- pthread_cond_broadcast(&cond);
|
|
|
- pthread_mutex_unlock(&mutex);
|
|
|
+ PTHREAD_COND_BROADCAST(&cond);
|
|
|
+ PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
pthread_join(progress_thread, &value);
|
|
|
|