|
@@ -17,6 +17,7 @@
|
|
|
*/
|
|
|
|
|
|
#include <stdlib.h>
|
|
|
+#include <limits.h>
|
|
|
#include <starpu_mpi.h>
|
|
|
#include <starpu_mpi_datatype.h>
|
|
|
#include <starpu_mpi_private.h>
|
|
@@ -37,9 +38,16 @@
|
|
|
#include <datawizard/coherency.h>
|
|
|
#include <core/simgrid.h>
|
|
|
#include <core/task.h>
|
|
|
+#include <core/topology.h>
|
|
|
+#include <core/workers.h>
|
|
|
|
|
|
/* Number of ready requests to process before polling for completed requests */
|
|
|
-#define NREADY_PROCESS 10
|
|
|
+static unsigned nready_process;
|
|
|
+
|
|
|
+/* Number of send requests to submit to MPI at the same time */
|
|
|
+static unsigned ndetached_send;
|
|
|
+
|
|
|
+static int mpi_thread_cpuid = -1;
|
|
|
|
|
|
static void _starpu_mpi_add_sync_point_in_fxt(void);
|
|
|
static void _starpu_mpi_submit_ready_request(void *arg);
|
|
@@ -50,7 +58,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
|
|
|
#endif
|
|
|
static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
|
|
|
int dest, int data_tag, MPI_Comm comm,
|
|
|
- unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
|
|
|
+ unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
|
|
|
int sequential_consistency);
|
|
|
static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle,
|
|
|
int source, int data_tag, MPI_Comm comm,
|
|
@@ -61,10 +69,12 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req);
|
|
|
static void _starpu_mpi_early_data_cb(void* arg);
|
|
|
|
|
|
/* The list of ready requests */
|
|
|
-static struct _starpu_mpi_req_list *ready_requests;
|
|
|
+static struct _starpu_mpi_req_list ready_recv_requests;
|
|
|
+static struct _starpu_mpi_req_prio_list ready_send_requests;
|
|
|
|
|
|
/* The list of detached requests that have already been submitted to MPI */
|
|
|
-static struct _starpu_mpi_req_list *detached_requests;
|
|
|
+static struct _starpu_mpi_req_list detached_requests;
|
|
|
+static unsigned detached_send_nrequests;
|
|
|
static starpu_pthread_mutex_t detached_requests_mutex;
|
|
|
|
|
|
/* Condition to wake up progression thread */
|
|
@@ -105,6 +115,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
(*req)->data_handle = NULL;
|
|
|
+ (*req)->prio = 0;
|
|
|
|
|
|
(*req)->datatype = 0;
|
|
|
(*req)->datatype_name = NULL;
|
|
@@ -144,6 +155,7 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
|
|
|
(*req)->size_req = 0;
|
|
|
(*req)->internal_req = NULL;
|
|
|
(*req)->is_internal_req = 0;
|
|
|
+ (*req)->to_destroy = 1;
|
|
|
(*req)->early_data_handle = NULL;
|
|
|
(*req)->envelope = NULL;
|
|
|
(*req)->sequential_consistency = 1;
|
|
@@ -222,7 +234,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
- _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
+ _starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
|
|
|
|
/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
@@ -288,7 +300,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
STARPU_ASSERT(req->count);
|
|
|
_STARPU_MPI_MALLOC(req->ptr, req->count);
|
|
|
}
|
|
|
- _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
+ _starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
|
_starpu_mpi_request_destroy(sync_req);
|
|
|
}
|
|
|
else
|
|
@@ -301,7 +313,10 @@ static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _starpu_mpi_req_list_push_front(ready_requests, req);
|
|
|
+ if (req->request_type == SEND_REQ)
|
|
|
+ _starpu_mpi_req_prio_list_push_front(&ready_send_requests, req);
|
|
|
+ else
|
|
|
+ _starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
req->datatype_name, (int)req->count, req->registered_datatype);
|
|
@@ -323,7 +338,7 @@ static void nop_acquire_cb(void *arg)
|
|
|
|
|
|
static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
|
|
|
int srcdst, int data_tag, MPI_Comm comm,
|
|
|
- unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
|
|
|
+ unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
|
|
|
enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
|
|
|
enum starpu_data_access_mode mode,
|
|
|
int sequential_consistency,
|
|
@@ -347,6 +362,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
|
|
|
/* Initialize the request structure */
|
|
|
_starpu_mpi_request_init(&req);
|
|
|
req->request_type = request_type;
|
|
|
+ /* prio_list is sorted by increasing values */
|
|
|
+ req->prio = prio;
|
|
|
req->data_handle = data_handle;
|
|
|
req->node_tag.rank = srcdst;
|
|
|
req->node_tag.data_tag = data_tag;
|
|
@@ -358,6 +375,8 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
|
|
|
req->func = func;
|
|
|
req->sequential_consistency = sequential_consistency;
|
|
|
req->is_internal_req = is_internal_req;
|
|
|
+ /* For internal requests, we wait for both the request completion and the matching application request completion */
|
|
|
+ req->to_destroy = !is_internal_req;
|
|
|
req->count = count;
|
|
|
|
|
|
/* Asynchronously request StarPU to fetch the data in main memory: when
|
|
@@ -534,10 +553,10 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
|
|
|
static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t data_handle,
|
|
|
int dest, int data_tag, MPI_Comm comm,
|
|
|
- unsigned detached, unsigned sync, void (*callback)(void *), void *arg,
|
|
|
+ unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
|
|
|
int sequential_consistency)
|
|
|
{
|
|
|
- return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func,
|
|
|
+ return _starpu_mpi_isend_irecv_common(data_handle, dest, data_tag, comm, detached, sync, prio, callback, arg, SEND_REQ, _starpu_mpi_isend_size_func,
|
|
|
#ifdef STARPU_MPI_PEDANTIC_ISEND
|
|
|
STARPU_RW,
|
|
|
#else
|
|
@@ -546,14 +565,14 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_common(starpu_data_handle_t dat
|
|
|
sequential_consistency, 0, 0);
|
|
|
}
|
|
|
|
|
|
-int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
|
|
|
+int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, int prio, MPI_Comm comm)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");
|
|
|
|
|
|
struct _starpu_mpi_req *req;
|
|
|
_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
|
|
|
- req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, NULL, NULL, 1);
|
|
|
+ req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, prio, NULL, NULL, 1);
|
|
|
_STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, data_tag, 0);
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
|
|
@@ -563,16 +582,24 @@ int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-int starpu_mpi_isend_detached(starpu_data_handle_t data_handle,
|
|
|
- int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
+int starpu_mpi_isend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
|
|
|
+{
|
|
|
+ return starpu_mpi_isend_prio(data_handle, public_req, dest, data_tag, 0, comm);
|
|
|
+}
|
|
|
+
|
|
|
+int starpu_mpi_isend_detached_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
- _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, callback, arg, 1);
|
|
|
+ _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 0, prio, callback, arg, 1);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
+int starpu_mpi_isend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
+{
|
|
|
+ return starpu_mpi_isend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
|
|
|
+}
|
|
|
|
|
|
-int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
|
|
|
+int starpu_mpi_send_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm)
|
|
|
{
|
|
|
starpu_mpi_req req;
|
|
|
MPI_Status status;
|
|
@@ -580,20 +607,25 @@ int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MP
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
memset(&status, 0, sizeof(MPI_Status));
|
|
|
|
|
|
- starpu_mpi_isend(data_handle, &req, dest, data_tag, comm);
|
|
|
+ starpu_mpi_isend_prio(data_handle, &req, dest, data_tag, prio, comm);
|
|
|
starpu_mpi_wait(&req, &status);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
|
|
|
+int starpu_mpi_send(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm)
|
|
|
+{
|
|
|
+ return starpu_mpi_send_prio(data_handle, dest, data_tag, 0, comm);
|
|
|
+}
|
|
|
+
|
|
|
+int starpu_mpi_issend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, int prio, MPI_Comm comm)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_issend needs a valid starpu_mpi_req");
|
|
|
|
|
|
struct _starpu_mpi_req *req;
|
|
|
- req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, NULL, NULL, 1);
|
|
|
+ req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 1, prio, NULL, NULL, 1);
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
|
|
|
*public_req = req;
|
|
@@ -602,16 +634,26 @@ int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_r
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
+int starpu_mpi_issend(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int dest, int data_tag, MPI_Comm comm)
|
|
|
+{
|
|
|
+ return starpu_mpi_issend_prio(data_handle, public_req, dest, data_tag, 0, comm);
|
|
|
+}
|
|
|
+
|
|
|
+int starpu_mpi_issend_detached_prio(starpu_data_handle_t data_handle, int dest, int data_tag, int prio, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, callback, arg, 1);
|
|
|
+ _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 1, 1, prio, callback, arg, 1);
|
|
|
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+int starpu_mpi_issend_detached(starpu_data_handle_t data_handle, int dest, int data_tag, MPI_Comm comm, void (*callback)(void *), void *arg)
|
|
|
+{
|
|
|
+ return starpu_mpi_issend_detached_prio(data_handle, dest, data_tag, 0, comm, callback, arg);
|
|
|
+}
|
|
|
+
|
|
|
/********************************************************/
|
|
|
/* */
|
|
|
/* receive functionalities */
|
|
@@ -670,7 +712,7 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
|
|
|
|
|
|
static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, int data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count)
|
|
|
{
|
|
|
- return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
|
|
|
+ return _starpu_mpi_isend_irecv_common(data_handle, source, data_tag, comm, detached, sync, 0, callback, arg, RECV_REQ, _starpu_mpi_irecv_data_func, STARPU_W, sequential_consistency, is_internal_req, count);
|
|
|
}
|
|
|
|
|
|
int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_req, int source, int data_tag, MPI_Comm comm)
|
|
@@ -795,6 +837,7 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
_starpu_mpi_request_init(&waiting_req);
|
|
|
+ waiting_req->prio = INT_MAX;
|
|
|
waiting_req->status = status;
|
|
|
waiting_req->other_request = req;
|
|
|
waiting_req->func = _starpu_mpi_wait_func;
|
|
@@ -886,6 +929,7 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
_starpu_mpi_request_init(&testing_req);
|
|
|
+ testing_req->prio = INT_MAX;
|
|
|
testing_req->flag = flag;
|
|
|
testing_req->status = status;
|
|
|
testing_req->other_request = req;
|
|
@@ -978,6 +1022,7 @@ int _starpu_mpi_barrier(MPI_Comm comm)
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
_starpu_mpi_request_init(&barrier_req);
|
|
|
+ barrier_req->prio = INT_MAX;
|
|
|
barrier_req->func = _starpu_mpi_barrier_func;
|
|
|
barrier_req->request_type = BARRIER_REQ;
|
|
|
barrier_req->node_tag.comm = comm;
|
|
@@ -1013,7 +1058,7 @@ int starpu_mpi_barrier(MPI_Comm comm)
|
|
|
static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type)
|
|
|
{
|
|
|
switch (request_type)
|
|
|
- {
|
|
|
+ {
|
|
|
case SEND_REQ: return "SEND_REQ";
|
|
|
case RECV_REQ: return "RECV_REQ";
|
|
|
case WAIT_REQ: return "WAIT_REQ";
|
|
@@ -1021,7 +1066,7 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
|
|
|
case BARRIER_REQ: return "BARRIER_REQ";
|
|
|
case UNKNOWN_REQ: return "UNSET_REQ";
|
|
|
default: return "unknown request type";
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
#endif
|
|
|
|
|
@@ -1137,6 +1182,20 @@ static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
{
|
|
|
if (args->req->detached)
|
|
|
{
|
|
|
+ /* have the internal request destroyed now or when completed */
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&args->req->internal_req->req_mutex);
|
|
|
+ if (args->req->internal_req->to_destroy)
|
|
|
+ {
|
|
|
+ /* The request completed first, can now destroy it */
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->internal_req->req_mutex);
|
|
|
+ _starpu_mpi_request_destroy(args->req->internal_req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* The request didn't complete yet, tell it to destroy it when it completes */
|
|
|
+ args->req->internal_req->to_destroy = 1;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->internal_req->req_mutex);
|
|
|
+ }
|
|
|
_starpu_mpi_handle_request_termination(args->req);
|
|
|
_starpu_mpi_request_destroy(args->req);
|
|
|
}
|
|
@@ -1166,11 +1225,20 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
|
|
|
- req = _starpu_mpi_req_list_begin(detached_requests);
|
|
|
- while (req != _starpu_mpi_req_list_end(detached_requests))
|
|
|
+ if (_starpu_mpi_req_list_empty(&detached_requests))
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
+ //_STARPU_MPI_LOG_OUT();
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
+ _STARPU_MPI_TRACE_TESTING_DETACHED_BEGIN();
|
|
|
+ req = _starpu_mpi_req_list_begin(&detached_requests);
|
|
|
+ while (req != _starpu_mpi_req_list_end(&detached_requests))
|
|
|
+ {
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
+
|
|
|
+ _STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
|
|
@@ -1180,6 +1248,7 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
#endif
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
+ _STARPU_MPI_TRACE_TEST_END(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
|
|
|
if (!flag)
|
|
|
{
|
|
@@ -1193,14 +1262,27 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
- _starpu_mpi_req_list_erase(detached_requests, req);
|
|
|
+ if (req->request_type == SEND_REQ)
|
|
|
+ detached_send_nrequests--;
|
|
|
+ _starpu_mpi_req_list_erase(&detached_requests, req);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
_starpu_mpi_handle_request_termination(req);
|
|
|
|
|
|
_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
|
|
|
|
|
|
- if (req->is_internal_req == 0)
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ /* We don't want to free internal non-detached
|
|
|
+ requests, we need to get their MPI request before
|
|
|
+ destroying them */
|
|
|
+ if (req->is_internal_req && !req->to_destroy)
|
|
|
{
|
|
|
+ /* We have completed the request, let the application request destroy it */
|
|
|
+ req->to_destroy = 1;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
_starpu_mpi_request_destroy(req);
|
|
|
}
|
|
|
|
|
@@ -1209,6 +1291,7 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
}
|
|
|
+ _STARPU_MPI_TRACE_TESTING_DETACHED_END();
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
//_STARPU_MPI_LOG_OUT();
|
|
@@ -1221,7 +1304,9 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
|
|
|
/* put the submitted request into the list of pending requests
|
|
|
* so that it can be handled by the progression mechanisms */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
- _starpu_mpi_req_list_push_front(detached_requests, req);
|
|
|
+ if (req->request_type == SEND_REQ)
|
|
|
+ detached_send_nrequests++;
|
|
|
+ _starpu_mpi_req_list_push_back(&detached_requests, req);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
|
|
|
starpu_wake_all_blocked_workers();
|
|
@@ -1305,7 +1390,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
|
|
|
// Handle the request immediatly to make sure the mpi_irecv is
|
|
|
// posted before receiving an other envelope
|
|
|
- _starpu_mpi_req_list_erase(ready_requests, early_data_handle->req);
|
|
|
+ _starpu_mpi_req_list_erase(&ready_recv_requests, early_data_handle->req);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
_starpu_mpi_handle_ready_request(early_data_handle->req);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
@@ -1318,7 +1403,12 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
starpu_pthread_setname("MPI");
|
|
|
|
|
|
#ifndef STARPU_SIMGRID
|
|
|
+ if (mpi_thread_cpuid >= 0)
|
|
|
+ _starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
|
|
|
_starpu_mpi_do_initialize(argc_argv);
|
|
|
+ if (mpi_thread_cpuid >= 0)
|
|
|
+ /* In case MPI changed the binding */
|
|
|
+ _starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
|
|
|
#endif
|
|
|
|
|
|
_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
|
|
@@ -1381,13 +1471,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
int envelope_request_submitted = 0;
|
|
|
|
|
|
- while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
|
|
|
+ while (running || posted_requests || !(_starpu_mpi_req_list_empty(&ready_recv_requests)) || !(_starpu_mpi_req_prio_list_empty(&ready_send_requests)) || !(_starpu_mpi_req_list_empty(&detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
|
|
|
{
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
starpu_pthread_wait_reset(&wait);
|
|
|
#endif
|
|
|
/* shall we block ? */
|
|
|
- unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(detached_requests);
|
|
|
+ unsigned block = _starpu_mpi_req_list_empty(&ready_recv_requests) && _starpu_mpi_req_prio_list_empty(&ready_send_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(&detached_requests);
|
|
|
|
|
|
if (block)
|
|
|
{
|
|
@@ -1402,17 +1492,38 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
_STARPU_MPI_TRACE_SLEEP_END();
|
|
|
}
|
|
|
|
|
|
- /* get one request */
|
|
|
- int n = 0;
|
|
|
- while (!_starpu_mpi_req_list_empty(ready_requests))
|
|
|
+ /* get one recv request */
|
|
|
+ unsigned n = 0;
|
|
|
+ while (!_starpu_mpi_req_list_empty(&ready_recv_requests))
|
|
|
{
|
|
|
struct _starpu_mpi_req *req;
|
|
|
|
|
|
- if (n++ == NREADY_PROCESS)
|
|
|
- /* Already spent some time on submitting ready requests, poll before processing more ready requests */
|
|
|
+ if (n++ == nready_process)
|
|
|
+ /* Already spent some time on submitting ready recv requests, poll before processing more ready recv requests */
|
|
|
break;
|
|
|
|
|
|
- req = _starpu_mpi_req_list_pop_back(ready_requests);
|
|
|
+ req = _starpu_mpi_req_list_pop_back(&ready_recv_requests);
|
|
|
+
|
|
|
+ /* handling a request is likely to block for a while
|
|
|
+ * (on a sync_data_with_mem call), we want to let the
|
|
|
+ * application submit requests in the meantime, so we
|
|
|
+ * release the lock. */
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
+ _starpu_mpi_handle_ready_request(req);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
|
+ }
|
|
|
+
|
|
|
+ /* get one send request */
|
|
|
+ n = 0;
|
|
|
+ while (!_starpu_mpi_req_prio_list_empty(&ready_send_requests) && detached_send_nrequests < ndetached_send)
|
|
|
+ {
|
|
|
+ struct _starpu_mpi_req *req;
|
|
|
+
|
|
|
+ if (n++ == nready_process)
|
|
|
+ /* Already spent some time on submitting ready send requests, poll before processing more ready send requests */
|
|
|
+ break;
|
|
|
+
|
|
|
+ req = _starpu_mpi_req_prio_list_pop_back_highest(&ready_send_requests);
|
|
|
|
|
|
/* handling a request is likely to block for a while
|
|
|
* (on a sync_data_with_mem call), we want to let the
|
|
@@ -1573,8 +1684,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
starpu_pthread_wait_destroy(&wait);
|
|
|
#endif
|
|
|
|
|
|
- STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
- STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
|
|
|
+ STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&detached_requests), "List of detached requests not empty");
|
|
|
+ STARPU_MPI_ASSERT_MSG(detached_send_nrequests == 0, "Number of detached send requests not 0");
|
|
|
+ STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(&ready_recv_requests), "List of ready requests not empty");
|
|
|
+ STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_prio_list_empty(&ready_send_requests), "List of ready requests not empty");
|
|
|
STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
|
_starpu_mpi_early_request_check_termination();
|
|
|
_starpu_mpi_early_data_check_termination();
|
|
@@ -1636,13 +1749,17 @@ int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
|
|
|
STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
|
|
|
STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
|
|
|
- ready_requests = _starpu_mpi_req_list_new();
|
|
|
+ _starpu_mpi_req_list_init(&ready_recv_requests);
|
|
|
+ _starpu_mpi_req_prio_list_init(&ready_send_requests);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
|
|
|
- detached_requests = _starpu_mpi_req_list_new();
|
|
|
+ _starpu_mpi_req_list_init(&detached_requests);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
|
|
|
_starpu_mpi_comm_debug = starpu_getenv("STARPU_MPI_COMM") != NULL;
|
|
|
+ nready_process = starpu_get_env_number_default("STARPU_MPI_NREADY_PROCESS", 10);
|
|
|
+ ndetached_send = starpu_get_env_number_default("STARPU_MPI_NDETACHED_SEND", 10);
|
|
|
+ mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);
|
|
|
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&wait_counter_mutex, NULL);
|
|
@@ -1691,13 +1808,9 @@ void _starpu_mpi_progress_shutdown(int *value)
|
|
|
(void) value;
|
|
|
MSG_process_sleep(1);
|
|
|
#else
|
|
|
- starpu_pthread_join(progress_thread, (void *)value);
|
|
|
+ STARPU_PTHREAD_JOIN(progress_thread, (void *)value);
|
|
|
#endif
|
|
|
|
|
|
- /* free the request queues */
|
|
|
- _starpu_mpi_req_list_delete(detached_requests);
|
|
|
- _starpu_mpi_req_list_delete(ready_requests);
|
|
|
-
|
|
|
STARPU_PTHREAD_MUTEX_DESTROY(&mutex_posted_requests);
|
|
|
STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
|
|
|
STARPU_PTHREAD_COND_DESTROY(&barrier_cond);
|
|
@@ -1776,7 +1889,8 @@ void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t da
|
|
|
}
|
|
|
|
|
|
starpu_mpi_comm_rank(comm, &me);
|
|
|
- if (node == rank) return;
|
|
|
+ if (node == rank)
|
|
|
+ return;
|
|
|
|
|
|
tag = starpu_mpi_data_get_tag(data_handle);
|
|
|
if (tag == -1)
|
|
@@ -1817,7 +1931,8 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
|
|
|
}
|
|
|
|
|
|
starpu_mpi_comm_rank(comm, &me);
|
|
|
- if (node == rank) return;
|
|
|
+ if (node == rank)
|
|
|
+ return;
|
|
|
|
|
|
tag = starpu_mpi_data_get_tag(data_handle);
|
|
|
if (tag == -1)
|
|
@@ -1848,6 +1963,17 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void starpu_mpi_get_data_on_all_nodes_detached(MPI_Comm comm, starpu_data_handle_t data_handle)
|
|
|
+{
|
|
|
+ int size, i;
|
|
|
+ starpu_mpi_comm_size(comm, &size);
|
|
|
+#ifdef STARPU_DEVEL
|
|
|
+#warning TODO: use binary communication tree to optimize broadcast
|
|
|
+#endif
|
|
|
+ for (i = 0; i < size; i++)
|
|
|
+ starpu_mpi_get_data_on_node_detached(comm, data_handle, i, NULL, NULL);
|
|
|
+}
|
|
|
+
|
|
|
void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_rank)
|
|
|
{
|
|
|
int old_rank = starpu_mpi_data_get_rank(data);
|