|
@@ -33,38 +33,6 @@ extern char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type
|
|
|
extern void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req,nm_sr_event_t event);
|
|
|
extern void _starpu_mpi_handle_pending_request(struct _starpu_mpi_req *req);
|
|
|
|
|
|
-struct starpu_nm_datatype_unknown
|
|
|
-{
|
|
|
- starpu_ssize_t* count;
|
|
|
- const struct nm_data_s* body;
|
|
|
-};
|
|
|
-
|
|
|
-static void starpu_nm_datatype_unknown_traversal(const void* _content, nm_data_apply_t apply, void* _context);
|
|
|
-const struct nm_data_ops_s starpu_nm_datatype_unknown_ops =
|
|
|
-{
|
|
|
- .p_traversal = &starpu_nm_datatype_unknown_traversal
|
|
|
-};
|
|
|
-
|
|
|
-NM_DATA_TYPE(datatype_unknown, struct starpu_nm_datatype_unknown, &starpu_nm_datatype_unknown_ops);
|
|
|
-
|
|
|
-static void starpu_nm_datatype_unknown_traversal(const void* _content, nm_data_apply_t apply, void* _context)
|
|
|
-{
|
|
|
- const struct starpu_nm_datatype_unknown* p_content = _content;
|
|
|
-
|
|
|
- (*apply)(p_content->count, sizeof(starpu_ssize_t), _context);
|
|
|
-
|
|
|
- nm_data_traversal_apply(p_content->body, apply, _context);
|
|
|
-}
|
|
|
-
|
|
|
-// warning: this function requires valid pointers for future usage
|
|
|
-void starpu_nm_datatype_unknown_build(struct nm_data_s* datatype_unknown_data, starpu_ssize_t* count, const struct nm_data_s* body)
|
|
|
-{
|
|
|
- nm_data_datatype_unknown_set(datatype_unknown_data, (struct starpu_nm_datatype_unknown)
|
|
|
- {
|
|
|
- .count = count,
|
|
|
- .body = body
|
|
|
- });
|
|
|
-}
|
|
|
|
|
|
/**********************************************
|
|
|
* Send
|
|
@@ -84,15 +52,16 @@ void _starpu_mpi_isend_unknown_datatype(struct _starpu_mpi_req *req)
|
|
|
|
|
|
starpu_data_pack(req->data_handle, &req->ptr, &req->count);
|
|
|
|
|
|
- nm_mpi_nmad_data_get(&(req->backend->unknown_datatype_body), (void*)req->ptr, req->datatype, req->count);
|
|
|
-
|
|
|
- // warning: this function requires valid pointers for future usage
|
|
|
- starpu_nm_datatype_unknown_build(&(req->backend->unknown_datatype_data), &(req->count), &(req->backend->unknown_datatype_body));
|
|
|
+ req->backend->unknown_datatype_v[0].iov_base = &req->count;
|
|
|
+ req->backend->unknown_datatype_v[0].iov_len = sizeof(starpu_ssize_t);
|
|
|
+ req->backend->unknown_datatype_v[1].iov_base = req->ptr;
|
|
|
+ req->backend->unknown_datatype_v[1].iov_len = req->count;
|
|
|
+ nm_data_iov_build(&req->backend->unknown_datatype_data, req->backend->unknown_datatype_v, 2);
|
|
|
|
|
|
- nm_sr_send_init(req->backend->session, &(req->backend->data_request));
|
|
|
- nm_sr_send_pack_data(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_data));
|
|
|
- nm_sr_send_set_priority(req->backend->session, &(req->backend->data_request), req->prio);
|
|
|
- nm_sr_send_header(req->backend->session, &(req->backend->data_request), sizeof(starpu_ssize_t));
|
|
|
+ nm_sr_send_init(req->backend->session, &req->backend->data_request);
|
|
|
+ nm_sr_send_pack_data(req->backend->session, &req->backend->data_request, &req->backend->unknown_datatype_data);
|
|
|
+ nm_sr_send_set_priority(req->backend->session, &req->backend->data_request, req->prio);
|
|
|
+ nm_sr_send_header(req->backend->session, &req->backend->data_request, sizeof(starpu_ssize_t));
|
|
|
|
|
|
// this trace event is the start of the communication link:
|
|
|
_STARPU_MPI_TRACE_ISEND_SUBMIT_END(_STARPU_MPI_FUT_POINT_TO_POINT_SEND, req->node_tag.node.rank, req->node_tag.data_tag,
|
|
@@ -100,12 +69,12 @@ void _starpu_mpi_isend_unknown_datatype(struct _starpu_mpi_req *req)
|
|
|
|
|
|
if (req->sync == 0)
|
|
|
{
|
|
|
- req->ret = nm_sr_send_isend(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag);
|
|
|
+ req->ret = nm_sr_send_isend(req->backend->session, &req->backend->data_request, req->backend->gate, req->node_tag.data_tag);
|
|
|
STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "nm_sr_send_isend returning %d", req->ret);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- req->ret = nm_sr_send_issend(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag);
|
|
|
+ req->ret = nm_sr_send_issend(req->backend->session, &req->backend->data_request, req->backend->gate, req->node_tag.data_tag);
|
|
|
STARPU_ASSERT_MSG(req->ret == NM_ESUCCESS, "nm_sr_send_issend returning %d", req->ret);
|
|
|
}
|
|
|
|
|
@@ -128,19 +97,24 @@ static void _starpu_mpi_unknown_datatype_recv_callback(nm_sr_event_t event, cons
|
|
|
if (event & NM_SR_EVENT_RECV_DATA)
|
|
|
{
|
|
|
// Header arrived, so get the size of the datatype and store it in req->count:
|
|
|
- nm_data_contiguous_build(&(req->backend->unknown_datatype_size), &(req->count), sizeof(starpu_ssize_t));
|
|
|
- int ret = nm_sr_recv_peek(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_size));
|
|
|
- STARPU_ASSERT_MSG(ret == NM_ESUCCESS, "nm_sr_recv_peek returned %d", ret);
|
|
|
+ struct nm_data_s data_header;
|
|
|
+ nm_data_contiguous_build(&data_header, &req->count, sizeof(starpu_ssize_t));
|
|
|
+ nm_sr_recv_peek(req->backend->session, &req->backend->data_request, &data_header);
|
|
|
|
|
|
- // Now we know the size of the datatype, allocate the buffer:
|
|
|
+ // Now we know the size, allocate the buffer:
|
|
|
req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
|
|
|
STARPU_ASSERT_MSG(req->ptr, "cannot allocate message of size %ld", req->count);
|
|
|
|
|
|
- // Last step: give this buffer to NewMadeleine to receive data:
|
|
|
- nm_mpi_nmad_data_get(&(req->backend->unknown_datatype_body), (void*) req->ptr, req->datatype, req->count);
|
|
|
- // warning: this function requires valid pointers for future usage:
|
|
|
- starpu_nm_datatype_unknown_build(&(req->backend->unknown_datatype_data), &(req->count), &(req->backend->unknown_datatype_body));
|
|
|
- nm_sr_recv_unpack_data(req->backend->session, &(req->backend->data_request), &(req->backend->unknown_datatype_data));
|
|
|
+ /* Last step: give this buffer to NewMadeleine to receive data
|
|
|
+ * We need to use an iov to easily take into account the offset used
|
|
|
+ * during the peek. */
|
|
|
+ req->backend->unknown_datatype_v[0].iov_base = &req->count;
|
|
|
+ req->backend->unknown_datatype_v[0].iov_len = sizeof(starpu_ssize_t);
|
|
|
+ req->backend->unknown_datatype_v[1].iov_base = req->ptr;
|
|
|
+ req->backend->unknown_datatype_v[1].iov_len = req->count;
|
|
|
+ nm_data_iov_build(&req->backend->unknown_datatype_data, req->backend->unknown_datatype_v, 2);
|
|
|
+ nm_sr_recv_offset(req->backend->session, &req->backend->data_request, sizeof(starpu_ssize_t));
|
|
|
+ nm_sr_recv_unpack_data(req->backend->session, &req->backend->data_request, &req->backend->unknown_datatype_data);
|
|
|
}
|
|
|
else if (event & NM_SR_EVENT_FINALIZED)
|
|
|
{
|
|
@@ -161,11 +135,11 @@ void _starpu_mpi_irecv_unknown_datatype(struct _starpu_mpi_req *req)
|
|
|
/* we post a recv without giving a buffer because we don't know the required size of this buffer,
|
|
|
* the buffer will be allocated and provided to nmad when the header of data will be received,
|
|
|
* in _starpu_mpi_unknown_datatype_recv_callback() */
|
|
|
- nm_sr_recv_init(req->backend->session, &(req->backend->data_request));
|
|
|
- nm_sr_request_set_ref(&(req->backend->data_request), req);
|
|
|
- nm_sr_request_monitor(req->backend->session, &(req->backend->data_request), NM_SR_EVENT_FINALIZED | NM_SR_EVENT_RECV_DATA,
|
|
|
+ nm_sr_recv_init(req->backend->session, &req->backend->data_request);
|
|
|
+ nm_sr_request_set_ref(&req->backend->data_request, req);
|
|
|
+ nm_sr_request_monitor(req->backend->session, &req->backend->data_request, NM_SR_EVENT_FINALIZED | NM_SR_EVENT_RECV_DATA,
|
|
|
&_starpu_mpi_unknown_datatype_recv_callback);
|
|
|
- nm_sr_recv_irecv(req->backend->session, &(req->backend->data_request), req->backend->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
|
|
|
+ nm_sr_recv_irecv(req->backend->session, &req->backend->data_request, req->backend->gate, req->node_tag.data_tag, NM_TAG_MASK_FULL);
|
|
|
|
|
|
_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
|