|
@@ -16,6 +16,7 @@
|
|
|
|
|
|
#include <starpu_mpi.h>
|
|
#include <starpu_mpi.h>
|
|
#include <starpu_mpi_datatype.h>
|
|
#include <starpu_mpi_datatype.h>
|
|
|
|
+#include <starpu_mpi_private.h>
|
|
|
|
|
|
/* TODO find a better way to select the polling method (perhaps during the
|
|
/* TODO find a better way to select the polling method (perhaps during the
|
|
* configuration) */
|
|
* configuration) */
|
|
@@ -54,9 +55,13 @@ static void starpu_mpi_isend_func(struct starpu_mpi_req_s *req)
|
|
pthread_mutex_unlock(&req->req_mutex);
|
|
pthread_mutex_unlock(&req->req_mutex);
|
|
}
|
|
}
|
|
|
|
|
|
-int starpu_mpi_isend(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int dest, int mpi_tag, MPI_Comm comm)
|
|
|
|
|
|
+int starpu_mpi_isend(starpu_data_handle data_handle, starpu_mpi_req *public_req, int dest, int mpi_tag, MPI_Comm comm)
|
|
{
|
|
{
|
|
|
|
+ STARPU_ASSERT(public_req);
|
|
|
|
+
|
|
|
|
+ struct starpu_mpi_req_s *req = starpu_mpi_req_new();
|
|
STARPU_ASSERT(req);
|
|
STARPU_ASSERT(req);
|
|
|
|
+ *public_req = req;
|
|
|
|
|
|
memset(req, 0, sizeof(struct starpu_mpi_req_s));
|
|
memset(req, 0, sizeof(struct starpu_mpi_req_s));
|
|
|
|
|
|
@@ -135,8 +140,14 @@ static void starpu_mpi_irecv_func(struct starpu_mpi_req_s *req)
|
|
pthread_mutex_unlock(&req->req_mutex);
|
|
pthread_mutex_unlock(&req->req_mutex);
|
|
}
|
|
}
|
|
|
|
|
|
-int starpu_mpi_irecv(starpu_data_handle data_handle, struct starpu_mpi_req_s *req, int source, int mpi_tag, MPI_Comm comm)
|
|
|
|
|
|
+int starpu_mpi_irecv(starpu_data_handle data_handle, starpu_mpi_req *public_req, int source, int mpi_tag, MPI_Comm comm)
|
|
{
|
|
{
|
|
|
|
+ STARPU_ASSERT(public_req);
|
|
|
|
+
|
|
|
|
+ struct starpu_mpi_req_s *req = starpu_mpi_req_new();
|
|
|
|
+ STARPU_ASSERT(req);
|
|
|
|
+ *public_req = req;
|
|
|
|
+
|
|
STARPU_ASSERT(req);
|
|
STARPU_ASSERT(req);
|
|
|
|
|
|
memset(req, 0, sizeof(struct starpu_mpi_req_s));
|
|
memset(req, 0, sizeof(struct starpu_mpi_req_s));
|
|
@@ -205,9 +216,7 @@ int starpu_mpi_irecv_detached(starpu_data_handle data_handle, int source, int mp
|
|
int starpu_mpi_recv(starpu_data_handle data_handle,
|
|
int starpu_mpi_recv(starpu_data_handle data_handle,
|
|
int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
|
|
int source, int mpi_tag, MPI_Comm comm, MPI_Status *status)
|
|
{
|
|
{
|
|
- struct starpu_mpi_req_s req;
|
|
|
|
-
|
|
|
|
- memset(&req, 0, sizeof(struct starpu_mpi_req_s));
|
|
|
|
|
|
+ starpu_mpi_req req;
|
|
|
|
|
|
starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
|
|
starpu_mpi_irecv(data_handle, &req, source, mpi_tag, comm);
|
|
starpu_mpi_wait(&req, status);
|
|
starpu_mpi_wait(&req, status);
|
|
@@ -222,10 +231,9 @@ int starpu_mpi_recv(starpu_data_handle data_handle,
|
|
int starpu_mpi_send(starpu_data_handle data_handle,
|
|
int starpu_mpi_send(starpu_data_handle data_handle,
|
|
int dest, int mpi_tag, MPI_Comm comm)
|
|
int dest, int mpi_tag, MPI_Comm comm)
|
|
{
|
|
{
|
|
- struct starpu_mpi_req_s req;
|
|
|
|
|
|
+ starpu_mpi_req req;
|
|
MPI_Status status;
|
|
MPI_Status status;
|
|
|
|
|
|
- memset(&req, 0, sizeof(struct starpu_mpi_req_s));
|
|
|
|
memset(&status, 0, sizeof(MPI_Status));
|
|
memset(&status, 0, sizeof(MPI_Status));
|
|
|
|
|
|
starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
|
|
starpu_mpi_isend(data_handle, &req, dest, mpi_tag, comm);
|
|
@@ -247,12 +255,14 @@ static void starpu_mpi_wait_func(struct starpu_mpi_req_s *waiting_req)
|
|
handle_request_termination(req);
|
|
handle_request_termination(req);
|
|
}
|
|
}
|
|
|
|
|
|
-int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status)
|
|
|
|
|
|
+int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
{
|
|
{
|
|
int ret;
|
|
int ret;
|
|
struct starpu_mpi_req_s waiting_req;
|
|
struct starpu_mpi_req_s waiting_req;
|
|
memset(&waiting_req, 0, sizeof(struct starpu_mpi_req_s));
|
|
memset(&waiting_req, 0, sizeof(struct starpu_mpi_req_s));
|
|
|
|
|
|
|
|
+ struct starpu_mpi_req_s *req = *public_req;
|
|
|
|
+
|
|
/* We cannot try to complete a MPI request that was not actually posted
|
|
/* We cannot try to complete a MPI request that was not actually posted
|
|
* to MPI yet. */
|
|
* to MPI yet. */
|
|
pthread_mutex_lock(&req->req_mutex);
|
|
pthread_mutex_lock(&req->req_mutex);
|
|
@@ -277,6 +287,10 @@ int starpu_mpi_wait(struct starpu_mpi_req_s *req, MPI_Status *status)
|
|
|
|
|
|
ret = req->ret;
|
|
ret = req->ret;
|
|
|
|
|
|
|
|
+ /* The internal request structure was automatically allocated */
|
|
|
|
+ *public_req = NULL;
|
|
|
|
+ free(req);
|
|
|
|
+
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -303,10 +317,14 @@ static void starpu_mpi_test_func(struct starpu_mpi_req_s *testing_req)
|
|
pthread_mutex_unlock(&testing_req->req_mutex);
|
|
pthread_mutex_unlock(&testing_req->req_mutex);
|
|
}
|
|
}
|
|
|
|
|
|
-int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
|
|
|
|
|
|
+int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
{
|
|
{
|
|
int ret = 0;
|
|
int ret = 0;
|
|
|
|
|
|
|
|
+ STARPU_ASSERT(public_req);
|
|
|
|
+
|
|
|
|
+ struct starpu_mpi_req_s *req = *public_req;
|
|
|
|
+
|
|
STARPU_ASSERT(!req->detached);
|
|
STARPU_ASSERT(!req->detached);
|
|
|
|
|
|
pthread_mutex_lock(&req->req_mutex);
|
|
pthread_mutex_lock(&req->req_mutex);
|
|
@@ -336,6 +354,15 @@ int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
|
|
pthread_mutex_unlock(&testing_req.req_mutex);
|
|
pthread_mutex_unlock(&testing_req.req_mutex);
|
|
|
|
|
|
ret = testing_req.ret;
|
|
ret = testing_req.ret;
|
|
|
|
+
|
|
|
|
+ if (*testing_req.flag)
|
|
|
|
+ {
|
|
|
|
+ /* The request was completed so we liberate the
|
|
|
|
+ * internal request structure which was automatically
|
|
|
|
+ * allocated */
|
|
|
|
+ *public_req = NULL;
|
|
|
|
+ free(req);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
*flag = 0;
|
|
*flag = 0;
|
|
@@ -348,7 +375,7 @@ int starpu_mpi_test(struct starpu_mpi_req_s *req, int *flag, MPI_Status *status)
|
|
* Requests
|
|
* Requests
|
|
*/
|
|
*/
|
|
|
|
|
|
-void handle_request_termination(struct starpu_mpi_req_s *req)
|
|
|
|
|
|
+static void handle_request_termination(struct starpu_mpi_req_s *req)
|
|
{
|
|
{
|
|
MPI_Type_free(&req->datatype);
|
|
MPI_Type_free(&req->datatype);
|
|
starpu_release_data_from_mem(req->data_handle);
|
|
starpu_release_data_from_mem(req->data_handle);
|
|
@@ -365,7 +392,7 @@ void handle_request_termination(struct starpu_mpi_req_s *req)
|
|
pthread_mutex_unlock(&req->req_mutex);
|
|
pthread_mutex_unlock(&req->req_mutex);
|
|
}
|
|
}
|
|
|
|
|
|
-void submit_mpi_req(void *arg)
|
|
|
|
|
|
+static void submit_mpi_req(void *arg)
|
|
{
|
|
{
|
|
struct starpu_mpi_req_s *req = arg;
|
|
struct starpu_mpi_req_s *req = arg;
|
|
|
|
|
|
@@ -379,7 +406,7 @@ void submit_mpi_req(void *arg)
|
|
* Scheduler hook
|
|
* Scheduler hook
|
|
*/
|
|
*/
|
|
|
|
|
|
-unsigned progression_hook_func(void *arg __attribute__((unused)))
|
|
|
|
|
|
+static unsigned progression_hook_func(void *arg __attribute__((unused)))
|
|
{
|
|
{
|
|
unsigned may_block = 1;
|
|
unsigned may_block = 1;
|
|
|
|
|
|
@@ -398,7 +425,7 @@ unsigned progression_hook_func(void *arg __attribute__((unused)))
|
|
* Progression loop
|
|
* Progression loop
|
|
*/
|
|
*/
|
|
|
|
|
|
-void test_detached_requests(void)
|
|
|
|
|
|
+static void test_detached_requests(void)
|
|
{
|
|
{
|
|
int flag;
|
|
int flag;
|
|
MPI_Status status;
|
|
MPI_Status status;
|
|
@@ -432,7 +459,7 @@ void test_detached_requests(void)
|
|
pthread_mutex_unlock(&detached_requests_mutex);
|
|
pthread_mutex_unlock(&detached_requests_mutex);
|
|
}
|
|
}
|
|
|
|
|
|
-void handle_new_request(struct starpu_mpi_req_s *req)
|
|
|
|
|
|
+static void handle_new_request(struct starpu_mpi_req_s *req)
|
|
{
|
|
{
|
|
STARPU_ASSERT(req);
|
|
STARPU_ASSERT(req);
|
|
|
|
|
|
@@ -455,7 +482,7 @@ void handle_new_request(struct starpu_mpi_req_s *req)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-void *progress_thread_func(void *arg __attribute__((unused)))
|
|
|
|
|
|
+static void *progress_thread_func(void *arg __attribute__((unused)))
|
|
{
|
|
{
|
|
/* notify the main thread that the progression thread is ready */
|
|
/* notify the main thread that the progression thread is ready */
|
|
pthread_mutex_lock(&mutex);
|
|
pthread_mutex_lock(&mutex);
|
|
@@ -509,7 +536,7 @@ void *progress_thread_func(void *arg __attribute__((unused)))
|
|
*/
|
|
*/
|
|
|
|
|
|
#ifdef USE_STARPU_ACTIVITY
|
|
#ifdef USE_STARPU_ACTIVITY
|
|
-int hookid = - 1;
|
|
|
|
|
|
+static int hookid = - 1;
|
|
#endif
|
|
#endif
|
|
|
|
|
|
int starpu_mpi_initialize(void)
|
|
int starpu_mpi_initialize(void)
|