|
@@ -1115,9 +1115,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
- struct _starpu_mpi_envelope *recv_env = calloc(1,sizeof(struct _starpu_mpi_envelope));
|
|
|
|
|
|
+ struct _starpu_mpi_envelope *envelope = calloc(1,sizeof(struct _starpu_mpi_envelope));
|
|
|
|
|
|
- int header_req_submitted = 0;
|
|
|
|
|
|
+ int envelope_request_submitted = 0;
|
|
|
|
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))
|
|
{
|
|
{
|
|
@@ -1159,15 +1159,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
}
|
|
}
|
|
|
|
|
|
- /* If there is no currently submitted header_req submitted to
|
|
|
|
|
|
+ /* If there is no currently submitted envelope_request submitted to
|
|
* catch envelopes from senders, and there is some pending
|
|
* catch envelopes from senders, and there is some pending
|
|
* receive requests on our side, we resubmit a header request. */
|
|
* receive requests on our side, we resubmit a header request. */
|
|
- MPI_Request header_req;
|
|
|
|
- if ((_starpu_mpi_early_request_count() > 0) && (header_req_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
|
|
|
|
|
|
+ MPI_Request envelope_request;
|
|
|
|
+ if ((_starpu_mpi_early_request_count() > 0) && (envelope_request_submitted == 0))// && (HASH_COUNT(_starpu_mpi_early_data_handle_hashmap) == 0))
|
|
{
|
|
{
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop\n");
|
|
- MPI_Irecv(recv_env, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &header_req);
|
|
|
|
- header_req_submitted = 1;
|
|
|
|
|
|
+ MPI_Irecv(envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _starpu_mpi_tag, MPI_COMM_WORLD, &envelope_request);
|
|
|
|
+ envelope_request_submitted = 1;
|
|
}
|
|
}
|
|
|
|
|
|
/* test whether there are some terminated "detached request" */
|
|
/* test whether there are some terminated "detached request" */
|
|
@@ -1175,44 +1175,44 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
_starpu_mpi_test_detached_requests();
|
|
_starpu_mpi_test_detached_requests();
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
- if (header_req_submitted == 1)
|
|
|
|
|
|
+ if (envelope_request_submitted == 1)
|
|
{
|
|
{
|
|
int flag,res;
|
|
int flag,res;
|
|
MPI_Status status;
|
|
MPI_Status status;
|
|
- _STARPU_MPI_DEBUG(4, "Test of header_req\n");
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(4, "Test of envelope_request\n");
|
|
|
|
|
|
/* test whether an envelope has arrived. */
|
|
/* test whether an envelope has arrived. */
|
|
- res = MPI_Test(&header_req, &flag, &status);
|
|
|
|
|
|
+ res = MPI_Test(&envelope_request, &flag, &status);
|
|
STARPU_ASSERT(res == MPI_SUCCESS);
|
|
STARPU_ASSERT(res == MPI_SUCCESS);
|
|
|
|
|
|
if (flag)
|
|
if (flag)
|
|
{
|
|
{
|
|
- _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", recv_env->mpi_tag, status.MPI_SOURCE, recv_env->size);
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "Searching for application request with tag %d and source %d (size %ld)\n", envelope->mpi_tag, status.MPI_SOURCE, envelope->size);
|
|
|
|
|
|
- struct _starpu_mpi_req *found_req = _starpu_mpi_early_request_find(recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
+ struct _starpu_mpi_req *early_request = _starpu_mpi_early_request_find(envelope->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
/* Case: a data will arrive before a matching receive is
|
|
/* Case: a data will arrive before a matching receive is
|
|
* posted by the application. Create a temporary handle to
|
|
* posted by the application. Create a temporary handle to
|
|
* store the incoming data, submit a starpu_mpi_irecv_detached
|
|
* store the incoming data, submit a starpu_mpi_irecv_detached
|
|
* on this handle, and store it as an early_data
|
|
* on this handle, and store it as an early_data
|
|
*/
|
|
*/
|
|
- if (!found_req)
|
|
|
|
|
|
+ if (early_request == NULL)
|
|
{
|
|
{
|
|
|
|
|
|
- _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", recv_env->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "Request with tag %d and source %d not found, creating a early_handle to receive incoming data..\n", envelope->mpi_tag, status.MPI_SOURCE);
|
|
|
|
|
|
starpu_data_handle_t data_handle = NULL;
|
|
starpu_data_handle_t data_handle = NULL;
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
- data_handle = _starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
|
|
|
|
+ data_handle = _starpu_data_get_data_handle_from_tag(envelope->mpi_tag);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
|
|
struct _starpu_mpi_early_data_handle* early_data_handle = calloc(1, sizeof(struct _starpu_mpi_early_data_handle));
|
|
STARPU_ASSERT(early_data_handle);
|
|
STARPU_ASSERT(early_data_handle);
|
|
STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
|
|
STARPU_PTHREAD_MUTEX_INIT(&early_data_handle->req_mutex, NULL);
|
|
STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
|
|
STARPU_PTHREAD_COND_INIT(&early_data_handle->req_cond, NULL);
|
|
- early_data_handle->mpi_tag = recv_env->mpi_tag;
|
|
|
|
- early_data_handle->env = recv_env;
|
|
|
|
|
|
+ early_data_handle->mpi_tag = envelope->mpi_tag;
|
|
|
|
+ early_data_handle->env = envelope;
|
|
early_data_handle->source = status.MPI_SOURCE;
|
|
early_data_handle->source = status.MPI_SOURCE;
|
|
|
|
|
|
if (data_handle)
|
|
if (data_handle)
|
|
@@ -1237,7 +1237,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
|
|
early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
|
|
early_data_handle->mpi_tag, MPI_COMM_WORLD, 1, 0,
|
|
early_data_handle->mpi_tag, MPI_COMM_WORLD, 1, 0,
|
|
- NULL, NULL, 1, 1, recv_env->size);
|
|
|
|
|
|
+ NULL, NULL, 1, 1, envelope->size);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
// We wait until the request is pushed in the
|
|
// We wait until the request is pushed in the
|
|
@@ -1264,22 +1264,22 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
* _starpu_mpi_handle_ready_request. */
|
|
* _starpu_mpi_handle_ready_request. */
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- _STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", recv_env->mpi_tag);
|
|
|
|
|
|
+ _STARPU_MPI_DEBUG(3, "A matching receive has been found for the incoming data with tag %d\n", envelope->mpi_tag);
|
|
|
|
|
|
- _starpu_mpi_early_request_delete(found_req);
|
|
|
|
|
|
+ _starpu_mpi_early_request_delete(early_request);
|
|
|
|
|
|
- _starpu_mpi_handle_allocate_datatype(found_req->data_handle, &found_req->datatype, &found_req->user_datatype);
|
|
|
|
- if (found_req->user_datatype == 0)
|
|
|
|
|
|
+ _starpu_mpi_handle_allocate_datatype(early_request->data_handle, &early_request->datatype, &early_request->user_datatype);
|
|
|
|
+ if (early_request->user_datatype == 0)
|
|
{
|
|
{
|
|
- found_req->count = 1;
|
|
|
|
- found_req->ptr = starpu_data_get_local_ptr(found_req->data_handle);
|
|
|
|
|
|
+ early_request->count = 1;
|
|
|
|
+ early_request->ptr = starpu_data_get_local_ptr(early_request->data_handle);
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
- found_req->count = recv_env->size;
|
|
|
|
- found_req->ptr = malloc(found_req->count);
|
|
|
|
|
|
+ early_request->count = envelope->size;
|
|
|
|
+ early_request->ptr = malloc(early_request->count);
|
|
|
|
|
|
- STARPU_ASSERT_MSG(found_req->ptr, "cannot allocate message of size %ld\n", found_req->count);
|
|
|
|
|
|
+ STARPU_ASSERT_MSG(early_request->ptr, "cannot allocate message of size %ld\n", early_request->count);
|
|
}
|
|
}
|
|
|
|
|
|
_STARPU_MPI_DEBUG(3, "Handling new request... \n");
|
|
_STARPU_MPI_DEBUG(3, "Handling new request... \n");
|
|
@@ -1288,10 +1288,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
* application submit requests in the meantime, so we
|
|
* application submit requests in the meantime, so we
|
|
* release the lock. */
|
|
* release the lock. */
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
- _starpu_mpi_handle_ready_request(found_req);
|
|
|
|
|
|
+ _starpu_mpi_handle_ready_request(early_request);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
}
|
|
}
|
|
- header_req_submitted = 0;
|
|
|
|
|
|
+ envelope_request_submitted = 0;
|
|
}
|
|
}
|
|
else
|
|
else
|
|
{
|
|
{
|
|
@@ -1317,7 +1317,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
_starpu_mpi_early_data_free(worldsize);
|
|
_starpu_mpi_early_data_free(worldsize);
|
|
_starpu_mpi_early_request_free();
|
|
_starpu_mpi_early_request_free();
|
|
free(argc_argv);
|
|
free(argc_argv);
|
|
- free(recv_env);
|
|
|
|
|
|
+ free(envelope);
|
|
|
|
|
|
return NULL;
|
|
return NULL;
|
|
}
|
|
}
|