@@ -16,6 +16,7 @@
 */

#include <stdlib.h>
+#include <limits.h>
#include <starpu_mpi.h>
#include <starpu_mpi_datatype.h>
#include <starpu_mpi_private.h>
@@ -24,11 +25,16 @@
#include <starpu_mpi_stats.h>
#include <starpu_mpi_cache.h>
#include <starpu_mpi_select_node.h>
+#include <starpu_mpi_tag.h>
+#include <starpu_mpi_comm.h>
+#include <starpu_mpi_init.h>
#include <common/config.h>
#include <common/thread.h>
#include <datawizard/coherency.h>
#include <nm_sendrecv_interface.h>
#include <nm_mpi_nmad.h>
+#include <core/task.h>
+#include <core/topology.h>

static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);

#ifdef STARPU_VERBOSE
@@ -43,9 +49,16 @@ static struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t dat
static void _starpu_mpi_handle_new_request(void *arg);

static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
+static void _starpu_mpi_add_sync_point_in_fxt(void);
+
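+/* CPU the MPI progression thread is bound to (STARPU_MPI_THREAD_CPUID), and
+ * rank/size overrides read from STARPU_MPI_FAKE_RANK/STARPU_MPI_FAKE_SIZE;
+ * when a fake size is given, communications are skipped altogether
+ * (see _starpu_mpi_isend_irecv_common below). */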
+static int mpi_thread_cpuid = -1;
+int _starpu_mpi_fake_world_size = -1;
+int _starpu_mpi_fake_world_rank = -1;

/* Condition to wake up waiting for all current MPI requests to finish */
static starpu_pthread_t progress_thread;
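+/* progress_mutex/progress_cond protect "running" below: the progression thread
+ * signals the condition once its initialization is done, and
+ * _starpu_mpi_progress_shutdown() takes it again when clearing the flag. */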
+static starpu_pthread_cond_t progress_cond;
+static starpu_pthread_mutex_t progress_mutex;
static volatile int running = 0;

/* Count requests posted by the application and not yet submitted to MPI, i.e pushed into the new_requests list */
@@ -54,13 +67,73 @@ static volatile int pending_request = 0;

#define REQ_FINALIZED 0x1

-
-
PUK_LFSTACK_TYPE(callback, struct _starpu_mpi_req *req;);
static callback_lfstack_t callback_stack = NULL;

static starpu_sem_t callback_sem;

+static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
+{
+	_STARPU_MPI_CALLOC(*req, 1, sizeof(struct _starpu_mpi_req));
+
+	/* Initialize the request structure */
+	(*req)->data_handle = NULL;
+	(*req)->prio = 0;
+	(*req)->completed = 0;
+
+	(*req)->datatype = 0;
+	(*req)->datatype_name = NULL;
+	(*req)->ptr = NULL;
+	(*req)->count = -1;
+	(*req)->registered_datatype = -1;
+
+	(*req)->node_tag.rank = -1;
+	(*req)->node_tag.data_tag = -1;
+	(*req)->node_tag.comm = 0;
+
+	(*req)->func = NULL;
+
+	(*req)->status = NULL;
+	// (*req)->data_request = 0;
+	(*req)->flag = NULL;
+
+	(*req)->ret = -1;
+	piom_cond_init(&((*req)->req_cond), 0);
+	//STARPU_PTHREAD_MUTEX_INIT(&((*req)->req_mutex), NULL);
+	STARPU_PTHREAD_COND_INIT(&((*req)->req_cond), NULL);
+	// STARPU_PTHREAD_MUTEX_INIT(&((*req)->posted_mutex), NULL);
+	//STARPU_PTHREAD_COND_INIT(&((*req)->posted_cond), NULL);
+
+	(*req)->request_type = UNKNOWN_REQ;
+
+	(*req)->submitted = 0;
+	(*req)->completed = 0;
+	(*req)->posted = 0;
+
+	//(*req)->other_request = NULL;
+
+	(*req)->sync = 0;
+	(*req)->detached = -1;
+	(*req)->callback = NULL;
+	(*req)->callback_arg = NULL;
+
+	// (*req)->size_req = 0;
+	//(*req)->internal_req = NULL;
+	//(*req)->is_internal_req = 0;
+	//(*req)->to_destroy = 1;
+	//(*req)->early_data_handle = NULL;
+	//(*req)->envelope = NULL;
+	(*req)->sequential_consistency = 1;
+	(*req)->pre_sync_jobid = -1;
+	(*req)->post_sync_jobid = -1;
+
+#ifdef STARPU_SIMGRID
+	starpu_pthread_queue_init(&((*req)->queue));
+	starpu_pthread_queue_register(&wait, &((*req)->queue));
+	(*req)->done = 0;
+#endif
+}
+
static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
{
	piom_cond_destroy(&(req->req_cond));
@@ -73,6 +146,11 @@ static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
/* */
/********************************************************/

+static void nop_acquire_cb(void *arg)
+{
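+	/* The communication itself was skipped (fake world size); the data was only
+	 * acquired to preserve sequential-consistency dependencies, so just release it. */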
+	starpu_data_release(arg);
+}
+
static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle_t data_handle,
							      int srcdst, int data_tag, MPI_Comm comm,
							      unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
@@ -81,32 +159,34 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
							      int sequential_consistency)
{

-	_STARPU_MPI_LOG_IN();
-	struct _starpu_mpi_req *req = calloc(1, sizeof(struct _starpu_mpi_req));
-	STARPU_ASSERT_MSG(req, "Invalid request");
+	struct _starpu_mpi_req *req;
+
+	if (_starpu_mpi_fake_world_size != -1)
+	{
+		/* Don't actually do the communication */
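+		/* (the data is still acquired with the requested sequential consistency,
+		 * so task dependencies behave as if the transfer had happened) */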
+		starpu_data_acquire_on_node_cb_sequential_consistency(data_handle, STARPU_MAIN_RAM, mode, nop_acquire_cb, data_handle, sequential_consistency);
+		return NULL;
+	}

+	_STARPU_MPI_LOG_IN();
	STARPU_ATOMIC_ADD( &pending_request, 1);

	/* Initialize the request structure */
-	req->completed = 0;
-	piom_cond_init(&req->req_cond, 0);
-
+	_starpu_mpi_request_init(&req);
	req->request_type = request_type;
+	/* prio_list is sorted by increasing values */
	req->prio = prio;
-	req->user_datatype = -1;
-	req->count = -1;
	req->data_handle = data_handle;
-	req->srcdst = srcdst;
-	req->mpi_tag = data_tag;
-	req->comm = comm;
-	nm_mpi_nmad_dest(&req->session, &req->gate, comm, req->srcdst);
-
+	req->node_tag.rank = srcdst;
+	req->node_tag.data_tag = data_tag;
+	req->node_tag.comm = comm;
	req->detached = detached;
	req->sync = sync;
	req->callback = callback;
	req->callback_arg = arg;
-
	req->func = func;
+	req->sequential_consistency = sequential_consistency;
+	nm_mpi_nmad_dest(&req->session, &req->gate, comm, req->node_tag.rank);

	/* Asynchronously request StarPU to fetch the data in main memory: when
	 * it is available in main memory, _starpu_mpi_submit_new_mpi_request(req) is called and
@@ -127,11 +207,11 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
{
	_STARPU_MPI_LOG_IN();

-	_STARPU_MPI_DEBUG(2, "post MPI isend request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(30, "post MPI isend request %p type %s tag %d src %d data %p datasize %ld ptr %p datatype '%s' count %d registered_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, req->datatype_name, (int)req->count, req->registered_datatype, req->sync);

-	_starpu_mpi_comm_amounts_inc(req->comm, req->srcdst, req->datatype, req->count);
+	_starpu_mpi_comm_amounts_inc(req->node_tag.comm, req->node_tag.rank, req->datatype, req->count);

-	TRACE_MPI_ISEND_SUBMIT_BEGIN(req->srcdst, req->mpi_tag, 0);
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);

	struct nm_data_s data;
	nm_mpi_nmad_data(&data, (void*)req->ptr, req->datatype, req->count);
@@ -141,17 +221,16 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)

	if (req->sync == 0)
	{
-		req->ret = nm_sr_send_isend(req->session, &(req->request), req->gate, req->mpi_tag);
-
+		req->ret = nm_sr_send_isend(req->session, &(req->request), req->gate, req->node_tag.data_tag);
		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Isend returning %d", req->ret);
	}
	else
	{
-		req->ret = nm_sr_send_issend(req->session, &(req->request), req->gate, req->mpi_tag);
+		req->ret = nm_sr_send_issend(req->session, &(req->request), req->gate, req->node_tag.data_tag);
		STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "MPI_Issend returning %d", req->ret);
	}

-	TRACE_MPI_ISEND_SUBMIT_END(req->srcdst, req->mpi_tag, starpu_data_get_size(req->data_handle));
+	_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);

	_starpu_mpi_handle_pending_request(req);

@@ -160,8 +239,9 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)

static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
{
-	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
-	if (req->user_datatype == 0)
+	_starpu_mpi_datatype_allocate(req->data_handle, req);
+
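+	/* registered_datatype == 1 means the handle maps to a proper MPI datatype;
+	 * otherwise the data is packed and its size possibly sent separately below. */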
+	if (req->registered_datatype == 1)
	{
		req->waited = 1;
		req->count = 1;
@@ -179,12 +259,12 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
		if (psize != -1)
		{
			// We already know the size of the data, let's send it to overlap with the packing of the data
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (first call to pack)\n", psize, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst); req->count = psize;
+			_STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", psize, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
+			req->count = psize;
			//ret = nm_sr_isend(nm_mpi_communicator_get_session(p_req->p_comm),nm_mpi_communicator_get_gate(p_comm,req->srcdst), req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
-			ret = nm_sr_isend(req->session,req->gate, req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
+			ret = nm_sr_isend(req->session,req->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->size_req);

-
-			// ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
+			// ret = MPI_Isend(&req->count, sizeof(req->count), MPI_BYTE, req->srcdst, req->mpi_tag, req->comm, &req->size_req);
			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
		}

@@ -193,8 +273,8 @@ static void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
		if (psize == -1)
		{
			// We know the size now, let's send it
-			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->count, sizeof(req->count), _starpu_mpi_datatype(MPI_BYTE), req->mpi_tag, req->srcdst);
-			ret = nm_sr_isend(req->session,req->gate, req->mpi_tag,&req->count, sizeof(req->count), &req->size_req);
+			_STARPU_MPI_DEBUG(1, "Sending size %ld (%ld %s) with tag %d to node %d (second call to pack)\n", req->count, sizeof(req->count), "MPI_BYTE", req->node_tag.data_tag, req->node_tag.rank);
+			ret = nm_sr_isend(req->session,req->gate, req->node_tag.data_tag,&req->count, sizeof(req->count), &req->size_req);
			STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "when sending size, nm_sr_isend returning %d", ret);
		}
		else
@@ -221,9 +301,9 @@ int starpu_mpi_isend_prio(starpu_data_handle_t data_handle, starpu_mpi_req *publ
	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_isend needs a valid starpu_mpi_req");

	struct _starpu_mpi_req *req;
-	TRACE_MPI_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
+	_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(dest, data_tag, 0);
	req = _starpu_mpi_isend_common(data_handle, dest, data_tag, comm, 0, 0, prio, NULL, NULL);
-	TRACE_MPI_ISEND_COMPLETE_END(dest, data_tag, 0);
+	_STARPU_MPI_TRACE_ISEND_COMPLETE_END(dest, data_tag, 0);

	STARPU_MPI_ASSERT_MSG(req, "Invalid return for _starpu_mpi_isend_common");
	*public_req = req;
@@ -315,18 +395,18 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
{
	_STARPU_MPI_LOG_IN();

-	_STARPU_MPI_DEBUG(2, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(20, "post MPI irecv request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);

-	TRACE_MPI_IRECV_SUBMIT_BEGIN(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);

	//req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->srcdst, req->mpi_tag, req->comm, &req->request);
	struct nm_data_s data;
	nm_mpi_nmad_data(&data, (void*)req->ptr, req->datatype, req->count);
	nm_sr_recv_init(req->session, &(req->request));
	nm_sr_recv_unpack_data(req->session, &(req->request), &data);
-	nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->mpi_tag,NM_TAG_MASK_FULL);
+	nm_sr_recv_irecv(req->session, &(req->request), req->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);

-	TRACE_MPI_IRECV_SUBMIT_END(req->srcdst, req->mpi_tag);
+	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);

	_starpu_mpi_handle_pending_request(req);

@@ -354,8 +434,8 @@ static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
{
	_STARPU_MPI_LOG_IN();

-	_starpu_mpi_handle_allocate_datatype(req->data_handle, &req->datatype, &req->user_datatype);
-	if (req->user_datatype == 0)
+	_starpu_mpi_datatype_allocate(req->data_handle, req);
+	if (req->registered_datatype == 1)
	{
		req->count = 1;
		req->ptr = starpu_data_get_local_ptr(req->data_handle);
@@ -366,8 +446,8 @@ static void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
		struct _starpu_mpi_irecv_size_callback *callback = malloc(sizeof(struct _starpu_mpi_irecv_size_callback));
		callback->req = req;
		starpu_variable_data_register(&callback->handle, 0, (uintptr_t)&(callback->req->count), sizeof(callback->req->count));
-		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->mpi_tag, req->srcdst);
-		_starpu_mpi_irecv_common(callback->handle, req->srcdst, req->mpi_tag, req->comm, 1, 0, _starpu_mpi_irecv_size_callback, callback,1);
+		_STARPU_MPI_DEBUG(4, "Receiving size with tag %d from node %d\n", req->node_tag.data_tag, req->node_tag.rank);
+		_starpu_mpi_irecv_common(callback->handle, req->node_tag.rank, req->node_tag.data_tag, req->node_tag.comm, 1, 0, _starpu_mpi_irecv_size_callback, callback,1);
	}

}
@@ -383,9 +463,9 @@ int starpu_mpi_irecv(starpu_data_handle_t data_handle, starpu_mpi_req *public_re
	STARPU_ASSERT_MSG(public_req, "starpu_mpi_irecv needs a valid starpu_mpi_req");

	struct _starpu_mpi_req *req;
-	TRACE_MPI_IRECV_COMPLETE_BEGIN(source, mpi_tag);
+	_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(source, mpi_tag);
	req = _starpu_mpi_irecv_common(data_handle, source, mpi_tag, comm, 0, 0, NULL, NULL,1);
-	TRACE_MPI_IRECV_COMPLETE_END(source, mpi_tag);
+	_STARPU_MPI_TRACE_IRECV_COMPLETE_END(source, mpi_tag);

	STARPU_ASSERT_MSG(req, "Invalid return for _starpu_mpi_irecv_common");
	*public_req = req;
@@ -444,10 +524,8 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
	struct _starpu_mpi_req *req = *public_req;
	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Wait cannot be called on a detached request");

-
-/* we must do a test_locked to avoid race condition :
- * without req_cond could still be used and couldn't be freed)*/
-
+	/* we must do a test_locked to avoid race condition :
+	 * without req_cond could still be used and couldn't be freed)*/
	while (!req->completed || ! piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED))
	{
		piom_cond_wait(&(req->req_cond),REQ_FINALIZED);
@@ -468,26 +546,25 @@ int starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
/* */
/********************************************************/

-
int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
{
-
	_STARPU_MPI_LOG_IN();
	STARPU_MPI_ASSERT_MSG(public_req, "starpu_mpi_test needs a valid starpu_mpi_req");
	struct _starpu_mpi_req *req = *public_req;
	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
-	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "Test request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);

-	TRACE_MPI_UTESTING_BEGIN(req->srcdst, req->mpi_tag);
-
-/* we must do a test_locked to avoid race condition :
- * without req_cond could still be used and couldn't be freed)*/
+	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.rank, req->node_tag.data_tag);

+	/* we must do a test_locked to avoid race condition :
+	 * without req_cond could still be used and couldn't be freed)*/
	*flag = req->completed && piom_cond_test_locked(&(req->req_cond),REQ_FINALIZED);
	if (*flag && status!=MPI_STATUS_IGNORE)
		_starpu_mpi_req_status(req,status);
-	TRACE_MPI_UTESTING_END(req->srcdst, req->mpi_tag);
+
+	_STARPU_MPI_TRACE_UTESTING_END(req->node_tag.rank, req->node_tag.data_tag);
+
	if(*flag)
	{
		_starpu_mpi_request_destroy(req);
@@ -507,7 +584,7 @@ int starpu_mpi_barrier(MPI_Comm comm)
{
	_STARPU_MPI_LOG_IN();
	int ret;
-//	STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
+	// STARPU_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
	ret = MPI_Barrier(comm);

	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
@@ -539,15 +616,14 @@ static char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type

static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event)
{
-
	_STARPU_MPI_LOG_IN();

-	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);

	if (req->request_type == RECV_REQ || req->request_type == SEND_REQ)
	{
-		if (req->user_datatype == 1)
+		if (req->registered_datatype == 0)
		{
			if (req->request_type == SEND_REQ)
			{
@@ -569,7 +645,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,n
		}
		else
		{
-			_starpu_mpi_handle_free_datatype(req->data_handle, &req->datatype);
+			_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
		}
		starpu_data_release(req->data_handle);
	}
@@ -614,11 +690,9 @@ void _starpu_mpi_handle_request_termination_callback(nm_sr_event_t event, const

static void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req)
{
-
	if(req->request_type == SEND_REQ && req->waited>1)
	{
		nm_sr_request_set_ref(&(req->size_req), req);
-
		nm_sr_request_monitor(req->session, &(req->size_req), NM_SR_EVENT_FINALIZED,_starpu_mpi_handle_request_termination_callback);
	}
	/* the if must be before, because the first callback can directly free
@@ -635,20 +709,13 @@ static void _starpu_mpi_handle_new_request(void *arg)
	STARPU_ASSERT_MSG(req, "Invalid request");

	/* submit the request to MPI */
-	_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d user_datatype %d \n",
-			  req, _starpu_mpi_request_type(req->request_type), req->mpi_tag, req->srcdst, req->data_handle, req->ptr, _starpu_mpi_datatype(req->datatype), (int)req->count, req->user_datatype);
+	_STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
+			  req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
	req->func(req);

	_STARPU_MPI_LOG_OUT();
}

-struct _starpu_mpi_argc_argv
-{
-	int initialize_mpi;
-	int *argc;
-	char ***argv;
-};
-
static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
{
	switch (thread_level)
@@ -675,22 +742,61 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
{
	struct _starpu_mpi_argc_argv *argc_argv = (struct _starpu_mpi_argc_argv *) arg;

+	starpu_pthread_setname("MPI");
+
+#ifndef STARPU_SIMGRID
+	if (mpi_thread_cpuid >= 0)
+		_starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
+	_starpu_mpi_do_initialize(argc_argv);
+	if (mpi_thread_cpuid >= 0)
+		/* In case MPI changed the binding */
+		_starpu_bind_thread_on_cpu(mpi_thread_cpuid, STARPU_NOWORKERID);
+#endif
+
+	_starpu_mpi_fake_world_size = starpu_get_env_number("STARPU_MPI_FAKE_SIZE");
+	_starpu_mpi_fake_world_rank = starpu_get_env_number("STARPU_MPI_FAKE_RANK");
+
+#ifdef STARPU_SIMGRID
+	/* Now that MPI is set up, let the rest of simgrid get initialized */
+	char **argv_cpy;
+	_STARPU_MPI_MALLOC(argv_cpy, *(argc_argv->argc) * sizeof(char*));
+	int i;
+	for (i = 0; i < *(argc_argv->argc); i++)
+		argv_cpy[i] = strdup((*(argc_argv->argv))[i]);
+	MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
+	/* And set TSD for us */
+	void **tsd;
+	_STARPU_CALLOC(tsd, MAX_TSD + 1, sizeof(void*));
+	if (!smpi_process_set_user_data)
	{
-		int provided;
-		MPI_Query_thread(&provided);
-		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
+		_STARPU_ERROR("Your version of simgrid does not provide smpi_process_set_user_data, we can not continue without it\n");
	}
+	smpi_process_set_user_data(tsd);
+#endif
+
+#ifdef STARPU_USE_FXT
+	_starpu_fxt_wait_initialisation();
+#endif //STARPU_USE_FXT

	{
-		int rank, worldsize;
-		MPI_Comm_rank(MPI_COMM_WORLD, &rank);
-		MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
-		TRACE_MPI_START(rank, worldsize);
+		_STARPU_MPI_TRACE_START(argc_argv->rank, argc_argv->world_size);
#ifdef STARPU_USE_FXT
-		starpu_profiling_set_id(rank);
+		starpu_profiling_set_id(argc_argv->rank);
#endif //STARPU_USE_FXT
	}

+	_starpu_mpi_add_sync_point_in_fxt();
+	_starpu_mpi_comm_amounts_init(argc_argv->comm);
+	_starpu_mpi_cache_init(argc_argv->comm);
+	_starpu_mpi_select_node_init();
+	_starpu_mpi_datatype_init();
+
+	/* notify the main thread that the progression thread is ready */
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+	running = 1;
+	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+
	while (1)
	{
		struct callback_lfstack_cell_s* c = callback_lfstack_pop(&callback_stack);
@@ -712,7 +818,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
		}
		else
		{
-			if (pending_request==0)
+			if (pending_request==0)
				break;
		}
		continue;
@@ -733,12 +839,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
		STARPU_ATOMIC_ADD( &pending_request, -1);
		/* we signal that the request is completed.*/

-
		free(c);

	}
-	STARPU_ASSERT_MSG(callback_lfstack_pop(&callback_stack)==NULL, "List of callback not empty.");
-	STARPU_ASSERT_MSG(pending_request==0, "Request still pending.");
+	STARPU_ASSERT_MSG(callback_lfstack_pop(&callback_stack)==NULL, "List of callback not empty.");
+	STARPU_ASSERT_MSG(pending_request==0, "Request still pending.");

	if (argc_argv->initialize_mpi)
	{
@@ -768,11 +873,11 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
	int worldsize;
	int ret;

-	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
-	MPI_Comm_size(MPI_COMM_WORLD, &worldsize);
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);

	ret = MPI_Barrier(MPI_COMM_WORLD);
-	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %d", ret);
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_error_code(ret));

	/* We generate a "unique" key so that we can make sure that different
	 * FxT traces come from the same MPI run. */
@@ -787,102 +892,52 @@ static void _starpu_mpi_add_sync_point_in_fxt(void)
	}

	ret = MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
-	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %d", ret);
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %s", _starpu_mpi_get_mpi_error_code(ret));

-	TRACE_MPI_BARRIER(rank, worldsize, random_number);
+	_STARPU_MPI_TRACE_BARRIER(rank, worldsize, random_number);

	_STARPU_MPI_DEBUG(3, "unique key %x\n", random_number);
#endif
}

-static
-int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi)
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
{
-
-	struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
-	argc_argv->initialize_mpi = initialize_mpi;
-	argc_argv->argc = argc;
-	argc_argv->argv = argv;
-
-
-	if (initialize_mpi)
-	{
-		int thread_support;
-		_STARPU_DEBUG("Calling MPI_Init_thread\n");
-		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
-		{
-			_STARPU_ERROR("MPI_Init_thread failed\n");
-		}
-		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
-	}
+	STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);

	starpu_sem_init(&callback_sem, 0, 0);
-	running = 1;
+	running = 0;
+	mpi_thread_cpuid = starpu_get_env_number_default("STARPU_MPI_THREAD_CPUID", -1);

	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);

-	_starpu_mpi_add_sync_point_in_fxt();
-	_starpu_mpi_comm_amounts_init(MPI_COMM_WORLD);
-	_starpu_mpi_cache_init(MPI_COMM_WORLD);
-	_starpu_mpi_select_node_init();
-	_starpu_mpi_datatype_init();
-	return 0;
-}
-
-int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
-{
-	return _starpu_mpi_initialize(argc, argv, initialize_mpi);
-}
-
-int starpu_mpi_initialize(void)
-{
-	return _starpu_mpi_initialize(NULL, NULL, 0);
-}
-
-int starpu_mpi_initialize_extended(int *rank, int *world_size)
-{
-	int ret;
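+	/* block until the progression thread has signalled (through progress_cond)
+	 * that its own initialization is finished */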
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+	while (!running)
+		STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);

-	ret = _starpu_mpi_initialize(NULL, NULL, 1);
-	if (ret == 0)
-	{
-		_STARPU_DEBUG("Calling MPI_Comm_rank\n");
-		MPI_Comm_rank(MPI_COMM_WORLD, rank);
-		MPI_Comm_size(MPI_COMM_WORLD, world_size);
-	}
-	return ret;
+	return 0;
}

-int starpu_mpi_shutdown(void)
+void _starpu_mpi_progress_shutdown(int *value)
{
-	void *value;
-	int rank, world_size;
-
-	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
-	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
-	MPI_Comm_size(MPI_COMM_WORLD, &world_size);
-
	/* kill the progression thread */
-	running = 0;
-	starpu_sem_post(&callback_sem);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+	running = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);

+	starpu_sem_post(&callback_sem);
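+	/* the semaphore wakes the progression thread if it is blocked waiting for
+	 * callbacks, so that it can notice running == 0 and leave its loop */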

	starpu_pthread_join(progress_thread, &value);

-	TRACE_MPI_STOP(rank, world_size);
-
-
-	_starpu_mpi_comm_amounts_display(rank);
-	_starpu_mpi_comm_amounts_free();
-	_starpu_mpi_cache_free(world_size);
-	_starpu_mpi_datatype_shutdown();
-	return 0;
+	STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&progress_cond);
}

-void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
+void _starpu_mpi_data_clear(starpu_data_handle_t data_handle)
{
-//	struct _starpu_mpi_data *mpi_data = data_handle->mpi_data;
-	_starpu_mpi_cache_flush(data_handle);
+	_starpu_mpi_cache_data_clear(data_handle);
	free(data_handle->mpi_data);
}

@@ -895,19 +950,25 @@ void starpu_mpi_data_register_comm(starpu_data_handle_t data_handle, int tag, in
	}
	else
	{
-		mpi_data = malloc(sizeof(struct _starpu_mpi_data));
+		_STARPU_CALLOC(mpi_data, 1, sizeof(struct _starpu_mpi_data));
+		mpi_data->magic = 42;
+		mpi_data->node_tag.data_tag = -1;
+		mpi_data->node_tag.rank = -1;
+		mpi_data->node_tag.comm = MPI_COMM_WORLD;
		data_handle->mpi_data = mpi_data;
-		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_clear_cache);
+		_starpu_mpi_cache_data_init(data_handle);
+		_starpu_data_set_unregister_hook(data_handle, _starpu_mpi_data_clear);
	}

	if (tag != -1)
	{
-		mpi_data->tag = tag;
+		mpi_data->node_tag.data_tag = tag;
	}
	if (rank != -1)
	{
-		mpi_data->rank = rank;
-		mpi_data->comm = comm;
+		_STARPU_MPI_TRACE_DATA_SET_RANK(data_handle, rank);
+		mpi_data->node_tag.rank = rank;
+		mpi_data->node_tag.comm = comm;
	}
}
@@ -924,36 +985,13 @@ void starpu_mpi_data_set_tag(starpu_data_handle_t handle, int tag)
int starpu_mpi_data_get_rank(starpu_data_handle_t data)
{
	STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
-	return ((struct _starpu_mpi_data *)(data->mpi_data))->rank;
+	return ((struct _starpu_mpi_data *)(data->mpi_data))->node_tag.rank;
}

int starpu_mpi_data_get_tag(starpu_data_handle_t data)
{
	STARPU_ASSERT_MSG(data->mpi_data, "starpu_mpi_data_register MUST be called for data %p\n", data);
-	return ((struct _starpu_mpi_data *)(data->mpi_data))->tag;
-}
-
-
-int starpu_mpi_comm_size(MPI_Comm comm, int *size)
-{
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*size = _mpi_world_size;
-	return 0;
-#else
-	return MPI_Comm_size(comm, size);
-#endif
-}
-
-int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
-{
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*rank = _mpi_world_rank;
-	return 0;
-#else
-	return MPI_Comm_rank(comm, rank);
-#endif
+	return ((struct _starpu_mpi_data *)(data->mpi_data))->node_tag.data_tag;
}

void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t data_handle, int node, void (*callback)(void*), void *arg)
@@ -967,7 +1005,8 @@ void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t da
	}

	starpu_mpi_comm_rank(comm, &me);
-	if (node == rank) return;
+	if (node == rank)
+		return;

	tag = starpu_mpi_data_get_tag(data_handle);
	if (tag == -1)
@@ -978,8 +1017,8 @@ void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t da
	if (me == node)
	{
		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		void* already_received = _starpu_mpi_cache_received_data_set(data_handle);
-		if (already_received == NULL)
+		int already_received = _starpu_mpi_cache_received_data_set(data_handle);
+		if (already_received == 0)
		{
			_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, rank);
			starpu_mpi_irecv_detached(data_handle, rank, tag, comm, callback, arg);
@@ -988,8 +1027,8 @@ void starpu_mpi_get_data_on_node_detached(MPI_Comm comm, starpu_data_handle_t da
	else if (me == rank)
	{
		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		void* already_sent = _starpu_mpi_cache_sent_data_set(data_handle, node);
-		if (already_sent == NULL)
+		int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, node);
+		if (already_sent == 0)
		{
			_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data_handle, node);
			starpu_mpi_isend_detached(data_handle, node, tag, comm, NULL, NULL);
@@ -1008,7 +1047,8 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
	}

	starpu_mpi_comm_rank(comm, &me);
-	if (node == rank) return;
+	if (node == rank)
+		return;

	tag = starpu_mpi_data_get_tag(data_handle);
	if (tag == -1)
@@ -1020,8 +1060,8 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
	{
		MPI_Status status;
		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		void* already_received = _starpu_mpi_cache_received_data_set(data_handle);
-		if (already_received == NULL)
+		int already_received = _starpu_mpi_cache_received_data_set(data_handle);
+		if (already_received == 0)
		{
			_STARPU_MPI_DEBUG(1, "Receiving data %p from %d\n", data_handle, rank);
			starpu_mpi_recv(data_handle, rank, tag, comm, &status);
@@ -1030,11 +1070,54 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
	else if (me == rank)
	{
		_STARPU_MPI_DEBUG(1, "Migrating data %p from %d to %d\n", data_handle, rank, node);
-		void* already_sent = _starpu_mpi_cache_sent_data_set(data_handle, node);
-		if (already_sent == NULL)
+		int already_sent = _starpu_mpi_cache_sent_data_set(data_handle, node);
+		if (already_sent == 0)
		{
			_STARPU_MPI_DEBUG(1, "Sending data %p to %d\n", data_handle, node);
			starpu_mpi_send(data_handle, node, tag, comm);
		}
	}
}
+
+void starpu_mpi_get_data_on_all_nodes_detached(MPI_Comm comm, starpu_data_handle_t data_handle)
+{
+	int size, i;
+	starpu_mpi_comm_size(comm, &size);
+#ifdef STARPU_DEVEL
+#warning TODO: use binary communication tree to optimize broadcast
+#endif
+	for (i = 0; i < size; i++)
+		starpu_mpi_get_data_on_node_detached(comm, data_handle, i, NULL, NULL);
+}
+
+void starpu_mpi_data_migrate(MPI_Comm comm, starpu_data_handle_t data, int new_rank)
+{
+	int old_rank = starpu_mpi_data_get_rank(data);
+	if (new_rank == old_rank)
+		/* Already there */
+		return;
+
+	/* First submit data migration if it's not already on destination */
+	starpu_mpi_get_data_on_node_detached(comm, data, new_rank, NULL, NULL);
+
+	/* And note new owner */
+	starpu_mpi_data_set_rank_comm(data, new_rank, comm);
+
+	/* Flush cache in all other nodes */
+	/* TODO: Ideally we'd transmit the knowledge of who owns it */
+	starpu_mpi_cache_flush(comm, data);
+	return;
+}
+
+int starpu_mpi_wait_for_all(MPI_Comm comm)
+{
+	int mpi = 1;
+	int task = 1;
+	while (task || mpi)
+	{
+		task = _starpu_task_wait_for_all_and_return_nb_waited_tasks();
+		mpi = starpu_mpi_barrier(comm);
+	}
+	return 0;
+}
+