@@ -30,6 +30,7 @@
 #include <starpu_mpi_select_node.h>
 #include <starpu_mpi_tag.h>
 #include <starpu_mpi_comm.h>
+#include <starpu_mpi_init.h>
 #include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
@@ -64,10 +65,10 @@ static struct _starpu_mpi_req_list *detached_requests;
 static starpu_pthread_mutex_t detached_requests_mutex;
 
 /* Condition to wake up progression thread */
-static starpu_pthread_cond_t cond_progression;
+static starpu_pthread_cond_t progress_cond;
 /* Condition to wake up waiting for all current MPI requests to finish */
-static starpu_pthread_cond_t cond_finished;
-static starpu_pthread_mutex_t mutex;
+static starpu_pthread_cond_t barrier_cond;
+static starpu_pthread_mutex_t progress_mutex;
 #ifndef STARPU_SIMGRID
 static starpu_pthread_t progress_thread;
 #endif
@@ -179,7 +180,7 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 
 	_STARPU_MPI_DEBUG(3, "new req %p srcdst %d tag %d and type %s %d\n", req, req->node_tag.rank, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->is_internal_req);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	if (req->request_type == RECV_REQ)
 	{
@@ -210,12 +211,12 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 			_starpu_mpi_req_list_push_front(ready_requests, req);
 
 			/* inform the starpu mpi thread that the request has been pushed in the ready_requests list */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 			STARPU_PTHREAD_MUTEX_LOCK(&req->posted_mutex);
 			req->posted = 1;
 			STARPU_PTHREAD_COND_BROADCAST(&req->posted_cond);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&req->posted_mutex);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 		else
 		{
@@ -228,12 +229,12 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 		 * will be called to bring the data back to the original data handle associated to the request.*/
 		if (early_data_handle)
 		{
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 			STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req_mutex));
 			while (!(early_data_handle->req_ready))
 				STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req_cond), &(early_data_handle->req_mutex));
 			STARPU_PTHREAD_MUTEX_UNLOCK(&(early_data_handle->req_mutex));
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 			_STARPU_MPI_DEBUG(3, "The RECV request %p with tag %d has already been received, copying previously received data into handle's pointer..\n", req, req->node_tag.data_tag);
 			STARPU_ASSERT(req->data_handle != early_data_handle->handle);
@@ -291,8 +292,8 @@ static void _starpu_mpi_submit_ready_request(void *arg)
 	}
 
 	newer_requests = 1;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	_STARPU_MPI_LOG_OUT();
 }
 
@@ -777,11 +778,9 @@ int starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 	if (submitted)
 	{
 		struct _starpu_mpi_req *testing_req;
-		_starpu_mpi_request_init(&testing_req);
 
 		/* Initialize the request structure */
-		STARPU_PTHREAD_MUTEX_INIT(&(testing_req->req_mutex), NULL);
-		STARPU_PTHREAD_COND_INIT(&(testing_req->req_cond), NULL);
+		_starpu_mpi_request_init(&testing_req);
 		testing_req->flag = flag;
 		testing_req->status = status;
 		testing_req->other_request = req;
@@ -847,35 +846,33 @@ int _starpu_mpi_barrier(MPI_Comm comm)
 
 	int ret = posted_requests;
 	struct _starpu_mpi_req *barrier_req;
-	_starpu_mpi_request_init(&barrier_req);
 
 	/* First wait for *both* all tasks and MPI requests to finish, in case
 	 * some tasks generate MPI requests, MPI requests generate tasks, etc.
 	 */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	STARPU_MPI_ASSERT_MSG(!barrier_running, "Concurrent starpu_mpi_barrier is not implemented, even on different communicators");
 	barrier_running = 1;
 	do
 	{
 		while (posted_requests)
 			/* Wait for all current MPI requests to finish */
-			STARPU_PTHREAD_COND_WAIT(&cond_finished, &mutex);
+			STARPU_PTHREAD_COND_WAIT(&barrier_cond, &progress_mutex);
 		/* No current request, clear flag */
 		newer_requests = 0;
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		/* Now wait for all tasks */
 		starpu_task_wait_for_all();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		/* Check newer_requests again, in case some MPI requests
 		 * triggered by tasks completed and triggered tasks between
 		 * wait_for_all finished and we take the lock */
 	} while (posted_requests || newer_requests);
 	barrier_running = 0;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	/* Initialize the request structure */
-	STARPU_PTHREAD_MUTEX_INIT(&(barrier_req->req_mutex), NULL);
-	STARPU_PTHREAD_COND_INIT(&(barrier_req->req_cond), NULL);
+	_starpu_mpi_request_init(&barrier_req);
 	barrier_req->func = _starpu_mpi_barrier_func;
 	barrier_req->request_type = BARRIER_REQ;
 	barrier_req->node_tag.comm = comm;
@@ -1054,28 +1051,6 @@ static void _starpu_mpi_early_data_cb(void* arg)
 	args = NULL;
 }
 
