|
@@ -415,23 +415,6 @@ int starpu_mpi_recv(starpu_data_handle_t data_handle, int source, int mpi_tag, M
|
|
|
/* */
|
|
|
/********************************************************/
|
|
|
|
|
|
-// static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
|
|
|
-// {
|
|
|
-// _STARPU_MPI_LOG_IN();
|
|
|
-// /* Which is the mpi request we are waiting for ? */
|
|
|
-// struct _starpu_mpi_req *req = waiting_req->other_requestreq_mutex;
|
|
|
-
|
|
|
-// TRACE_MPI_UWAIT_BEGIN(req->srcdst, req->mpi_tag);
|
|
|
-
|
|
|
-// //TODO req->ret = MPI_Wait(&req->request, waiting_req->status);
|
|
|
-// STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %d", req->ret);
|
|
|
-
|
|
|
-// TRACE_MPI_UWAIT_END(req->srcdst, req->mpi_tag);
|
|
|
-
|
|
|
-// _starpu_mpi_handle_request_termination(req);
|
|
|
-// _STARPU_MPI_LOG_OUT();
|
|
|
-// }
|
|
|
-
|
|
|
int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
@@ -599,99 +582,6 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
-// static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
-// {
|
|
|
-// _STARPU_MPI_LOG_IN();
|
|
|
-// struct _starpu_mpi_req *req = arg;
|
|
|
-
|
|
|
-// _STARPU_MPI_INC_POSTED_REQUESTS(-1);
|
|
|
-
|
|
|
-// STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
-// _starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
-// newer_requests = 1;
|
|
|
-// _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
|
|
|
-// req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
|
|
|
-// STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
-// STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
-// _STARPU_MPI_LOG_OUT();
|
|
|
-// }
|
|
|
-/*
|
|
|
-#ifdef STARPU_MPI_ACTIVITY
|
|
|
-static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
|
|
|
-{
|
|
|
- unsigned may_block = 1;
|
|
|
-
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
- if (!_starpu_mpi_req_list_empty(detached_requests))
|
|
|
- {
|
|
|
- STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
|
|
|
- may_block = 0;
|
|
|
- }
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
-
|
|
|
- return may_block;
|
|
|
-}
|
|
|
-*/
|
|
|
-//#endif /* STARPU_MPI_ACTIVITY */
|
|
|
-
|
|
|
-// static void _starpu_mpi_test_detached_requests(void)
|
|
|
-// {
|
|
|
-// _STARPU_MPI_LOG_IN();
|
|
|
-// int flag;
|
|
|
-// MPI_Status status;
|
|
|
-// struct _starpu_mpi_req *req, *next_req;
|
|
|
-
|
|
|
-// STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
-
|
|
|
-// for (req = _starpu_mpi_req_list_begin(detached_requests);
|
|
|
-// req != _starpu_mpi_req_list_end(detached_requests);
|
|
|
-// req = next_req)
|
|
|
-// {
|
|
|
-// next_req = _starpu_mpi_req_list_next(req);
|
|
|
-
|
|
|
-// STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
-
|
|
|
-// //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->request, req->mpi_tag, _starpu_mpi_request_type(req->request_type), req->srcdst);
|
|
|
-// //TODO req->ret = MPI_Test(&req->request, &flag, &status);
|
|
|
-// STARPU_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %d", req->ret);
|
|
|
-
|
|
|
-// if (flag)
|
|
|
-// {
|
|
|
-// if (req->request_type == RECV_REQ)
|
|
|
-// {
|
|
|
-// TRACE_MPI_IRECV_COMPLETE_BEGIN(req->srcdst, req->mpi_tag);
|
|
|
-// }
|
|
|
-// else if (req->request_type == SEND_REQ)
|
|
|
-// {
|
|
|
-// TRACE_MPI_ISEND_COMPLETE_BEGIN(req->srcdst, req->mpi_tag, 0);
|
|
|
-// }
|
|
|
-
|
|
|
-// _starpu_mpi_handle_request_termination(req);
|
|
|
-
|
|
|
-// if (req->request_type == RECV_REQ)
|
|
|
-// {
|
|
|
-// TRACE_MPI_IRECV_COMPLETE_END(req->srcdst, req->mpi_tag);
|
|
|
-// }
|
|
|
-// else if (req->request_type == SEND_REQ)
|
|
|
-// {
|
|
|
-// TRACE_MPI_ISEND_COMPLETE_END(req->srcdst, req->mpi_tag, 0);
|
|
|
-// }
|
|
|
-// }
|
|
|
-
|
|
|
-// STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
-
|
|
|
-// if (flag)
|
|
|
-// {
|
|
|
-// _starpu_mpi_req_list_erase(detached_requests, req);
|
|
|
-// free(req);
|
|
|
-// }
|
|
|
-
|
|
|
-// }
|
|
|
-
|
|
|
-// STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
-// _STARPU_MPI_LOG_OUT();
|
|
|
-// }
|
|
|
-
|
|
|
void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const nm_sr_event_info_t*event_info, void*ref){
|
|
|
_starpu_mpi_handle_request_termination(ref,event);
|
|
|
}
|
|
@@ -773,28 +663,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
#endif //STARPU_USE_FXT
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- /* notify the main thread that the progression thread is ready */ //Why?
|
|
|
-// STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
-/* STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
-
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
- */
|
|
|
- while (1)// || posted_requests || !(_starpu_mpi_req_list_empty(new_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
|
+ while (1)
|
|
|
{
|
|
|
-
|
|
|
- /* shall we block ? */
|
|
|
-// unsigned block = _starpu_mpi_req_list_empty(new_requests);
|
|
|
-
|
|
|
-// #ifndef STARPU_MPI_ACTIVITY
|
|
|
-// block = block && _starpu_mpi_req_list_empty(detached_requests);
|
|
|
-// #endif /* STARPU_MPI_ACTIVITY */
|
|
|
struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
|
|
|
int err=0;
|
|
|
|
|
|
if(running || pending_request>0)
|
|
|
- {
|
|
|
+ {/* shall we block ? */
|
|
|
err = starpu_sem_wait(&callback_sem);
|
|
|
//running pending_request can change while waiting
|
|
|
}
|
|
@@ -815,19 +690,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // if (block)
|
|
|
- // {
|
|
|
- // _STARPU_MPI_DEBUG(3, "NO MORE REQUESTS TO HANDLE\n");
|
|
|
-
|
|
|
- // TRACE_MPI_SLEEP_BEGIN();
|
|
|
-
|
|
|
- // if (barrier_running)
|
|
|
- // Tell mpi_barrier
|
|
|
- // STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
|
|
|
- // STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
|
|
|
-
|
|
|
- // TRACE_MPI_SLEEP_END();
|
|
|
- // }
|
|
|
|
|
|
c->req->callback(c->req->callback_arg);
|
|
|
if (c->req->detached){
|
|
@@ -843,30 +705,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
free(c);
|
|
|
|
|
|
- // /* test whether there are some terminated "detached request" */
|
|
|
- // STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- // _starpu_mpi_test_detached_requests();
|
|
|
- // STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
-
|
|
|
- // /* get one request */
|
|
|
- // struct _starpu_mpi_req *req;
|
|
|
- // while (!_starpu_mpi_req_list_empty(new_requests))
|
|
|
- // {
|
|
|
- // req = _starpu_mpi_req_list_pop_back(new_requests);
|
|
|
-
|
|
|
- // /* handling a request is likely to block for a while
|
|
|
- // * (on a sync_data_with_mem call), we want to let the
|
|
|
- // * application submit requests in the meantime, so we
|
|
|
- // * release the lock. */
|
|
|
- // STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- // _starpu_mpi_handle_new_request(req);
|
|
|
- // STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
- // }
|
|
|
}
|
|
|
STARPU_ASSERT_MSG(callback_lfstack_pop(&callback_stack)==NULL, "List of callback not empty.");
|
|
|
- //STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
- //STARPU_ASSERT_MSG(_starpu_mpi_req_list_empty(new_requests), "List of new requests not empty");
|
|
|
- //STARPU_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
|
+ STARPU_ASSERT_MSG(pending_request==0, "Request still pending.");
|
|
|
|
|
|
if (argc_argv->initialize_mpi)
|
|
|
{
|
|
@@ -874,7 +715,6 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
MPI_Finalize();
|
|
|
}
|
|
|
|
|
|
- // STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
starpu_sem_destroy(&callback_sem);
|
|
|
free(argc_argv);
|
|
|
return NULL;
|
|
@@ -950,16 +790,6 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
|
|
|
|
|
|
STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
|
|
|
|
|
|
- // STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
- // while (!running)
|
|
|
- // STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
|
|
|
- // STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
-
|
|
|
-// #ifdef STARPU_MPI_ACTIVITY
|
|
|
-// hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
|
|
|
-// STARPU_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
|
|
|
-// #endif /* STARPU_MPI_ACTIVITY */
|
|
|
-
|
|
|
_starpu_mpi_add_sync_point_in_fxt();
|
|
|
_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
|
|
|
_starpu_mpi_cache_init(MPI_COMM_WORLD);
|
|
@@ -1002,17 +832,11 @@ int starpu_mpi_shutdown(void)
|
|
|
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
|
|
|
|
|
|
/* kill the progression thread */
|
|
|
-// STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
running = 0;
|
|
|
starpu_sem_post(&callback_sem);
|
|
|
-// STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
-// STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
- starpu_pthread_join(progress_thread, &value);
|
|
|
|
|
|
-// #ifdef STARPU_MPI_ACTIVITY
|
|
|
-// starpu_progression_hook_deregister(hookid);
|
|
|
-// #endif /* STARPU_MPI_ACTIVITY */
|
|
|
+ starpu_pthread_join(progress_thread, &value);
|
|
|
|
|
|
TRACE_MPI_STOP(rank, world_size);
|
|
|
|