|
@@ -135,6 +135,7 @@ struct _starpu_mpi_early_data_cb_args
|
|
|
starpu_data_handle_t early_handle;
|
|
|
struct _starpu_mpi_req *req;
|
|
|
void *buffer;
|
|
|
+ size_t size;
|
|
|
};
|
|
|
|
|
|
void _starpu_mpi_submit_ready_request_inc(struct _starpu_mpi_req *req)
|
|
@@ -251,6 +252,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
cb_args->data_handle = req->data_handle;
|
|
|
cb_args->early_handle = early_data_handle->handle;
|
|
|
cb_args->buffer = early_data_handle->buffer;
|
|
|
+ cb_args->size = early_data_handle->size;
|
|
|
cb_args->req = req;
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Calling data_acquire_cb on starpu_mpi_copy_cb..\n");
|
|
@@ -925,6 +927,8 @@ static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
int position=0;
|
|
|
void *ptr = starpu_data_get_local_ptr(args->data_handle);
|
|
|
MPI_Unpack(args->buffer, itf_src->get_size(args->early_handle), &position, ptr, 1, datatype, args->req->node_tag.node.comm);
|
|
|
+ starpu_free_on_node_flags(STARPU_MAIN_RAM, (uintptr_t) args->buffer, args->size, 0);
|
|
|
+ args->buffer = NULL;
|
|
|
_starpu_mpi_datatype_free(args->data_handle, &datatype);
|
|
|
}
|
|
|
else
|
|
@@ -956,6 +960,9 @@ static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
starpu_data_release(args->early_handle);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling unregister of early_handle..\n");
|
|
|
+ /* XXX: note that we have already freed the registered buffer above. In
|
|
|
+ * principle that's unsafe. As of now it is fine because StarPU has no
|
|
|
+ reason to access it. */
|
|
|
starpu_data_unregister_submit(args->early_handle);
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Done, handling request %p termination of the already received request\n",args->req);
|
|
@@ -1145,9 +1152,10 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
|
|
|
* we are going to receive the data as a raw memory, and give it
|
|
|
* to the application when it post a receive for this tag
|
|
|
*/
|
|
|
- _STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)early_data_handle->env->size);
|
|
|
- early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, early_data_handle->env->size, 0);
|
|
|
- starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, early_data_handle->env->size);
|
|
|
+ _STARPU_MPI_DEBUG(3, "Posting a receive for a data of size %d which has not yet been registered\n", (int)envelope->size);
|
|
|
+ early_data_handle->buffer = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, envelope->size, 0);
|
|
|
+ early_data_handle->size = envelope->size;
|
|
|
+ starpu_variable_data_register(&early_data_handle->handle, STARPU_MAIN_RAM, (uintptr_t) early_data_handle->buffer, envelope->size);
|
|
|
//_starpu_mpi_early_data_add(early_data_handle);
|
|
|
}
|
|
|
|