-#ifdef STARPU_MPI_ACTIVITY
-static unsigned _starpu_mpi_progression_hook_func(void *arg STARPU_ATTRIBUTE_UNUSED)
-{
-	unsigned may_block = 1;
-
-	STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-	if (!_starpu_mpi_req_list_empty(detached_requests))
-	{
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-		may_block = 0;
-	}
-	else
-		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-
-
-	return may_block;
-}
-#endif /* STARPU_MPI_ACTIVITY */
-
 static void _starpu_mpi_test_detached_requests(void)
 {
 	//_STARPU_MPI_LOG_IN();
@@ -1092,7 +1067,6 @@ static void _starpu_mpi_test_detached_requests(void)
 
 		//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
 		req->ret = MPI_Test(&req->data_request, &flag, &status);
-
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 
 		if (!flag)
@@ -1104,26 +1078,12 @@ static void _starpu_mpi_test_detached_requests(void)
 			struct _starpu_mpi_req *next_req;
 			next_req = _starpu_mpi_req_list_next(req);
 
-			if (req->request_type == RECV_REQ)
-			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_BEGIN(req->node_tag.rank, req->node_tag.data_tag, 0);
-			}
+			_STARPU_MPI_TRACE_COMPLETE_BEGIN(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
 			_starpu_mpi_req_list_erase(detached_requests, req);
 			_starpu_mpi_handle_request_termination(req);
 
-			if (req->request_type == RECV_REQ)
-			{
-				_STARPU_MPI_TRACE_IRECV_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag);
-			}
-			else if (req->request_type == SEND_REQ)
-			{
-				_STARPU_MPI_TRACE_ISEND_COMPLETE_END(req->node_tag.rank, req->node_tag.data_tag, 0);
-			}
+			_STARPU_MPI_TRACE_COMPLETE_END(req->request_type, req->node_tag.rank, req->node_tag.data_tag);
 
 			if (req->is_internal_req == 0)
 			{
@@ -1147,14 +1107,14 @@ static void _starpu_mpi_handle_detached_request(struct _starpu_mpi_req *req)
 		/* put the submitted request into the list of pending requests
 		 * so that it can be handled by the progression mechanisms */
 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-		_starpu_mpi_req_list_push_front(detached_requests, req);
+		_starpu_mpi_req_list_push_back(detached_requests, req);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
 
 		starpu_wake_all_blocked_workers();
 
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
-		STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
+		STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	}
 }
 
@@ -1171,28 +1131,6 @@ static void _starpu_mpi_handle_ready_request(struct _starpu_mpi_req *req)
 	_STARPU_MPI_LOG_OUT();
 }
 
