|
@@ -152,7 +152,7 @@ void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, in
|
|
|
{
|
|
|
if (coop_sends->reqs_array[i]->request_type == SEND_REQ && submit_data)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, coop_sends->reqs_array[i]->node_tag.rank);
|
|
|
+ _STARPU_MPI_DEBUG(0, "cooperative sends %p sending to %d\n", coop_sends, coop_sends->reqs_array[i]->node_tag.node.rank);
|
|
|
_starpu_mpi_submit_ready_request(coop_sends->reqs_array[i]);
|
|
|
}
|
|
|
/* TODO: handle redirect requests */
|
|
@@ -166,7 +166,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(-1);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(0, "new req %p srcdst %d tag %"PRIi64"d and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
|
|
|
+ _STARPU_MPI_DEBUG(0, "new req %p srcdst %d tag %"PRIi64" and type %s %d\n", req, req->node_tag.node.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->backend->is_internal_req);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
|
|
|
@@ -178,7 +178,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
* pointer associated to the data_handle, and push it into the
|
|
|
* ready_requests list, so as the real MPI request can be submitted
|
|
|
* before the next submission of the envelope-catching request. */
|
|
|
- if (req->is_internal_req)
|
|
|
+ if (req->backend->is_internal_req)
|
|
|
{
|
|
|
_starpu_mpi_datatype_allocate(req->data_handle, req);
|
|
|
if (req->registered_datatype == 1)
|
|
@@ -192,18 +192,18 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
_STARPU_MPI_MALLOC(req->ptr, req->count);
|
|
|
}
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64"d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
- req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
+ _STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
+ req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
|
|
|
req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
|
_STARPU_MPI_INC_READY_REQUESTS(+1);
|
|
|
|
|
|
/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->backend->posted_mutex);
|
|
|
req->posted = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->backend->posted_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->posted_mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
|
}
|
|
|
else
|
|
@@ -224,11 +224,11 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %"PRIi64"d has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_DEBUG(3, "The RECV request %p with tag %"PRIi64" has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
|
|
|
STARPU_ASSERT(req->data_handle != early_data_handle->handle);
|
|
|
|
|
|
- req->internal_req = early_data_handle->req;
|
|
|
- req->early_data_handle = early_data_handle;
|
|
|
+ req->backend->internal_req = early_data_handle->req;
|
|
|
+ req->backend->early_data_handle = early_data_handle;
|
|
|
|
|
|
struct _starpu_mpi_early_data_cb_args *cb_args;
|
|
|
_STARPU_MPI_MALLOC(cb_args, sizeof(struct _starpu_mpi_early_data_cb_args));
|
|
@@ -245,8 +245,8 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
/* Case: no matching data has been received. Store the receive request as an early_request. */
|
|
|
else
|
|
|
{
|
|
|
- struct _starpu_mpi_req *sync_req = _starpu_mpi_sync_data_find(req->node_tag.data_tag, req->node_tag.rank, req->node_tag.comm);
|
|
|
- _STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %"PRIi64"d and src %d = %p\n", req->node_tag.data_tag, req->node_tag.rank, sync_req);
|
|
|
+ struct _starpu_mpi_req *sync_req = _starpu_mpi_sync_data_find(req->node_tag.data_tag, req->node_tag.node.rank, req->node_tag.node.comm);
|
|
|
+ _STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %"PRIi64" and src %d = %p\n", req->node_tag.data_tag, req->node_tag.node.rank, sync_req);
|
|
|
if (sync_req)
|
|
|
{
|
|
|
req->sync = 1;
|
|
@@ -268,7 +268,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %"PRIi64"d) into the request hashmap\n", req, req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %"PRIi64") into the request hashmap\n", req, req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
_starpu_mpi_early_request_enqueue(req);
|
|
|
}
|
|
|
}
|
|
@@ -281,8 +281,8 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
else
|
|
|
_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
|
_STARPU_MPI_INC_READY_REQUESTS(+1);
|
|
|
- _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %"PRIi64"d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
- req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
+ _STARPU_MPI_DEBUG(3, "Pushing new request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
+ req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
|
|
|
req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
}
|
|
|
|
|
@@ -359,36 +359,36 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(0, "post MPI isend request %p type %s tag %"PRIi64"d src %d data %p datasize %ld ptr %p datatype '%s' count %d registered_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, req->datatype_name, (int)req->count, req->registered_datatype, req->sync);
|
|
|
+ _STARPU_MPI_DEBUG(0, "post MPI isend request %p type %s tag %"PRIi64" src %d data %p datasize %ld ptr %p datatype '%s' count %d registered_datatype %d sync %d\n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, starpu_data_get_size(req->data_handle), req->ptr, req->datatype_name, (int)req->count, req->registered_datatype, req->sync);
|
|
|
|
|
|
- _starpu_mpi_comm_amounts_inc(req->node_tag.comm, req->node_tag.rank, req->datatype, req->count);
|
|
|
+ _starpu_mpi_comm_amounts_inc(req->node_tag.node.comm, req->node_tag.node.rank, req->datatype, req->count);
|
|
|
|
|
|
- _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
|
|
|
+ _STARPU_MPI_TRACE_ISEND_SUBMIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag, 0);
|
|
|
|
|
|
if (req->sync == 0)
|
|
|
{
|
|
|
- _STARPU_MPI_COMM_TO_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
|
|
|
- req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
|
|
|
+ _STARPU_MPI_COMM_TO_DEBUG(req, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.node.comm);
|
|
|
+ req->ret = MPI_Isend(req->ptr, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_DATA, req->node_tag.node.comm, &req->backend->data_request);
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_COMM_TO_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.comm);
|
|
|
- req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->data_request);
|
|
|
+ _STARPU_MPI_COMM_TO_DEBUG(req, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.node.comm);
|
|
|
+ req->ret = MPI_Issend(req->ptr, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.node.comm, &req->backend->data_request);
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
}
|
|
|
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
- _starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
|
|
|
+ _starpu_mpi_simgrid_wait_req(&req->backend->data_request, &req->status_store, &req->queue, &req->done);
|
|
|
#endif
|
|
|
|
|
|
- _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
|
|
|
+ _STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag, starpu_data_get_size(req->data_handle), req->pre_sync_jobid);
|
|
|
|
|
|
/* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
|
|
|
req->submitted = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
|
|
|
|
|
|
_starpu_mpi_handle_detached_request(req);
|
|
|
|
|
@@ -399,54 +399,55 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
_starpu_mpi_datatype_allocate(req->data_handle, req);
|
|
|
|
|
|
- _STARPU_MPI_CALLOC(req->envelope, 1,sizeof(struct _starpu_mpi_envelope));
|
|
|
- req->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
|
|
|
- req->envelope->data_tag = req->node_tag.data_tag;
|
|
|
- req->envelope->sync = req->sync;
|
|
|
+ _STARPU_MPI_CALLOC(req->backend->envelope, 1,sizeof(struct _starpu_mpi_envelope));
|
|
|
+ req->backend->envelope->mode = _STARPU_MPI_ENVELOPE_DATA;
|
|
|
+ req->backend->envelope->data_tag = req->node_tag.data_tag;
|
|
|
+ req->backend->envelope->sync = req->sync;
|
|
|
|
|
|
if (req->registered_datatype == 1)
|
|
|
{
|
|
|
- int size;
|
|
|
+ int size, ret;
|
|
|
req->count = 1;
|
|
|
req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
MPI_Type_size(req->datatype, &size);
|
|
|
- req->envelope->size = (starpu_ssize_t)req->count * size;
|
|
|
- _STARPU_MPI_DEBUG(20, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle), req->node_tag.rank);
|
|
|
- _STARPU_MPI_COMM_TO_DEBUG(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->envelope->data_tag, req->node_tag.comm);
|
|
|
- MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
|
|
|
+ req->backend->envelope->size = (starpu_ssize_t)req->count * size;
|
|
|
+ _STARPU_MPI_DEBUG(20, "Post MPI isend count (%ld) datatype_size %ld request to %d\n",req->count,starpu_data_get_size(req->data_handle), req->node_tag.node.rank);
|
|
|
+ _STARPU_MPI_COMM_TO_DEBUG(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, req->backend->envelope->data_tag, req->node_tag.node.comm);
|
|
|
+ ret = MPI_Isend(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.node.comm, &req->backend->size_req);
|
|
|
+ STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending envelope, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
int ret;
|
|
|
|
|
|
// Do not pack the data, just try to find out the size
|
|
|
- starpu_data_pack(req->data_handle, NULL, &(req->envelope->size));
|
|
|
+ starpu_data_pack(req->data_handle, NULL, &(req->backend->envelope->size));
|
|
|
|
|
|
- if (req->envelope->size != -1)
|
|
|
+ if (req->backend->envelope->size != -1)
|
|
|
{
|
|
|
// We already know the size of the data, let's send it to overlap with the packing of the data
|
|
|
- _STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
|
|
|
- req->count = req->envelope->size;
|
|
|
- _STARPU_MPI_COMM_TO_DEBUG(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->envelope->data_tag, req->node_tag.comm);
|
|
|
- ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (first call to pack)\n", req->backend->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.node.rank);
|
|
|
+ req->count = req->backend->envelope->size;
|
|
|
+ _STARPU_MPI_COMM_TO_DEBUG(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, req->backend->envelope->data_tag, req->node_tag.node.comm);
|
|
|
+ ret = MPI_Isend(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.node.comm, &req->backend->size_req);
|
|
|
STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
|
|
|
}
|
|
|
|
|
|
// Pack the data
|
|
|
starpu_data_pack(req->data_handle, &req->ptr, &req->count);
|
|
|
- if (req->envelope->size == -1)
|
|
|
+ if (req->backend->envelope->size == -1)
|
|
|
{
|
|
|
// We know the size now, let's send it
|
|
|
- _STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.rank);
|
|
|
- _STARPU_MPI_COMM_TO_DEBUG(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->envelope->data_tag, req->node_tag.comm);
|
|
|
- ret = MPI_Isend(req->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm, &req->size_req);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Sending size %ld (%ld %s) to node %d (second call to pack)\n", req->backend->envelope->size, sizeof(req->count), "MPI_BYTE", req->node_tag.node.rank);
|
|
|
+ _STARPU_MPI_COMM_TO_DEBUG(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, req->backend->envelope->data_tag, req->node_tag.node.comm);
|
|
|
+ ret = MPI_Isend(req->backend->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.node.comm, &req->backend->size_req);
|
|
|
STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "when sending size, MPI_Isend returning %s", _starpu_mpi_get_mpi_error_code(ret));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
// We check the size returned with the 2 calls to pack is the same
|
|
|
- STARPU_MPI_ASSERT_MSG(req->count == req->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->envelope->size);
|
|
|
+ STARPU_MPI_ASSERT_MSG(req->count == req->backend->envelope->size, "Calls to pack_data returned different sizes %ld != %ld", req->count, req->backend->envelope->size);
|
|
|
}
|
|
|
// We can send the data now
|
|
|
}
|
|
@@ -473,9 +474,9 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(0, "post MPI irecv request %p type %s tag %"PRIi64"d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
+ _STARPU_MPI_DEBUG(0, "post MPI irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n", req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
|
|
|
- _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_IRECV_SUBMIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
if (req->sync)
|
|
|
{
|
|
@@ -483,9 +484,9 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
|
|
|
_STARPU_MPI_CALLOC(_envelope, 1, sizeof(struct _starpu_mpi_envelope));
|
|
|
_envelope->mode = _STARPU_MPI_ENVELOPE_SYNC_READY;
|
|
|
_envelope->data_tag = req->node_tag.data_tag;
|
|
|
- _STARPU_MPI_DEBUG(20, "Telling node %d it can send the data and waiting for the data back ...\n", req->node_tag.rank);
|
|
|
- _STARPU_MPI_COMM_TO_DEBUG(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, _envelope->data_tag, req->node_tag.comm);
|
|
|
- req->ret = MPI_Send(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.comm);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Telling node %d it can send the data and waiting for the data back ...\n", req->node_tag.node.rank);
|
|
|
+ _STARPU_MPI_COMM_TO_DEBUG(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, _envelope->data_tag, req->node_tag.node.comm);
|
|
|
+ req->ret = MPI_Send(_envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, req->node_tag.node.rank, _STARPU_MPI_TAG_ENVELOPE, req->node_tag.node.comm);
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Send returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
free(_envelope);
|
|
|
_envelope = NULL;
|
|
@@ -493,26 +494,26 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
|
|
|
|
|
|
if (req->sync)
|
|
|
{
|
|
|
- _STARPU_MPI_COMM_FROM_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.comm);
|
|
|
- req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.comm, &req->data_request);
|
|
|
+ _STARPU_MPI_COMM_FROM_DEBUG(req, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.data_tag, req->node_tag.node.comm);
|
|
|
+ req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_SYNC_DATA, req->node_tag.node.comm, &req->backend->data_request);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_COMM_FROM_DEBUG(req, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
|
|
|
- req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
|
|
|
+ _STARPU_MPI_COMM_FROM_DEBUG(req, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.node.comm);
|
|
|
+ req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_DATA, req->node_tag.node.comm, &req->backend->data_request);
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
- _starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
|
|
|
+ _starpu_mpi_simgrid_wait_req(&req->backend->data_request, &req->status_store, &req->queue, &req->done);
|
|
|
#endif
|
|
|
}
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
|
|
|
- _STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
/* somebody is perhaps waiting for the MPI request to be posted */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
|
|
|
req->submitted = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
|
|
|
|
|
|
_starpu_mpi_handle_detached_request(req);
|
|
|
|
|
@@ -529,19 +530,19 @@ void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
/* Which is the mpi request we are waiting for ? */
|
|
|
- struct _starpu_mpi_req *req = waiting_req->other_request;
|
|
|
+ struct _starpu_mpi_req *req = waiting_req->backend->other_request;
|
|
|
|
|
|
- _STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
- if (req->data_request != MPI_REQUEST_NULL)
|
|
|
+ _STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
+ if (req->backend->data_request != MPI_REQUEST_NULL)
|
|
|
{
|
|
|
// TODO: Fix for STARPU_SIMGRID
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
STARPU_MPI_ASSERT_MSG(0, "Implement this in STARPU_SIMGRID");
|
|
|
#endif
|
|
|
- req->ret = MPI_Wait(&req->data_request, waiting_req->status);
|
|
|
+ req->ret = MPI_Wait(&req->backend->data_request, waiting_req->status);
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
}
|
|
|
- _STARPU_MPI_TRACE_UWAIT_END(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_UWAIT_END(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
_starpu_mpi_handle_request_termination(req);
|
|
|
|
|
@@ -558,34 +559,34 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
|
|
|
|
|
|
/* We cannot try to complete a MPI request that was not actually posted
|
|
|
* to MPI yet. */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&(req->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(req->backend->req_mutex));
|
|
|
while (!(req->submitted))
|
|
|
- STARPU_PTHREAD_COND_WAIT(&(req->req_cond), &(req->req_mutex));
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&(req->req_mutex));
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(req->backend->req_cond), &(req->backend->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(req->backend->req_mutex));
|
|
|
|
|
|
/* Initialize the request structure */
|
|
|
_starpu_mpi_request_init(&waiting_req);
|
|
|
waiting_req->prio = INT_MAX;
|
|
|
waiting_req->status = status;
|
|
|
- waiting_req->other_request = req;
|
|
|
+ waiting_req->backend->other_request = req;
|
|
|
waiting_req->func = _starpu_mpi_wait_func;
|
|
|
waiting_req->request_type = WAIT_REQ;
|
|
|
|
|
|
_starpu_mpi_submit_ready_request_inc(waiting_req);
|
|
|
|
|
|
/* We wait for the MPI request to finish */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
|
|
|
while (!req->completed)
|
|
|
- STARPU_PTHREAD_COND_WAIT(&req->req_cond, &req->req_mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&req->backend->req_cond, &req->backend->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
|
|
|
|
|
|
ret = req->ret;
|
|
|
|
|
|
/* The internal request structure was automatically allocated */
|
|
|
*public_req = NULL;
|
|
|
- if (req->internal_req)
|
|
|
+ if (req->backend->internal_req)
|
|
|
{
|
|
|
- _starpu_mpi_request_destroy(req->internal_req);
|
|
|
+ _starpu_mpi_request_destroy(req->backend->internal_req);
|
|
|
}
|
|
|
_starpu_mpi_request_destroy(req);
|
|
|
_starpu_mpi_request_destroy(waiting_req);
|
|
@@ -604,24 +605,24 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
/* Which is the mpi request we are testing for ? */
|
|
|
- struct _starpu_mpi_req *req = testing_req->other_request;
|
|
|
+ struct _starpu_mpi_req *req = testing_req->backend->other_request;
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(0, "Test request %p type %s tag %"PRIi64"d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
- req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
+ _STARPU_MPI_DEBUG(0, "Test request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
+ req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
|
|
|
req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
|
|
|
- _STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, testing_req->flag);
|
|
|
memcpy(testing_req->status, &req->status_store, sizeof(*testing_req->status));
|
|
|
#else
|
|
|
- req->ret = MPI_Test(&req->data_request, testing_req->flag, testing_req->status);
|
|
|
+ req->ret = MPI_Test(&req->backend->data_request, testing_req->flag, testing_req->status);
|
|
|
#endif
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
|
|
|
- _STARPU_MPI_TRACE_UTESTING_END(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_UTESTING_END(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
if (*testing_req->flag)
|
|
|
{
|
|
@@ -629,10 +630,10 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
|
|
|
_starpu_mpi_handle_request_termination(req);
|
|
|
}
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&testing_req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&testing_req->backend->req_mutex);
|
|
|
testing_req->completed = 1;
|
|
|
- STARPU_PTHREAD_COND_SIGNAL(&testing_req->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->req_mutex);
|
|
|
+ STARPU_PTHREAD_COND_SIGNAL(&testing_req->backend->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->backend->req_mutex);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
@@ -647,9 +648,9 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
|
|
|
unsigned submitted = req->submitted;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
|
|
|
|
|
|
if (submitted)
|
|
|
{
|
|
@@ -660,7 +661,7 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
testing_req->prio = INT_MAX;
|
|
|
testing_req->flag = flag;
|
|
|
testing_req->status = status;
|
|
|
- testing_req->other_request = req;
|
|
|
+ testing_req->backend->other_request = req;
|
|
|
testing_req->func = _starpu_mpi_test_func;
|
|
|
testing_req->completed = 0;
|
|
|
testing_req->request_type = TEST_REQ;
|
|
@@ -668,10 +669,10 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
_starpu_mpi_submit_ready_request_inc(testing_req);
|
|
|
|
|
|
/* We wait for the test request to finish */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(testing_req->backend->req_mutex));
|
|
|
while (!(testing_req->completed))
|
|
|
- STARPU_PTHREAD_COND_WAIT(&(testing_req->req_cond), &(testing_req->req_mutex));
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->req_mutex));
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(testing_req->backend->req_cond), &(testing_req->backend->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(testing_req->backend->req_mutex));
|
|
|
|
|
|
ret = testing_req->ret;
|
|
|
|
|
@@ -681,9 +682,9 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
* request structure which was automatically allocated
|
|
|
* */
|
|
|
*public_req = NULL;
|
|
|
- if (req->internal_req)
|
|
|
+ if (req->backend->internal_req)
|
|
|
{
|
|
|
- _starpu_mpi_request_destroy(req->internal_req);
|
|
|
+ _starpu_mpi_request_destroy(req->backend->internal_req);
|
|
|
}
|
|
|
_starpu_mpi_request_destroy(req);
|
|
|
}
|
|
@@ -709,7 +710,7 @@ static void _starpu_mpi_barrier_func(struct _starpu_mpi_req *barrier_req)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- barrier_req->ret = MPI_Barrier(barrier_req->node_tag.comm);
|
|
|
+ barrier_req->ret = MPI_Barrier(barrier_req->node_tag.node.comm);
|
|
|
STARPU_MPI_ASSERT_MSG(barrier_req->ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_error_code(barrier_req->ret));
|
|
|
|
|
|
_starpu_mpi_handle_request_termination(barrier_req);
|
|
@@ -752,16 +753,16 @@ int _starpu_mpi_barrier(MPI_Comm comm)
|
|
|
barrier_req->prio = INT_MAX;
|
|
|
barrier_req->func = _starpu_mpi_barrier_func;
|
|
|
barrier_req->request_type = BARRIER_REQ;
|
|
|
- barrier_req->node_tag.comm = comm;
|
|
|
+ barrier_req->node_tag.node.comm = comm;
|
|
|
|
|
|
_STARPU_MPI_INC_POSTED_REQUESTS(1);
|
|
|
_starpu_mpi_submit_ready_request(barrier_req);
|
|
|
|
|
|
/* We wait for the MPI request to finish */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&barrier_req->backend->req_mutex);
|
|
|
while (!barrier_req->completed)
|
|
|
- STARPU_PTHREAD_COND_WAIT(&barrier_req->req_cond, &barrier_req->req_mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->req_mutex);
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&barrier_req->backend->req_cond, &barrier_req->backend->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&barrier_req->backend->req_mutex);
|
|
|
|
|
|
_starpu_mpi_request_destroy(barrier_req);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
@@ -795,14 +796,14 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
_STARPU_MPI_LOG_IN();
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64"d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
|
|
|
- req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle, req->ptr,
|
|
|
- req->datatype_name, (int)req->count, req->registered_datatype, req->internal_req);
|
|
|
+ _STARPU_MPI_DEBUG(2, "complete MPI request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d internal_req %p\n",
|
|
|
+ req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle, req->ptr,
|
|
|
+ req->datatype_name, (int)req->count, req->registered_datatype, req->backend->internal_req);
|
|
|
|
|
|
- if (req->internal_req)
|
|
|
+ if (req->backend->internal_req)
|
|
|
{
|
|
|
- free(req->early_data_handle);
|
|
|
- req->early_data_handle = NULL;
|
|
|
+ free(req->backend->early_data_handle);
|
|
|
+ req->backend->early_data_handle = NULL;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -816,7 +817,7 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
// has completed, as MPI can re-order messages, let's call
|
|
|
// MPI_Wait to make sure data have been sent
|
|
|
int ret;
|
|
|
- ret = MPI_Wait(&req->size_req, MPI_STATUS_IGNORE);
|
|
|
+ ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
|
|
|
STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
|
|
|
free(req->ptr);
|
|
|
req->ptr = NULL;
|
|
@@ -833,15 +834,15 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
_starpu_mpi_datatype_free(req->data_handle, &req->datatype);
|
|
|
}
|
|
|
}
|
|
|
- _STARPU_MPI_TRACE_TERMINATED(req, req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_TERMINATED(req, req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
}
|
|
|
|
|
|
_starpu_mpi_release_req_data(req);
|
|
|
|
|
|
- if (req->envelope)
|
|
|
+ if (req->backend->envelope)
|
|
|
{
|
|
|
- free(req->envelope);
|
|
|
- req->envelope = NULL;
|
|
|
+ free(req->backend->envelope);
|
|
|
+ req->backend->envelope = NULL;
|
|
|
}
|
|
|
|
|
|
/* Execute the specified callback, if any */
|
|
@@ -850,10 +851,10 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
|
|
|
/* tell anyone potentially waiting on the request that it is
|
|
|
* terminated now */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
|
|
|
req->completed = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&req->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&req->backend->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
@@ -902,18 +903,18 @@ static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
if (args->req->detached)
|
|
|
{
|
|
|
/* have the internal request destroyed now or when completed */
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&args->req->internal_req->req_mutex);
|
|
|
- if (args->req->internal_req->to_destroy)
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&args->req->backend->internal_req->backend->req_mutex);
|
|
|
+ if (args->req->backend->internal_req->backend->to_destroy)
|
|
|
{
|
|
|
/* The request completed first, can now destroy it */
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->internal_req->req_mutex);
|
|
|
- _starpu_mpi_request_destroy(args->req->internal_req);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->backend->internal_req->backend->req_mutex);
|
|
|
+ _starpu_mpi_request_destroy(args->req->backend->internal_req);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
/* The request didn't complete yet, tell it to destroy it when it completes */
|
|
|
- args->req->internal_req->to_destroy = 1;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->internal_req->req_mutex);
|
|
|
+ args->req->backend->internal_req->backend->to_destroy = 1;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->backend->internal_req->backend->req_mutex);
|
|
|
}
|
|
|
_starpu_mpi_handle_request_termination(args->req);
|
|
|
_starpu_mpi_request_destroy(args->req);
|
|
@@ -924,11 +925,11 @@ static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
// be handled when calling starpu_mpi_wait
|
|
|
// We store in the application request the internal MPI
|
|
|
// request so that it can be used by starpu_mpi_wait
|
|
|
- args->req->data_request = args->req->internal_req->data_request;
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&args->req->req_mutex);
|
|
|
+ args->req->backend->data_request = args->req->backend->internal_req->backend->data_request;
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&args->req->backend->req_mutex);
|
|
|
args->req->submitted = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&args->req->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->req_mutex);
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&args->req->backend->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->backend->req_mutex);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -957,17 +958,17 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
|
|
|
- _STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
- //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64"d - TYPE %s %d\n", &req->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
|
|
|
+ _STARPU_MPI_TRACE_TEST_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
+ //_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %"PRIi64" - TYPE %s %d\n", &req->backend->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.node.rank);
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
|
|
|
#else
|
|
|
- STARPU_MPI_ASSERT_MSG(req->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
|
|
|
- req->ret = MPI_Test(&req->data_request, &flag, MPI_STATUS_IGNORE);
|
|
|
+ STARPU_MPI_ASSERT_MSG(req->backend->data_request != MPI_REQUEST_NULL, "Cannot test completion of the request MPI_REQUEST_NULL");
|
|
|
+ req->ret = MPI_Test(&req->backend->data_request, &flag, MPI_STATUS_IGNORE);
|
|
|
#endif
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
- _STARPU_MPI_TRACE_TEST_END(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_TEST_END(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
if (!flag)
|
|
|
{
|
|
@@ -979,7 +980,7 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
struct _starpu_mpi_req *next_req;
|
|
|
next_req = _starpu_mpi_req_list_next(req);
|
|
|
|
|
|
- _STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
|
if (req->request_type == SEND_REQ)
|
|
@@ -988,21 +989,21 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
_starpu_mpi_handle_request_termination(req);
|
|
|
|
|
|
- _STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
|
|
|
+ _STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
|
|
|
/* We don't want to free internal non-detached
|
|
|
requests, we need to get their MPI request before
|
|
|
destroying them */
|
|
|
- if (req->is_internal_req && !req->to_destroy)
|
|
|
+ if (req->backend->is_internal_req && !req->backend->to_destroy)
|
|
|
{
|
|
|
/* We have completed the request, let the application request destroy it */
|
|
|
- req->to_destroy = 1;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ req->backend->to_destroy = 1;
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&req->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
|
|
|
_starpu_mpi_request_destroy(req);
|
|
|
}
|
|
|
|
|
@@ -1044,8 +1045,8 @@ static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
|
|
|
STARPU_MPI_ASSERT_MSG(req, "Invalid request");
|
|
|
|
|
|
/* submit the request to MPI */
|
|
|
- _STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %"PRIi64"d src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
- req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.rank, req->data_handle,
|
|
|
+ _STARPU_MPI_DEBUG(2, "Handling new request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
|
+ req, _starpu_mpi_request_type(req->request_type), req->node_tag.data_tag, req->node_tag.node.rank, req->data_handle,
|
|
|
req->ptr, req->datatype_name, (int)req->count, req->registered_datatype);
|
|
|
req->func(req);
|
|
|
|
|
@@ -1054,7 +1055,7 @@ static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
|
|
|
|
|
|
static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status, MPI_Comm comm)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(20, "Request with tag %"PRIi64"d and source %d not found, creating a early_data_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Request with tag %"PRIi64" and source %d not found, creating a early_data_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
|
|
|
_STARPU_MPI_DEBUG(20, "Request sync %d\n", envelope->sync);
|
|
|
|
|
|
struct _starpu_mpi_early_data_handle* early_data_handle = _starpu_mpi_early_data_create(envelope, status.MPI_SOURCE, comm);
|
|
@@ -1084,7 +1085,7 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
//_starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_data_handle with tag %"PRIi64"d from comm %ld src %d ..\n",
|
|
|
+ _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_data_handle with tag %"PRIi64" from comm %ld src %d ..\n",
|
|
|
early_data_handle->node_tag.data_tag, (long int)comm, status.MPI_SOURCE);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
|
|
@@ -1095,10 +1096,10 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
// We wait until the request is pushed in the
|
|
|
// ready_request list
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->backend->posted_mutex));
|
|
|
while (!(early_data_handle->req->posted))
|
|
|
- STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->backend->posted_cond), &(early_data_handle->req->backend->posted_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->backend->posted_mutex));
|
|
|
|
|
|
#ifdef STARPU_DEVEL
|
|
|
#warning check if req_ready is still necessary
|
|
@@ -1305,7 +1306,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
{
|
|
|
struct _starpu_mpi_req *_sync_req = _starpu_mpi_sync_data_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
|
|
|
_STARPU_MPI_DEBUG(20, "Sending data with tag %"PRIi64" to node %d\n", _sync_req->node_tag.data_tag, envelope_status.MPI_SOURCE);
|
|
|
- STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_req->node_tag.data_tag, "Tag mismatch (envelope %"PRIi64"d != req %"PRIi64"d)\n",
|
|
|
+ STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_req->node_tag.data_tag, "Tag mismatch (envelope %"PRIi64" != req %"PRIi64")\n",
|
|
|
envelope->data_tag, _sync_req->node_tag.data_tag);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
_starpu_mpi_isend_data_func(_sync_req);
|
|
@@ -1313,7 +1314,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Searching for application request with tag %"PRIi64"d and source %d (size %ld)\n", envelope->data_tag, envelope_status.MPI_SOURCE, envelope->size);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Searching for application request with tag %"PRIi64" and source %d (size %ld)\n", envelope->data_tag, envelope_status.MPI_SOURCE, envelope->size);
|
|
|
|
|
|
struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_dequeue(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
|
|
|
|
|
@@ -1326,7 +1327,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
{
|
|
|
if (envelope->sync)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(2000, "-------------------------> adding request for tag %l"PRIi64"\n", envelope->data_tag);
|
|
|
+ _STARPU_MPI_DEBUG(2000, "-------------------------> adding request for tag %"PRIi64"\n", envelope->data_tag);
|
|
|
struct _starpu_mpi_req *new_req;
|
|
|
#ifdef STARPU_DEVEL
|
|
|
#warning creating a request is not really useful.
|
|
@@ -1335,16 +1336,16 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
_starpu_mpi_request_init(&new_req);
|
|
|
new_req->request_type = RECV_REQ;
|
|
|
new_req->data_handle = NULL;
|
|
|
- new_req->node_tag.rank = envelope_status.MPI_SOURCE;
|
|
|
+ new_req->node_tag.node.rank = envelope_status.MPI_SOURCE;
|
|
|
new_req->node_tag.data_tag = envelope->data_tag;
|
|
|
- new_req->node_tag.comm = envelope_comm;
|
|
|
+ new_req->node_tag.node.comm = envelope_comm;
|
|
|
new_req->detached = 1;
|
|
|
new_req->sync = 1;
|
|
|
new_req->callback = NULL;
|
|
|
new_req->callback_arg = NULL;
|
|
|
new_req->func = _starpu_mpi_irecv_size_func;
|
|
|
new_req->sequential_consistency = 1;
|
|
|
- new_req->is_internal_req = 0; // ????
|
|
|
+ new_req->backend->is_internal_req = 0; // ????
|
|
|
new_req->count = envelope->size;
|
|
|
_starpu_mpi_sync_data_add(new_req);
|
|
|
}
|
|
@@ -1360,7 +1361,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
* _starpu_mpi_handle_ready_request. */
|
|
|
else
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(2000, "A matching application request has been found for the incoming data with tag %"PRIi64"d\n", envelope->data_tag);
|
|
|
+ _STARPU_MPI_DEBUG(2000, "A matching application request has been found for the incoming data with tag %"PRIi64"\n", envelope->data_tag);
|
|
|
_STARPU_MPI_DEBUG(2000, "Request sync %d\n", envelope->sync);
|
|
|
|
|
|
early_request->sync = envelope->sync;
|
|
@@ -1572,7 +1573,7 @@ void _starpu_mpi_progress_shutdown(void **value)
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
/* FIXME: should rather properly wait for _starpu_mpi_progress_thread_func to finish */
|
|
|
(void) value;
|
|
|
- MSG_process_sleep(1);
|
|
|
+ starpu_sleep(1);
|
|
|
#else
|
|
|
STARPU_PTHREAD_JOIN(progress_thread, value);
|
|
|
#endif
|