|
@@ -735,11 +735,11 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
|
|
|
if (req->internal_req)
|
|
|
{
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag, req->srcdst);
|
|
|
- STARPU_ASSERT_MSG(chandle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
|
|
|
- _STARPU_MPI_DEBUG(3, "Handling deleting of copy_handle structure from the hashmap..\n");
|
|
|
- delete_chandle(chandle);
|
|
|
- free(chandle);
|
|
|
+ struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
|
|
|
+ STARPU_ASSERT_MSG(early_data_handle, "Could not find a copy data handle with the tag %d and the node %d\n", req->mpi_tag, req->srcdst);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Handling deleting of early_data structure from the hashmap..\n");
|
|
|
+ _starpu_mpi_early_data_delete(early_data_handle);
|
|
|
+ free(early_data_handle);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -791,17 +791,17 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
-struct _starpu_mpi_copy_cb_args
|
|
|
+struct _starpu_mpi_early_data_cb_args
|
|
|
{
|
|
|
starpu_data_handle_t data_handle;
|
|
|
- starpu_data_handle_t copy_handle;
|
|
|
+ starpu_data_handle_t early_handle;
|
|
|
struct _starpu_mpi_req *req;
|
|
|
void *buffer;
|
|
|
};
|
|
|
|
|
|
-static void _starpu_mpi_copy_cb(void* arg)
|
|
|
+static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
{
|
|
|
- struct _starpu_mpi_copy_cb_args *args = arg;
|
|
|
+ struct _starpu_mpi_early_data_cb_args *args = arg;
|
|
|
|
|
|
// We store in the application request the internal MPI
|
|
|
// request so that it can be used by starpu_mpi_wait
|
|
@@ -811,16 +811,16 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
if (args->buffer)
|
|
|
{
|
|
|
/* Data has been received as a raw memory, it has to be unpacked */
|
|
|
- struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
+ struct starpu_data_interface_ops *itf_src = starpu_data_get_interface_ops(args->early_handle);
|
|
|
struct starpu_data_interface_ops *itf_dst = starpu_data_get_interface_ops(args->data_handle);
|
|
|
STARPU_ASSERT_MSG(itf_dst->unpack_data, "The data interface does not define an unpack function\n");
|
|
|
- itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->copy_handle));
|
|
|
+ itf_dst->unpack_data(args->data_handle, STARPU_MAIN_RAM, args->buffer, itf_src->get_size(args->early_handle));
|
|
|
free(args->buffer);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->copy_handle);
|
|
|
- void* itf_src = starpu_data_get_interface_on_node(args->copy_handle, STARPU_MAIN_RAM);
|
|
|
+ struct starpu_data_interface_ops *itf = starpu_data_get_interface_ops(args->early_handle);
|
|
|
+ void* itf_src = starpu_data_get_interface_on_node(args->early_handle, STARPU_MAIN_RAM);
|
|
|
void* itf_dst = starpu_data_get_interface_on_node(args->data_handle, STARPU_MAIN_RAM);
|
|
|
|
|
|
if (!itf->copy_methods->ram_to_ram)
|
|
@@ -835,11 +835,11 @@ static void _starpu_mpi_copy_cb(void* arg)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Done, handling release of copy_handle..\n");
|
|
|
- starpu_data_release(args->copy_handle);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Done, handling release of early_handle..\n");
|
|
|
+ starpu_data_release(args->early_handle);
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Done, handling unregister of copy_handle..\n");
|
|
|
- starpu_data_unregister_submit(args->copy_handle);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
|
|
|
+ starpu_data_unregister_submit(args->early_handle);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
|
|
|
// If the request is detached, we need to call _starpu_mpi_handle_request_termination
|
|
@@ -899,34 +899,34 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
else
|
|
|
{
|
|
|
/* test whether the receive request has already been submitted internally by StarPU-MPI*/
|
|
|
- struct _starpu_mpi_copy_handle *chandle = find_chandle(req->mpi_tag, req->srcdst);
|
|
|
+ struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(req->mpi_tag, req->srcdst);
|
|
|
|
|
|
/* Case : the request has already been submitted internally by StarPU.
|
|
|
* We'll asynchronously ask a Read permission over the temporary handle, so as when
|
|
|
- * the internal receive will be over, the _starpu_mpi_copy_cb function will be called to
|
|
|
+ * the internal receive will be over, the _starpu_mpi_early_data_cb function will be called to
|
|
|
* bring the data back to the original data handle associated to the request.*/
|
|
|
- if (chandle)
|
|
|
+ if (early_data_handle)
|
|
|
{
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req_mutex));
|
|
|
- while (!(chandle->req_ready))
|
|
|
- STARPU_PTHREAD_COND_WAIT(&(chandle->req_cond), &(chandle->req_mutex));
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
|
|
|
+ while (!(early_data_handle->req_ready))
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
- STARPU_ASSERT(req->data_handle != chandle->handle);
|
|
|
+ STARPU_ASSERT(req->data_handle != early_data_handle->handle);
|
|
|
|
|
|
- req->internal_req = chandle->req;
|
|
|
+ req->internal_req = early_data_handle->req;
|
|
|
|
|
|
- struct _starpu_mpi_copy_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_copy_cb_args));
|
|
|
+ struct _starpu_mpi_early_data_cb_args *cb_args = malloc(sizeof(struct _starpu_mpi_early_data_cb_args));
|
|
|
cb_args->data_handle = req->data_handle;
|
|
|
- cb_args->copy_handle = chandle->handle;
|
|
|
- cb_args->buffer = chandle->buffer;
|
|
|
+ cb_args->early_handle = early_data_handle->handle;
|
|
|
+ cb_args->buffer = early_data_handle->buffer;
|
|
|
cb_args->req = req;
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
|
- starpu_data_acquire_cb(chandle->handle,STARPU_R,_starpu_mpi_copy_cb,(void*) cb_args);
|
|
|
+ starpu_data_acquire_cb(early_data_handle->handle,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
|
|
|
}
|
|
|
/* Case : a classic receive request with no send received earlier than expected.
|
|
|
* We just add the pending receive request to the requests' hashmap. */
|
|
@@ -1195,7 +1195,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
/* If there is no currently submitted header_req submitted to catch envelopes from senders, and there is some pending receive
|
|
|
* requests in our side, we resubmit a header request. */
|
|
|
MPI_Request header_req;
|
|
|
- if ((_starpu_mpi_app_req_hashmap_count > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_copy_handle_hashmap) == 0))
|
|
|
+ if ((_starpu_mpi_app_req_hashmap_count > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
|
|
|
{
|
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
|
MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
@@ -1228,7 +1228,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
* on this handle, and register this so as the StarPU-MPI layer can remember it.*/
|
|
|
if (!found_req)
|
|
|
{
|
|
|
- _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a copy_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
starpu_data_handle_t data_handle = NULL;
|
|
|
|
|
@@ -1236,19 +1236,19 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
data_handle = _starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
- struct _starpu_mpi_copy_handle* chandle = calloc(1, sizeof(struct _starpu_mpi_copy_handle));
|
|
|
- STARPU_ASSERT(chandle);
|
|
|
- STARPU_PTHREAD_MUTEX_INIT(&chandle->req_mutex, NULL);
|
|
|
- STARPU_PTHREAD_COND_INIT(&chandle->req_cond, NULL);
|
|
|
- chandle->mpi_tag = recv_env->mpi_tag;
|
|
|
- chandle->env = recv_env;
|
|
|
- chandle->source = status.MPI_SOURCE;
|
|
|
+ struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
|
|
|
+ STARPU_ASSERT(early_data_handle);
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
|
|
|
+ STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
|
|
|
+ early_data_handle->mpi_tag = recv_env->mpi_tag;
|
|
|
+ early_data_handle->env = recv_env;
|
|
|
+ early_data_handle->source = status.MPI_SOURCE;
|
|
|
|
|
|
if (data_handle)
|
|
|
{
|
|
|
- chandle->buffer = NULL;
|
|
|
- starpu_data_register_same(&chandle->handle, data_handle);
|
|
|
- add_chandle(chandle);
|
|
|
+ early_data_handle->buffer = NULL;
|
|
|
+ starpu_data_register_same(&early_data_handle->handle, data_handle);
|
|
|
+ _starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
@@ -1256,15 +1256,17 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
* we are going to receive the data as a raw memory, and give it
|
|
|
* to the application when it post a receive for this tag
|
|
|
*/
|
|
|
- _STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)chandle->env->size);
|
|
|
- chandle->buffer = malloc(chandle->env->size);
|
|
|
- starpu_vector_data_register(&chandle->handle, STARPU_MAIN_RAM, (uintptr_t) chandle->buffer, chandle->env->size, 1);
|
|
|
- add_chandle(chandle);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
|
|
|
+ early_data_handle->buffer = malloc(early_data_handle->env->size);
|
|
|
+ starpu_vector_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size, 1);
|
|
|
+ _starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on copy_handle with tag %d from src %d ..\n", chandle->mpi_tag, status.MPI_SOURCE);
|
|
|
+ _STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_handle with tag %d from src %d ..\n", early_data_handle->mpi_tag, status.MPI_SOURCE);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- chandle->req = _starpu_mpi_irecv_common(chandle->handle, status.MPI_SOURCE, chandle->mpi_tag, MPI_COMM_WORLD, 1, NULL, NULL, 1, 1, recv_env->size);
|
|
|
+ early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
|
|
|
+ early_data_handle->mpi_tag, MPI_COMM_WORLD, 1,
|
|
|
+ NULL, NULL, 1, 1, recv_env->size);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
// We wait until the request is pushed in the
|
|
@@ -1273,15 +1275,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
// on the request and post the corresponding mpi_irecv,
|
|
|
// otherwise, it may lead to read data as envelop
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req->posted_mutex));
|
|
|
- while (!(chandle->req->posted))
|
|
|
- STARPU_PTHREAD_COND_WAIT(&(chandle->req->posted_cond), &(chandle->req->posted_mutex));
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req->posted_mutex));
|
|
|
-
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&chandle->req_mutex);
|
|
|
- chandle->req_ready = 1;
|
|
|
- STARPU_PTHREAD_COND_BROADCAST(&chandle->req_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&chandle->req_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
|
|
|
+ while (!(early_data_handle->req->posted))
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req->posted_mutex));
|
|
|
+
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&early_data_handle->req_mutex);
|
|
|
+ early_data_handle->req_ready = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|