|
@@ -69,6 +69,9 @@ LIST_TYPE(_starpu_mpi_copy_handle,
|
|
|
void *buffer;
|
|
|
int mpi_tag;
|
|
|
int source;
|
|
|
+ int req_ready;
|
|
|
+ starpu_pthread_mutex_t req_mutex;
|
|
|
+ starpu_pthread_cond_t req_cond;
|
|
|
);
|
|
|
|
|
|
struct _starpu_mpi_copy_handle_hashlist
|
|
@@ -1022,6 +1025,13 @@ static void _starpu_mpi_submit_new_mpi_request(void *arg)
|
|
|
* bring the data back to the original data handle associated to the request.*/
|
|
|
if (chandle)
|
|
|
{
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&(chandle->req_mutex));
|
|
|
+ while (!(chandle->req_ready))
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&(chandle->req_cond), &(chandle->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req_mutex));
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
+
|
|
|
_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->mpi_tag);
|
|
|
STARPU_ASSERT(req->data_handle != chandle->handle);
|
|
|
|
|
@@ -1331,8 +1341,10 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
data_handle = starpu_data_get_data_handle_from_tag(recv_env->mpi_tag);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
|
- struct _starpu_mpi_copy_handle* chandle = malloc(sizeof(struct _starpu_mpi_copy_handle));
|
|
|
+ struct _starpu_mpi_copy_handle* chandle = calloc(1, sizeof(struct _starpu_mpi_copy_handle));
|
|
|
STARPU_ASSERT(chandle);
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&chandle->req_mutex, NULL);
|
|
|
+ STARPU_PTHREAD_COND_INIT(&chandle->req_cond, NULL);
|
|
|
chandle->mpi_tag = recv_env->mpi_tag;
|
|
|
chandle->env = recv_env;
|
|
|
chandle->source = status.MPI_SOURCE;
|
|
@@ -1370,6 +1382,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
while (!(chandle->req->posted))
|
|
|
STARPU_PTHREAD_COND_WAIT(&(chandle->req->posted_cond), &(chandle->req->posted_mutex));
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&(chandle->req->posted_mutex));
|
|
|
+
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&chandle->req_mutex);
|
|
|
+ chandle->req_ready = 1;
|
|
|
+ STARPU_PTHREAD_COND_BROADCAST(&chandle->req_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&chandle->req_mutex);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
}
|
|
|
/* Case : a matching receive has been found for the incoming data, we handle the correct allocation of the pointer associated to
|