-static void _starpu_mpi_print_thread_level_support(int thread_level, char *msg)
-{
-	switch (thread_level)
-	{
-		case MPI_THREAD_SERIALIZED:
-		{
-			_STARPU_DISP("MPI%s MPI_THREAD_SERIALIZED; Multiple threads may make MPI calls, but only one at a time.\n", msg);
-			break;
-		}
-		case MPI_THREAD_FUNNELED:
-		{
-			_STARPU_DISP("MPI%s MPI_THREAD_FUNNELED; The application can safely make calls to StarPU-MPI functions, but should not call directly MPI communication functions.\n", msg);
-			break;
-		}
-		case MPI_THREAD_SINGLE:
-		{
-			_STARPU_DISP("MPI%s MPI_THREAD_SINGLE; MPI does not have multi-thread support, this might cause problems. The application can make calls to StarPU-MPI functions, but not call directly MPI Communication functions.\n", msg);
-			break;
-		}
-	}
-}
-
 static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope, MPI_Status status, MPI_Comm comm)
 {
 	_STARPU_MPI_DEBUG(20, "Request with tag %d and source %d not found, creating a early_data_handle to receive incoming data..\n", envelope->data_tag, status.MPI_SOURCE);
@@ -1202,9 +1140,9 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	_starpu_mpi_early_data_add(early_data_handle);
 
 	starpu_data_handle_t data_handle = NULL;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	data_handle = _starpu_mpi_data_get_data_handle_from_tag(envelope->data_tag);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	if (data_handle && starpu_data_get_interface_id(data_handle) < STARPU_MAX_INTERFACE_ID)
 	{
@@ -1225,19 +1163,19 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 		//_starpu_mpi_early_data_add(early_data_handle);
 	}
 
-	_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_data_handle with tag %d from comm %d src %d ..\n", early_data_handle->node_tag.data_tag, comm, status.MPI_SOURCE);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	_STARPU_MPI_DEBUG(20, "Posting internal detached irecv on early_data_handle with tag %d from comm %ld src %d ..\n", early_data_handle->node_tag.data_tag, (long int)comm, status.MPI_SOURCE);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	early_data_handle->req = _starpu_mpi_irecv_common(early_data_handle->handle, status.MPI_SOURCE,
 							  early_data_handle->node_tag.data_tag, comm, 1, 0,
 							  NULL, NULL, 1, 1, envelope->size);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	// We wait until the request is pushed in the
 	// ready_request list, that ensures that the next loop
 	// will call _starpu_mpi_handle_ready_request
 	// on the request and post the corresponding mpi_irecv,
 	// otherwise, it may lead to read data as envelop
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK(&(early_data_handle->req->posted_mutex));
 	while (!(early_data_handle->req->posted))
 		STARPU_PTHREAD_COND_WAIT(&(early_data_handle->req->posted_cond), &(early_data_handle->req->posted_mutex));
@@ -1247,27 +1185,41 @@ static void _starpu_mpi_receive_early_data(struct _starpu_mpi_envelope *envelope
 	early_data_handle->req_ready = 1;
 	STARPU_PTHREAD_COND_BROADCAST(&early_data_handle->req_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&early_data_handle->req_mutex);
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 }
 
-static void _starpu_mpi_do_initialize(struct _starpu_mpi_argc_argv *argc_argv)
+static void _starpu_mpi_add_sync_point_in_fxt(void)
 {
-	if (argc_argv->initialize_mpi)
-	{
-		int thread_support;
-		_STARPU_DEBUG("Calling MPI_Init_thread\n");
-		if (MPI_Init_thread(argc_argv->argc, argc_argv->argv, MPI_THREAD_SERIALIZED, &thread_support) != MPI_SUCCESS)
-		{
-			_STARPU_ERROR("MPI_Init_thread failed\n");
-		}
-		_starpu_mpi_print_thread_level_support(thread_support, "_Init_thread level =");
-	}
-	else
+#ifdef STARPU_USE_FXT
+	int rank;
+	int worldsize;
+	int ret;
+
+	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
+	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);
+
+	ret = MPI_Barrier(MPI_COMM_WORLD);
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_error_code(ret));
+
+	/* We generate a "unique" key so that we can make sure that different
+	 * FxT traces come from the same MPI run. */
+	int random_number;
+
+	/* XXX perhaps we don't want to generate a new seed if the application
+	 * specified some reproductible behaviour ? */
+	if (rank == 0)
 	{
-		int provided;
-		MPI_Query_thread(&provided);
-		_starpu_mpi_print_thread_level_support(provided, " has been initialized with");
+		srand(time(NULL));
+		random_number = rand();
 	}
