|
|
@@ -29,6 +29,18 @@
|
|
|
# define _STARPU_MPI_DEBUG(fmt, args ...)
|
|
|
#endif
|
|
|
|
|
|
+#ifdef STARPU_MPI_VERBOSE
|
|
|
+# define _STARPU_MPI_LOG_IN() { int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); \
|
|
|
+ fprintf(stderr, "[%d][starpu_mpi][%s] -->\n", rank, __func__ ); \
|
|
|
+ fflush(stderr); }
|
|
|
+# define _STARPU_MPI_LOG_OUT() { int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); \
|
|
|
+ fprintf(stderr, "[%d][starpu_mpi][%s] <--\n", rank, __func__ ); \
|
|
|
+ fflush(stderr); }
|
|
|
+#else
|
|
|
+# define _STARPU_MPI_LOG_IN()
|
|
|
+# define _STARPU_MPI_LOG_OUT()
|
|
|
+#endif
|
|
|
+
|
|
|
/* TODO find a better way to select the polling method (perhaps during the
|
|
|
* configuration) */
|
|
|
//#define USE_STARPU_ACTIVITY 1
|
|
|
@@ -54,6 +66,7 @@ static int running = 0;
|
|
|
|
|
|
static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
|
|
|
|
|
|
_STARPU_MPI_DEBUG("post MPI isend tag %x dst %d ptr %p req %p\n", req->mpi_tag, req->srcdst, ptr, &req->request);
|
|
|
@@ -70,6 +83,7 @@ static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
|
|
|
req->submitted = 1;
|
|
|
PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
}
|
|
|
|
|
|
static struct starpu_mpi_req_s *_starpu_mpi_isend_common(starpu_data_handle data_handle,
|
|
|
@@ -79,6 +93,7 @@ static struct starpu_mpi_req_s *_starpu_mpi_isend_common(starpu_data_handle data
|
|
|
struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
|
|
|
STARPU_ASSERT(req);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
/* Initialize the request structure */
|
|
|
req->submitted = 0;
|
|
|
req->completed = 0;
|
|
|
@@ -103,11 +118,13 @@ static struct starpu_mpi_req_s *_starpu_mpi_isend_common(starpu_data_handle data
|
|
|
starpu_data_acquire_cb(data_handle, STARPU_R,
|
|
|
submit_mpi_req, (void *)req);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return req;
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
STARPU_ASSERT(public_req);
|
|
|
|
|
|
struct starpu_mpi_req_s *req;
|
|
|
@@ -116,6 +133,7 @@ int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req,
|
|
|
STARPU_ASSERT(req);
|
|
|
*public_req = req;
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -126,8 +144,10 @@ int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req,
|
|
|
int starpu_mpi_isend_detached(starpu_data_handle data_handle,
|
|
|
int dest, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
_starpu_mpi_isend_common(data_handle, dest, mpi_tag, comm, 1, callback, arg);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -137,6 +157,7 @@ int starpu_mpi_isend_detached(starpu_data_handle data_handle,
|
|
|
|
|
|
static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
void *ptr = starpu_mpi_handle_to_ptr(req->data_handle);
|
|
|
STARPU_ASSERT(ptr);
|
|
|
|
|
|
@@ -151,10 +172,12 @@ static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
|
|
|
req->submitted = 1;
|
|
|
PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
static struct starpu_mpi_req_s *_starpu_mpi_irecv_common(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, unsigned detached, void (*callback)(void *), void *arg)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
struct starpu_mpi_req_s *req = calloc(1, sizeof(struct starpu_mpi_req_s));
|
|
|
STARPU_ASSERT(req);
|
|
|
|
|
|
@@ -182,11 +205,13 @@ static struct starpu_mpi_req_s *_starpu_mpi_irecv_common(starpu_data_handle data
|
|
|
starpu_data_acquire_cb(data_handle, STARPU_W,
|
|
|
submit_mpi_req, (void *)req);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return req;
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
STARPU_ASSERT(public_req);
|
|
|
|
|
|
struct starpu_mpi_req_s *req;
|
|
|
@@ -195,6 +220,7 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req,
|
|
|
STARPU_ASSERT(req);
|
|
|
*public_req = req;
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -204,8 +230,10 @@ int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req,
|
|
|
|
|
|
int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
_starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 1, callback, arg);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -214,14 +242,15 @@ int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mp
|
|
|
* Recv
|
|
|
*/
|
|
|
|
|
|
-int starpu_mpi_recv(starpu_data_handle data_handle,
|
|
|
- int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
|
|
|
+int starpu_mpi_recv(starpu_data_handle data_handle, int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
|
|
|
{
|
|
|
starpu_mpi_req req;
|
|
|
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
|
|
|
starpu_mpi_wait(&req, status);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -229,17 +258,18 @@ int starpu_mpi_recv(starpu_data_handle data_handle,
|
|
|
* Send
|
|
|
*/
|
|
|
|
|
|
-int starpu_mpi_send(starpu_data_handle data_handle,
|
|
|
- int dest, int mpi_tag, MPI_Comm comm)
|
|
|
+int starpu_mpi_send(starpu_data_handle data_handle, int dest, int mpi_tag, MPI_Comm comm)
|
|
|
{
|
|
|
starpu_mpi_req req;
|
|
|
MPI_Status status;
|
|
|
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
memset(&status, 0, sizeof(MPI_Status));
|
|
|
|
|
|
starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
|
|
|
starpu_mpi_wait(&req, &status);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -249,15 +279,18 @@ int starpu_mpi_send(starpu_data_handle data_handle,
|
|
|
|
|
|
static void starpu_mpi_wait_func(struct starpu_mpi_req_s *waiting_req)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
/* Which is the mpi request we are waiting for ? */
|
|
|
struct starpu_mpi_req_s *req = waiting_req->other_request;
|
|
|
|
|
|
req->ret = MPI_Wait(&req->request, waiting_req->status);
|
|
|
handle_request_termination(req);
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
int ret;
|
|
|
struct starpu_mpi_req_s waiting_req;
|
|
|
memset(&waiting_req, 0, sizeof(struct starpu_mpi_req_s));
|
|
|
@@ -293,6 +326,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
*public_req = NULL;
|
|
|
free(req);
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
@@ -302,6 +336,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
|
|
|
static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
/* Which is the mpi request we are testing for ? */
|
|
|
struct starpu_mpi_req_s *req = testing_req->other_request;
|
|
|
|
|
|
@@ -318,10 +353,12 @@ static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
|
|
|
testing_req->completed = 1;
|
|
|
pthread_cond_signal(&testing_req->req_cond);
|
|
|
PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
int ret = 0;
|
|
|
|
|
|
STARPU_ASSERT(public_req);
|
|
|
@@ -372,6 +409,7 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
*flag = 0;
|
|
|
}
|
|
|
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
@@ -381,6 +419,7 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
static void handle_request_termination(struct starpu_mpi_req_s *req)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
MPI_Type_free(&req->datatype);
|
|
|
starpu_data_release(req->data_handle);
|
|
|
|
|
|
@@ -401,16 +440,19 @@ static void handle_request_termination(struct starpu_mpi_req_s *req)
|
|
|
req->completed = 1;
|
|
|
PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
static void submit_mpi_req(void *arg)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
struct starpu_mpi_req_s *req = arg;
|
|
|
|
|
|
PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
starpu_mpi_req_list_push_front(new_requests, req);
|
|
|
PTHREAD_COND_BROADCAST(&cond);
|
|
|
PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -438,6 +480,7 @@ static unsigned progression_hook_func(void *arg __attribute__((unused)))
|
|
|
|
|
|
static void test_detached_requests(void)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
int flag;
|
|
|
MPI_Status status;
|
|
|
struct starpu_mpi_req_s *req, *next_req;
|
|
|
@@ -475,10 +518,12 @@ static void test_detached_requests(void)
|
|
|
}
|
|
|
|
|
|
PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
static void handle_new_request(struct starpu_mpi_req_s *req)
|
|
|
{
|
|
|
+ _STARPU_MPI_LOG_IN();
|
|
|
STARPU_ASSERT(req);
|
|
|
|
|
|
/* submit the request to MPI */
|
|
|
@@ -498,6 +543,7 @@ static void handle_new_request(struct starpu_mpi_req_s *req)
|
|
|
pthread_cond_signal(&cond);
|
|
|
PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
}
|
|
|
+ _STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
static void *progress_thread_func(void *arg __attribute__((unused)))
|