|
@@ -132,6 +132,7 @@ struct _starpu_mpi_early_data_cb_args
|
|
|
struct _starpu_mpi_req *req;
|
|
struct _starpu_mpi_req *req;
|
|
|
void *buffer;
|
|
void *buffer;
|
|
|
size_t size;
|
|
size_t size;
|
|
|
|
|
+ unsigned buffer_node;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req)
|
|
void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req)
|
|
@@ -173,7 +174,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
if (req->reserved_size)
|
|
if (req->reserved_size)
|
|
|
{
|
|
{
|
|
|
/* The core will have really allocated the reception buffer now, release our reservation */
|
|
/* The core will have really allocated the reception buffer now, release our reservation */
|
|
|
- starpu_memory_deallocate(STARPU_MAIN_RAM, req->reserved_size);
|
|
|
|
|
|
|
+ starpu_memory_deallocate(req->node, req->reserved_size);
|
|
|
req->reserved_size = 0;
|
|
req->reserved_size = 0;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -197,12 +198,12 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
if (req->registered_datatype == 1)
|
|
if (req->registered_datatype == 1)
|
|
|
{
|
|
{
|
|
|
req->count = 1;
|
|
req->count = 1;
|
|
|
- req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
STARPU_ASSERT(req->count);
|
|
STARPU_ASSERT(req->count);
|
|
|
- req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
|
|
|
|
|
|
|
+ req->ptr = (void *)starpu_malloc_on_node_flags(req->node, req->count, 0);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
@@ -249,11 +250,13 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
cb_args->early_handle = early_data_handle->handle;
|
|
cb_args->early_handle = early_data_handle->handle;
|
|
|
cb_args->buffer = early_data_handle->buffer;
|
|
cb_args->buffer = early_data_handle->buffer;
|
|
|
cb_args->size = early_data_handle->size;
|
|
cb_args->size = early_data_handle->size;
|
|
|
|
|
+ cb_args->buffer_node = early_data_handle->buffer_node;
|
|
|
cb_args->req = req;
|
|
cb_args->req = req;
|
|
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
- starpu_data_acquire_on_node_cb(early_data_handle->handle,STARPU_MAIN_RAM,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
|
|
|
|
|
|
|
+ // FIXME: when buffer == NULL, do not hardcode acquiring on early_data_handle->buffer_node; instead, just acquire wherever the data happens to have been stored by MPI
|
|
|
|
|
+ starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(early_data_handle->handle,early_data_handle->buffer_node,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args, 1, 0, NULL, NULL);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
@@ -268,13 +271,13 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
if (req->registered_datatype == 1)
|
|
if (req->registered_datatype == 1)
|
|
|
{
|
|
{
|
|
|
req->count = 1;
|
|
req->count = 1;
|
|
|
- req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
req->count = sync_req->count;
|
|
req->count = sync_req->count;
|
|
|
STARPU_ASSERT(req->count);
|
|
STARPU_ASSERT(req->count);
|
|
|
- req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
|
|
|
|
|
|
|
+ req->ptr = (void *)starpu_malloc_on_node_flags(req->node, req->count, 0);
|
|
|
}
|
|
}
|
|
|
_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
|
_STARPU_MPI_INC_READY_REQUESTS(+1);
|
|
_STARPU_MPI_INC_READY_REQUESTS(+1);
|
|
@@ -431,7 +434,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
{
|
|
{
|
|
|
int size, ret;
|
|
int size, ret;
|
|
|
req->count = 1;
|
|
req->count = 1;
|
|
|
- req->ptr = starpu_data_handle_to_pointer(req->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ req->ptr = starpu_data_handle_to_pointer(req->data_handle, req->node);
|
|
|
|
|
|
|
|
MPI_Type_size(req->datatype, &size);
|
|
MPI_Type_size(req->datatype, &size);
|
|
|
req->backend->envelope->size = (starpu_ssize_t)req->count * size;
|
|
req->backend->envelope->size = (starpu_ssize_t)req->count * size;
|
|
@@ -445,7 +448,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
int ret;
|
|
int ret;
|
|
|
|
|
|
|
|
// Do not pack the data, just try to find out the size
|
|
// Do not pack the data, just try to find out the size
|
|
|
- starpu_data_pack(req->data_handle, NULL, &(req->backend->envelope->size));
|
|
|
|
|
|
|
+ starpu_data_pack_node(req->data_handle, req->node, NULL, &(req->backend->envelope->size));
|
|
|
|
|
|
|
|
if (req->backend->envelope->size != -1)
|
|
if (req->backend->envelope->size != -1)
|
|
|
{
|
|
{
|
|
@@ -458,7 +461,7 @@ void _starpu_mpi_isend_size_func(struct _starpu_mpi_req *req)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Pack the data
|
|
// Pack the data
|
|
|
- starpu_data_pack(req->data_handle, &req->ptr, &req->count);
|
|
|
|
|
|
|
+ starpu_data_pack_node(req->data_handle, req->node, &req->ptr, &req->count);
|
|
|
if (req->backend->envelope->size == -1)
|
|
if (req->backend->envelope->size == -1)
|
|
|
{
|
|
{
|
|
|
// We know the size now, let's send it
|
|
// We know the size now, let's send it
|
|
@@ -879,14 +882,14 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
int ret;
|
|
int ret;
|
|
|
ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
|
|
ret = MPI_Wait(&req->backend->size_req, MPI_STATUS_IGNORE);
|
|
|
STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
|
|
STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
|
|
|
- starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t)req->ptr, req->count, 0);
|
|
|
|
|
|
|
+ starpu_free_on_node_flags(req->node, (uintptr_t)req->ptr, req->count, 0);
|
|
|
req->ptr = NULL;
|
|
req->ptr = NULL;
|
|
|
}
|
|
}
|
|
|
else if (req->request_type == RECV_REQ)
|
|
else if (req->request_type == RECV_REQ)
|
|
|
{
|
|
{
|
|
|
// req->ptr is freed by starpu_data_unpack
|
|
// req->ptr is freed by starpu_data_unpack
|
|
|
- starpu_data_unpack(req->data_handle, req->ptr, req->count);
|
|
|
|
|
- starpu_memory_deallocate(STARPU_MAIN_RAM, req->count);
|
|
|
|
|
|
|
+ starpu_data_unpack_node(req->data_handle, req->node, req->ptr, req->count);
|
|
|
|
|
+ starpu_memory_deallocate(req->node, req->count);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
@@ -930,44 +933,47 @@ static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
/* Data has been received as a raw memory, it has to be unpacked */
|
|
/* Data has been received as a raw memory, it has to be unpacked */
|
|
|
struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
|
|
struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
|
|
|
struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
|
|
struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
|
|
|
- MPI_Datatype datatype = _starpu_mpi_datatype_get_user_defined_datatype(args->data_handle);
|
|
|
|
|
|
|
+ MPI_Datatype datatype = _starpu_mpi_datatype_get_user_defined_datatype(args->data_handle, args->req->node);
|
|
|
|
|
|
|
|
if (datatype)
|
|
if (datatype)
|
|
|
{
|
|
{
|
|
|
int position=0;
|
|
int position=0;
|
|
|
- void *ptr = starpu_data_get_local_ptr(args->data_handle);
|
|
|
|
|
|
|
+ void *ptr = starpu_data_handle_to_pointer(args->data_handle, args->req->node);
|
|
|
MPI_Unpack(args->buffer, itf_src->get_size(args->early_handle), &position, ptr, 1, datatype, args->req->node_tag.node.comm);
|
|
MPI_Unpack(args->buffer, itf_src->get_size(args->early_handle), &position, ptr, 1, datatype, args->req->node_tag.node.comm);
|
|
|
- starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t) args->buffer, args->size, 0);
|
|
|
|
|
|
|
+ starpu_free_on_node_flags(args->buffer_node, (uintptr_t) args->buffer, args->size, 0);
|
|
|
args->buffer = NULL;
|
|
args->buffer = NULL;
|
|
|
_starpu_mpi_datatype_free(args->data_handle, &datatype);
|
|
_starpu_mpi_datatype_free(args->data_handle, &datatype);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
|
|
STARPU_MPI_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
|
|
|
- itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
|
|
|
|
|
|
|
+ // FIXME: args->buffer is in args->buffer_node, not req->node
|
|
|
|
|
+ // There is conflation between the memory node for the handle and the memory node for the buffer
|
|
|
|
|
+ // Actually we may not want unpack_data to free the buffer, for the case when we are participating in a collective send
|
|
|
|
|
+ itf_dst->unpack_data(args->data_handle, args->req->node, args->buffer, itf_src->get_size(args->early_handle));
|
|
|
args->buffer = NULL;
|
|
args->buffer = NULL;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
|
|
struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
|
|
|
- void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
|
|
|
|
|
- void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ void* itf_src = starpu_data_get_interface_on_node(args->early_handle, args->buffer_node);
|
|
|
|
|
+ void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, args->req->node);
|
|
|
|
|
|
|
|
if (!itf->copy_methods->ram_to_ram)
|
|
if (!itf->copy_methods->ram_to_ram)
|
|
|
{
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
|
|
_STARPU_MPI_DEBUG(3, "Initiating any_to_any copy..\n");
|
|
|
- itf->copy_methods->any_to_any(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM, NULL);
|
|
|
|
|
|
|
+ itf->copy_methods->any_to_any(itf_src, args->buffer_node, itf_dst, args->req->node, NULL);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
|
|
_STARPU_MPI_DEBUG(3, "Initiating ram_to_ram copy..\n");
|
|
|
- itf->copy_methods->ram_to_ram(itf_src, STARPU_MAIN_RAM, itf_dst, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ itf->copy_methods->ram_to_ram(itf_src, args->buffer_node, itf_dst, args->req->node);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
|
|
_STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
|
|
|
- starpu_data_release_on_node(args->early_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ starpu_data_release_on_node(args->early_handle, args->buffer_node);
|
|
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
|
|
_STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
|
|
|
/* XXX: note that we have already freed the registered buffer above. In
|
|
/* XXX: note that we have already freed the registered buffer above. In
|
|
@@ -1147,10 +1153,13 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
data_handle = _starpu_mpi_tag_get_data_handle_from_tag(envelope->data_tag);
|
|
data_handle = _starpu_mpi_tag_get_data_handle_from_tag(envelope->data_tag);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
|
|
|
|
|
|
|
|
+ // TODO: rather select some memory node next to the NIC
|
|
|
|
|
+ unsigned buffer_node = STARPU_MAIN_RAM;
|
|
|
if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
|
|
if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
|
|
|
{
|
|
{
|
|
|
/* We know which data will receive it and we won't have to unpack, use just the same kind of data. */
|
|
/* We know which data will receive it and we won't have to unpack, use just the same kind of data. */
|
|
|
early_data_handle->buffer = NULL;
|
|
early_data_handle->buffer = NULL;
|
|
|
|
|
+ early_data_handle->buffer_node = buffer_node;
|
|
|
starpu_data_register_same(&early_data_handle->handle, data_handle);
|
|
starpu_data_register_same(&early_data_handle->handle, data_handle);
|
|
|
//_starpu_mpi_early_data_add(early_data_handle);
|
|
//_starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
}
|
|
@@ -1161,9 +1170,10 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
* to the application when it post a receive for this tag
|
|
* to the application when it post a receive for this tag
|
|
|
*/
|
|
*/
|
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)envelope->size);
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)envelope->size);
|
|
|
- early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, envelope->size, 0);
|
|
|
|
|
|
|
+ early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(buffer_node, envelope->size, 0);
|
|
|
early_data_handle->size = envelope->size;
|
|
early_data_handle->size = envelope->size;
|
|
|
- starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, envelope->size);
|
|
|
|
|
|
|
+ early_data_handle->buffer_node = buffer_node;
|
|
|
|
|
+ starpu_variable_data_register(&early_data_handle->handle, buffer_node, (uintptr_t) early_data_handle->buffer, envelope->size);
|
|
|
//_starpu_mpi_early_data_add(early_data_handle);
|
|
//_starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1456,13 +1466,13 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
if (early_request->registered_datatype == 1)
|
|
if (early_request->registered_datatype == 1)
|
|
|
{
|
|
{
|
|
|
early_request->count = 1;
|
|
early_request->count = 1;
|
|
|
- early_request->ptr = starpu_data_handle_to_pointer(early_request->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
|
+ early_request->ptr = starpu_data_handle_to_pointer(early_request->data_handle, early_request->node);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
early_request->count = envelope->size;
|
|
early_request->count = envelope->size;
|
|
|
- early_request->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, early_request->count, 0);
|
|
|
|
|
- starpu_memory_allocate(STARPU_MAIN_RAM, early_request->count, STARPU_MEMORY_OVERFLOW);
|
|
|
|
|
|
|
+ early_request->ptr = (void *)starpu_malloc_on_node_flags(early_request->node, early_request->count, 0);
|
|
|
|
|
+ starpu_memory_allocate(early_request->node, early_request->count, STARPU_MEMORY_OVERFLOW);
|
|
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
|
|
STARPU_MPI_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
|
|
|
}
|
|
}
|