+
+	ret = MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
+	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %s", _starpu_mpi_get_mpi_error_code(ret));
+
+	_STARPU_MPI_TRACE_BARRIER(rank, worldsize, random_number);
+
+	_STARPU_MPI_DEBUG(3, "unique key %x\n", random_number);
+#endif
 }
 
 static void *_starpu_mpi_progress_thread_func(void *arg)
@@ -1326,26 +1278,21 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	_starpu_mpi_datatype_init();
 
 	/* notify the main thread that the progression thread is ready */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	running = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 	int envelope_request_submitted = 0;
 
 	while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
 	{
 		/* shall we block ? */
-		unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0;
-
-#ifndef STARPU_MPI_ACTIVITY
 		STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
-		block = block && _starpu_mpi_req_list_empty(detached_requests);
+		unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0 && _starpu_mpi_req_list_empty(detached_requests);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
-#endif /* STARPU_MPI_ACTIVITY */
 
 #ifdef STARPU_SIMGRID
 		MSG_process_sleep(0.000010);
@@ -1358,8 +1305,8 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 			if (barrier_running)
 				/* Tell mpi_barrier */
-				STARPU_PTHREAD_COND_SIGNAL(&cond_finished);
-			STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
+				STARPU_PTHREAD_COND_SIGNAL(&barrier_cond);
+			STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
 
 			_STARPU_MPI_TRACE_SLEEP_END();
 		}
@@ -1374,9 +1321,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 			 * (on a sync_data_with_mem call), we want to let the
 			 * application submit requests in the meantime, so we
 			 * release the lock. */
-			STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 			_starpu_mpi_handle_ready_request(req);
-			STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 		}
 
 		/* If there is no currently submitted envelope_request submitted to
@@ -1389,9 +1336,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		}
 
 		/* test whether there are some terminated "detached request" */
-		STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 		_starpu_mpi_test_detached_requests();
-		STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 
 		if (envelope_request_submitted == 1)
 		{
@@ -1411,9 +1358,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					struct _starpu_mpi_req *_sync_req = _starpu_mpi_sync_data_find(envelope->data_tag, envelope_status.MPI_SOURCE, envelope_comm);
 					_STARPU_MPI_DEBUG(20, "Sending data with tag %d to node %d\n", _sync_req->node_tag.data_tag, envelope_status.MPI_SOURCE);
 					STARPU_MPI_ASSERT_MSG(envelope->data_tag == _sync_req->node_tag.data_tag, "Tag mismatch (envelope %d != req %d)\n", envelope->data_tag, _sync_req->node_tag.data_tag);
-					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 					_starpu_mpi_isend_data_func(_sync_req);
-					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 				}
 				else
 				{
@@ -1487,9 +1434,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 					 * (on a sync_data_with_mem call), we want to let the
 					 * application submit requests in the meantime, so we
 					 * release the lock. */
-					STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 					_starpu_mpi_handle_ready_request(early_request);
-					STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+					STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 				}
 			}
 			envelope_request_submitted = 0;
@@ -1520,7 +1467,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 		MPI_Finalize();
 	}
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
 	_starpu_mpi_sync_data_free();
 	_starpu_mpi_early_data_free();
@@ -1531,68 +1478,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	return NULL;
 }
 
