|
@@ -206,7 +206,7 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
else
|
|
|
{
|
|
|
STARPU_ASSERT(req->count);
|
|
|
- _STARPU_MPI_MALLOC(req->ptr, req->count);
|
|
|
+ req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
|
|
|
}
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Pushing internal starpu_mpi_irecv request %p type %s tag %"PRIi64" src %d data %p ptr %p datatype '%s' count %d registered_datatype %d \n",
|
|
@@ -228,12 +228,12 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
/* test whether some data with the given tag and source have already been received by StarPU-MPI*/
|
|
|
struct _starpu_mpi_early_data_handle *early_data_handle = _starpu_mpi_early_data_find(&req->node_tag);
|
|
|
|
|
|
- /* Case: a receive request for a data with the given tag and source has already been
|
|
|
- * posted by StarPU. Asynchronously requests a Read permission over the temporary handle ,
|
|
|
- * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
|
|
|
- * will be called to bring the data back to the original data handle associated to the request.*/
|
|
|
if (early_data_handle)
|
|
|
{
|
|
|
+ /* Case: a receive request for a data with the given tag and source has already been
|
|
|
+ * posted to MPI by StarPU. Asynchronously requests a Read permission over the temporary handle ,
|
|
|
+ * so as when the internal receive is completed, the _starpu_mpi_early_data_cb function
|
|
|
+ * will be called to bring the data back to the original data handle associated to the request.*/
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
|
|
|
while (!(early_data_handle->req_ready))
|
|
@@ -260,13 +260,13 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
starpu_data_acquire_on_node_cb(early_data_handle->handle,STARPU_MAIN_RAM,STARPU_R,_starpu_mpi_early_data_cb,(void*) cb_args);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
|
|
|
}
|
|
|
- /* Case: no matching data has been received. Store the receive request as an early_request. */
|
|
|
else
|
|
|
{
|
|
|
struct _starpu_mpi_req *sync_req = _starpu_mpi_sync_data_find(req->node_tag.data_tag, req->node_tag.node.rank, req->node_tag.node.comm);
|
|
|
_STARPU_MPI_DEBUG(3, "----------> Looking for sync data for tag %"PRIi64" and src %d = %p\n", req->node_tag.data_tag, req->node_tag.node.rank, sync_req);
|
|
|
if (sync_req)
|
|
|
{
|
|
|
+ /* Case: we already received the send envelope, we can proceed with the receive */
|
|
|
req->sync = 1;
|
|
|
_starpu_mpi_datatype_allocate(req->data_handle, req);
|
|
|
if (req->registered_datatype == 1)
|
|
@@ -278,14 +278,16 @@ void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
{
|
|
|
req->count = sync_req->count;
|
|
|
STARPU_ASSERT(req->count);
|
|
|
- _STARPU_MPI_MALLOC(req->ptr, req->count);
|
|
|
+ req->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, req->count, 0);
|
|
|
}
|
|
|
_starpu_mpi_req_list_push_front(&ready_recv_requests, req);
|
|
|
_STARPU_MPI_INC_READY_REQUESTS(+1);
|
|
|
+ /* Throw away the dumb request that was only used to know that we got the envelope */
|
|
|
_starpu_mpi_request_destroy(sync_req);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ /* Case: no matching data has been received. Store the receive request as an early_request. */
|
|
|
_STARPU_MPI_DEBUG(3, "Adding the pending receive request %p (srcdst %d tag %"PRIi64") into the request hashmap\n", req, req->node_tag.node.rank, req->node_tag.data_tag);
|
|
|
_starpu_mpi_early_request_enqueue(req);
|
|
|
}
|
|
@@ -687,6 +689,8 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
|
|
|
|
|
|
+ STARPU_VALGRIND_YIELD();
|
|
|
+
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
ret = req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, flag);
|
|
|
if (*flag)
|
|
@@ -911,6 +915,8 @@ static void _starpu_mpi_handle_request_termination(struct _starpu_mpi_req *req)
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
|
|
|
|
+/* This is called when the data is now received in the early data handle, we can
|
|
|
+ * now copy it over to the real handle. */
|
|
|
static void _starpu_mpi_early_data_cb(void* arg)
|
|
|
{
|
|
|
struct _starpu_mpi_early_data_cb_args *args = arg;
|
|
@@ -1205,14 +1211,14 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
_starpu_mpi_thread_cpuid = starpu_get_next_bindid(STARPU_THREAD_ACTIVE, NULL, 0);
|
|
|
}
|
|
|
|
|
|
- if (starpu_bind_thread_on(_starpu_mpi_thread_cpuid, STARPU_THREAD_ACTIVE, "MPI") < 0)
|
|
|
+ if (!_starpu_mpi_nobind && starpu_bind_thread_on(_starpu_mpi_thread_cpuid, STARPU_THREAD_ACTIVE, "MPI") < 0)
|
|
|
{
|
|
|
char hostname[65];
|
|
|
gethostname(hostname, sizeof(hostname));
|
|
|
_STARPU_DISP("[%s] No core was available for the MPI thread. You should use STARPU_RESERVE_NCPU to leave one core available for MPI, or specify one core less in STARPU_NCPU\n", hostname);
|
|
|
}
|
|
|
_starpu_mpi_do_initialize(argc_argv);
|
|
|
- if (_starpu_mpi_thread_cpuid >= 0)
|
|
|
+ if (!_starpu_mpi_nobind && _starpu_mpi_thread_cpuid >= 0)
|
|
|
/* In case MPI changed the binding */
|
|
|
starpu_bind_thread_on(_starpu_mpi_thread_cpuid, STARPU_THREAD_ACTIVE, "MPI");
|
|
|
#else
|
|
@@ -1456,7 +1462,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
else
|
|
|
{
|
|
|
early_request->count = envelope->size;
|
|
|
- _STARPU_MPI_MALLOC(early_request->ptr, early_request->count);
|
|
|
+ early_request->ptr = (void *)starpu_malloc_on_node_flags(STARPU_MAIN_RAM, early_request->count, 0);
|
|
|
starpu_memory_allocate(STARPU_MAIN_RAM, early_request->count, STARPU_MEMORY_OVERFLOW);
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
|