-/********************************************************/
-/*                                                      */
-/*  (De)Initialization methods                          */
-/*                                                      */
-/********************************************************/
-
-#ifdef STARPU_MPI_ACTIVITY
-static int hookid = - 1;
-#endif /* STARPU_MPI_ACTIVITY */
-
-static void _starpu_mpi_add_sync_point_in_fxt(void)
-{
-#ifdef STARPU_USE_FXT
-	int rank;
-	int worldsize;
-	int ret;
-
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);
-
-	ret = MPI_Barrier(MPI_COMM_WORLD);
-	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Barrier returning %s", _starpu_mpi_get_mpi_error_code(ret));
-
-	/* We generate a "unique" key so that we can make sure that different
-	 * FxT traces come from the same MPI run. */
-	int random_number;
-
-	/* XXX perhaps we don't want to generate a new seed if the application
-	 * specified some reproductible behaviour ? */
-	if (rank == 0)
-	{
-		srand(time(NULL));
-		random_number = rand();
-	}
-
-	ret = MPI_Bcast(&random_number, 1, MPI_INT, 0, MPI_COMM_WORLD);
-	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Bcast returning %s", _starpu_mpi_get_mpi_error_code(ret));
-
-	_STARPU_MPI_TRACE_BARRIER(rank, worldsize, random_number);
-
-	_STARPU_MPI_DEBUG(3, "unique key %x\n", random_number);
-#endif
-}
-
-static
-int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm comm)
+int _starpu_mpi_progress_init(struct _starpu_mpi_argc_argv *argc_argv)
 {
-	struct _starpu_mpi_argc_argv *argc_argv = malloc(sizeof(struct _starpu_mpi_argc_argv));
-	argc_argv->initialize_mpi = initialize_mpi;
-	argc_argv->argc = argc;
-	argc_argv->argv = argv;
-	argc_argv->comm = comm;
-
-#ifdef STARPU_SIMGRID
-	/* Call MPI_Init_thread as early as possible, to initialize simgrid
-	 * before working with mutexes etc. */
-	_starpu_mpi_do_initialize(argc_argv);
-#endif
-
-	STARPU_PTHREAD_MUTEX_INIT(&mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_progression, NULL);
-	STARPU_PTHREAD_COND_INIT(&cond_finished, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
+	STARPU_PTHREAD_COND_INIT(&barrier_cond, NULL);
 	ready_requests = _starpu_mpi_req_list_new();
 
 	STARPU_PTHREAD_MUTEX_INIT(&detached_requests_mutex, NULL);
@@ -1601,117 +1491,40 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm
 	STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
 	_starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
 
-#ifdef STARPU_MPI_ACTIVITY
-	hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
-	STARPU_MPI_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
-#endif /* STARPU_MPI_ACTIVITY */
-
 #ifdef STARPU_SIMGRID
 	_starpu_mpi_progress_thread_func(argc_argv);
 	return 0;
 #else
 	STARPU_PTHREAD_CREATE(&progress_thread, NULL, _starpu_mpi_progress_thread_func, argc_argv);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	while (!running)
-		STARPU_PTHREAD_COND_WAIT(&cond_progression, &mutex);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-
-	return 0;
-#endif
-}
-
-#ifdef STARPU_SIMGRID
-/* This is called before application's main, to initialize SMPI before we can
- * create MSG processes to run application's main */
-int _starpu_mpi_simgrid_init(int argc, char *argv[])
-{
-	return _starpu_mpi_initialize(&argc, &argv, 1, MPI_COMM_WORLD);
-}
-#endif
-
-int starpu_mpi_init_comm(int *argc STARPU_ATTRIBUTE_UNUSED, char ***argv STARPU_ATTRIBUTE_UNUSED, int initialize_mpi STARPU_ATTRIBUTE_UNUSED, MPI_Comm comm STARPU_ATTRIBUTE_UNUSED)
-{
-#ifdef STARPU_SIMGRID
-	return 0;
-#else
-	return _starpu_mpi_initialize(argc, argv, initialize_mpi, comm);
-#endif
-}
-
-int starpu_mpi_init(int *argc, char ***argv, int initialize_mpi)
-{
-	return starpu_mpi_init_comm(argc, argv, initialize_mpi, MPI_COMM_WORLD);
-}
-
-int starpu_mpi_initialize(void)
-{
-#ifdef STARPU_SIMGRID
-	return 0;
-#else
-	return _starpu_mpi_initialize(NULL, NULL, 0, MPI_COMM_WORLD);
-#endif
-}
+		STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
 
-int starpu_mpi_initialize_extended(int *rank, int *world_size)
-{
-#ifdef STARPU_SIMGRID
-	*world_size = _mpi_world_size;
-	*rank = _mpi_world_rank;
 	return 0;
-#else
-	int ret;
-
-	ret = _starpu_mpi_initialize(NULL, NULL, 1, MPI_COMM_WORLD);
-	if (ret == 0)
-	{
-		_STARPU_DEBUG("Calling MPI_Comm_rank\n");
-		MPI_Comm_rank(MPI_COMM_WORLD, rank);
-		MPI_Comm_size(MPI_COMM_WORLD, world_size);
-	}
-	return ret;
 #endif
 }
 
-int starpu_mpi_shutdown(void)
+void _starpu_mpi_progress_shutdown(int *value)
 {
-	void *value;
-	int rank, world_size;
-
-	/* We need to get the rank before calling MPI_Finalize to pass to _starpu_mpi_comm_amounts_display() */
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &world_size);
-
-	/* kill the progression thread */
-	STARPU_PTHREAD_MUTEX_LOCK(&mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
 	running = 0;
-	STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
-
-#ifndef STARPU_SIMGRID
-	starpu_pthread_join(progress_thread, &value);
-#else
+	STARPU_PTHREAD_COND_BROADCAST(&progress_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
+#ifdef STARPU_SIMGRID
 	/* FIXME: should rather properly wait for _starpu_mpi_progress_thread_func to finish */
 	MSG_process_sleep(1);
+#else
+	starpu_pthread_join(progress_thread, (void *)value);
 #endif
-
-#ifdef STARPU_MPI_ACTIVITY
-	starpu_progression_hook_deregister(hookid);
-#endif /* STARPU_MPI_ACTIVITY */
-
-	_STARPU_MPI_TRACE_STOP(rank, world_size);
-
 	/* free the request queues */
 	_starpu_mpi_req_list_delete(detached_requests);
 	_starpu_mpi_req_list_delete(ready_requests);
 
-	_starpu_mpi_comm_amounts_display(rank);
-	_starpu_mpi_comm_amounts_free();
-	_starpu_mpi_cache_free(world_size);
-	_starpu_mpi_tag_free();
-	_starpu_mpi_comm_free();
-
-	return 0;
+	STARPU_PTHREAD_MUTEX_DESTROY(&mutex_posted_requests);
+	STARPU_PTHREAD_MUTEX_DESTROY(&progress_mutex);
+	STARPU_PTHREAD_COND_DESTROY(&barrier_cond);
 }
 
 void _starpu_mpi_clear_cache(starpu_data_handle_t data_handle)
@@ -1771,42 +1584,6 @@ int starpu_mpi_data_get_tag(starpu_data_handle_t data)
 	return ((struct _starpu_mpi_node_tag *)(data->mpi_data))->data_tag;
 }
 
-int starpu_mpi_comm_size(MPI_Comm comm, int *size)
-{
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*size = _mpi_world_size;
-	return 0;
-#else
-	return MPI_Comm_size(comm, size);
-#endif
-}
-
-int starpu_mpi_comm_rank(MPI_Comm comm, int *rank)
-{
-#ifdef STARPU_SIMGRID
-	STARPU_MPI_ASSERT_MSG(comm == MPI_COMM_WORLD, "StarPU-SMPI only works with MPI_COMM_WORLD for now");
-	*rank = _mpi_world_rank;
-	return 0;
-#else
-	return MPI_Comm_rank(comm, rank);
-#endif
-}
-
-int starpu_mpi_world_size(void)
-{
-	int size;
-	starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
-	return size;
-}
-
-int starpu_mpi_world_rank(void)
-{
-	int rank;
-	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
-	return rank;
-}
-
 int starpu_mpi_wait_for_all(MPI_Comm comm)
 {
 	int mpi = 1;