|
@@ -1,7 +1,7 @@
|
|
|
/* StarPU --- Runtime system for heterogeneous multicore architectures.
|
|
|
*
|
|
|
* Copyright (C) 2009, 2010-2017 Université de Bordeaux
|
|
|
- * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016 CNRS
|
|
|
+ * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 CNRS
|
|
|
* Copyright (C) 2016 Inria
|
|
|
*
|
|
|
* StarPU is free software; you can redistribute it and/or modify
|
|
@@ -76,6 +76,10 @@ static int running = 0;
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
static int _mpi_world_size;
|
|
|
static int _mpi_world_rank;
|
|
|
+/* Bookkeeping for the helper threads that run _starpu_mpi_simgrid_wait_req_func(). */
|
|
|
+static int wait_counter; /* incremented when a helper thread starts, decremented just before it returns */
|
|
|
+static starpu_pthread_cond_t wait_counter_cond; /* signalled by a helper thread when it brings wait_counter back to 0 */
|
|
|
+static starpu_pthread_mutex_t wait_counter_mutex; /* taken around the accesses to wait_counter and the wait on wait_counter_cond */
|
|
|
#endif
|
|
|
int _starpu_mpi_fake_world_size = -1;
|
|
|
int _starpu_mpi_fake_world_rank = -1;
|
|
@@ -143,6 +147,12 @@ static void _starpu_mpi_request_init(struct _starpu_mpi_req **req)
|
|
|
(*req)->early_data_handle = NULL;
|
|
|
(*req)->envelope = NULL;
|
|
|
(*req)->sequential_consistency = 1;
|
|
|
+
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ starpu_pthread_queue_init(&((*req)->queue));
|
|
|
+ starpu_pthread_queue_register(&wait, &((*req)->queue));
|
|
|
+ (*req)->done = 0;
|
|
|
+#endif
|
|
|
}
|
|
|
|
|
|
static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
|
|
@@ -153,6 +163,10 @@ static void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req)
|
|
|
STARPU_PTHREAD_COND_DESTROY(&req->posted_cond);
|
|
|
free(req->datatype_name);
|
|
|
req->datatype_name = NULL;
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ starpu_pthread_queue_unregister(&wait, &req->queue);
|
|
|
+ starpu_pthread_queue_destroy(&req->queue);
|
|
|
+#endif
|
|
|
free(req);
|
|
|
req = NULL;
|
|
|
}
|
|
@@ -294,6 +308,9 @@ static void _starpu_mpi_submit_ready_request(void *arg)
|
|
|
|
|
|
newer_requests = 1;
|
|
|
STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ starpu_pthread_queue_signal(&dontsleep);
|
|
|
+#endif
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
_STARPU_MPI_LOG_OUT();
|
|
|
}
|
|
@@ -350,6 +367,55 @@ static struct _starpu_mpi_req *_starpu_mpi_isend_irecv_common(starpu_data_handle
|
|
|
return req;
|
|
|
}
|
|
|
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+int _starpu_mpi_simgrid_mpi_test(int *done, int *flag) /* SimGrid replacement for MPI_Test(): answers from the *done marker instead of querying the MPI request */
|
|
|
+{ /* NOTE(review): done is int * here but unsigned * in _starpu_mpi_simgrid_wait_req(), and both are handed &req->done -- confirm the field's declared type */
|
|
|
+ *flag = 0; /* output: stays 0 unless the request is marked as done */
|
|
|
+ if (*done) /* *done is set to 1 by _starpu_mpi_simgrid_wait_req_func() once its MPI_Wait() has returned */
|
|
|
+ {
|
|
|
+ starpu_pthread_queue_signal(&dontsleep); /* presumably keeps the progress thread from blocking in its next starpu_pthread_wait_wait() now that a request completed -- confirm */
|
|
|
+ *flag = 1; /* output: request completed */
|
|
|
+ }
|
|
|
+ return MPI_SUCCESS; /* unconditional; the callers assert that the returned value is MPI_SUCCESS */
|
|
|
+}
|
|
|
+static void* _starpu_mpi_simgrid_wait_req_func(void* arg) /* Thread body: waits for one MPI request, marks it done, signals its queue; arg is the struct _starpu_simgrid_mpi_req built by _starpu_mpi_simgrid_wait_req() */
|
|
|
+{
|
|
|
+ struct _starpu_simgrid_mpi_req *sim_req = arg;
|
|
|
+ int ret;
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&wait_counter_mutex); /* NOTE(review): the counter is incremented here, inside the new thread, not by its creator -- confirm the shutdown wait on wait_counter cannot run before this line */
|
|
|
+ wait_counter++; /* one more helper thread in flight */
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&wait_counter_mutex);
|
|
|
+ /* Stay in MPI_Wait() until the request pointed to by sim_req->request completes. */
|
|
|
+ ret = MPI_Wait(sim_req->request, sim_req->status);
|
|
|
+
|
|
|
+ STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
|
|
|
+ /* Publish completion first, then signal the queue supplied for this request. */
|
|
|
+ *(sim_req->done) = 1; /* this is the marker read by _starpu_mpi_simgrid_mpi_test() */
|
|
|
+ starpu_pthread_queue_signal(sim_req->queue);
|
|
|
+ /* sim_req was allocated by _starpu_mpi_simgrid_wait_req(); it is released here and not used afterwards. */
|
|
|
+ free(sim_req);
|
|
|
+ /* The helper thread that brings the counter back to 0 signals wait_counter_cond. */
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&wait_counter_mutex);
|
|
|
+ if (--wait_counter == 0)
|
|
|
+ STARPU_PTHREAD_COND_SIGNAL(&wait_counter_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&wait_counter_mutex);
|
|
|
+
|
|
|
+ return NULL; /* always NULL; no result is returned through the thread return value */
|
|
|
+}
|
|
|
+void _starpu_mpi_simgrid_wait_req(MPI_Request *request, MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done) /* Starts a helper thread that waits for *request, then sets *done to 1 and signals queue */
|
|
|
+{
|
|
|
+ struct _starpu_simgrid_mpi_req *sim_req; /* argument bundle handed to the helper thread, which frees it */
|
|
|
+ _STARPU_MPI_CALLOC(sim_req, 1, sizeof(struct _starpu_simgrid_mpi_req));
|
|
|
+ sim_req->request = request; /* pointers are stored, not copied: the helper thread dereferences them later, so the pointed-to objects must remain valid until it has finished */
|
|
|
+ sim_req->status = status; /* filled in by the helper thread's MPI_Wait() */
|
|
|
+ sim_req->queue = queue; /* signalled by the helper thread after completion */
|
|
|
+ sim_req->done = done; /* set to 1 by the helper thread after completion */
|
|
|
+ *done = 0; /* reset the completion marker before the helper thread is created */
|
|
|
+ /* The helper thread runs _starpu_mpi_simgrid_wait_req_func(sim_req); this function does not wait for it. */
|
|
|
+ _starpu_simgrid_xbt_thread_create("wait for mpi transfer", _starpu_mpi_simgrid_wait_req_func, sim_req);
|
|
|
+}
|
|
|
+#endif
|
|
|
+
|
|
|
/********************************************************/
|
|
|
/* */
|
|
|
/* Send functionalities */
|
|
@@ -379,6 +445,10 @@ static void _starpu_mpi_isend_data_func(struct _starpu_mpi_req *req)
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Issend returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
}
|
|
|
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ _starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
|
|
|
+#endif
|
|
|
+
|
|
|
_STARPU_MPI_TRACE_ISEND_SUBMIT_END(req->node_tag.rank, req->node_tag.data_tag, 0);
|
|
|
|
|
|
/* somebody is perhaps waiting for the MPI request to be posted */
|
|
@@ -571,6 +641,9 @@ static void _starpu_mpi_irecv_data_func(struct _starpu_mpi_req *req)
|
|
|
{
|
|
|
_STARPU_MPI_COMM_FROM_DEBUG(req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.comm);
|
|
|
req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.rank, _STARPU_MPI_TAG_DATA, req->node_tag.comm, &req->data_request);
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ _starpu_mpi_simgrid_wait_req(&req->data_request, &req->status_store, &req->queue, &req->done);
|
|
|
+#endif
|
|
|
}
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
|
|
@@ -682,6 +755,10 @@ static void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
|
|
|
_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
if (req->data_request != MPI_REQUEST_NULL)
|
|
|
{
|
|
|
+ // TODO: Fix for STARPU_SIMGRID
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ STARPU_MPI_ASSERT_MSG(0, "Implement this in STARPU_SIMGRID");
|
|
|
+#endif
|
|
|
req->ret = MPI_Wait(&req->data_request, waiting_req->status);
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
}
|
|
@@ -755,7 +832,13 @@ static void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
|
|
|
|
|
|
_STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.rank, req->node_tag.data_tag);
|
|
|
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, testing_req->flag);
|
|
|
+ memcpy(testing_req->status, &req->status_store, sizeof(*testing_req->status));
|
|
|
+#else
|
|
|
req->ret = MPI_Test(&req->data_request, testing_req->flag, testing_req->status);
|
|
|
+#endif
|
|
|
+
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
|
|
|
_STARPU_MPI_TRACE_UTESTING_END(req->node_tag.rank, req->node_tag.data_tag);
|
|
@@ -1095,7 +1178,6 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
{
|
|
|
//_STARPU_MPI_LOG_IN();
|
|
|
int flag;
|
|
|
- MPI_Status status;
|
|
|
struct _starpu_mpi_req *req;
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&detached_requests_mutex);
|
|
@@ -1106,7 +1188,11 @@ static void _starpu_mpi_test_detached_requests(void)
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&detached_requests_mutex);
|
|
|
|
|
|
//_STARPU_MPI_DEBUG(3, "Test detached request %p - mpitag %d - TYPE %s %d\n", &req->data_request, req->node_tag.data_tag, _starpu_mpi_request_type(req->request_type), req->node_tag.rank);
|
|
|
- req->ret = MPI_Test(&req->data_request, &flag, &status);
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, &flag);
|
|
|
+#else
|
|
|
+ req->ret = MPI_Test(&req->data_request, &flag, MPI_STATUS_IGNORE);
|
|
|
+#endif
|
|
|
|
|
|
STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
|
|
|
|
|
@@ -1354,6 +1440,11 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
STARPU_PTHREAD_COND_SIGNAL(&cond_progression);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ starpu_pthread_wait_init(&wait);
|
|
|
+ starpu_pthread_queue_init(&dontsleep);
|
|
|
+ starpu_pthread_queue_register(&wait, &dontsleep);
|
|
|
+#endif
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
|
|
@@ -1361,6 +1452,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
|
|
|
while (running || posted_requests || !(_starpu_mpi_req_list_empty(ready_requests)) || !(_starpu_mpi_req_list_empty(detached_requests)))// || !(_starpu_mpi_early_request_count()) || !(_starpu_mpi_sync_data_count()))
|
|
|
{
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ starpu_pthread_wait_reset(&wait);
|
|
|
+#endif
|
|
|
/* shall we block ? */
|
|
|
unsigned block = _starpu_mpi_req_list_empty(ready_requests) && _starpu_mpi_early_request_count() == 0 && _starpu_mpi_sync_data_count() == 0;
|
|
|
|
|
@@ -1522,7 +1616,7 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
}
|
|
|
#ifdef STARPU_SIMGRID
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
- MSG_process_sleep(0.000010);
|
|
|
+ starpu_pthread_wait_wait(&wait);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
#endif
|
|
|
}
|
|
@@ -1533,6 +1627,21 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
|
|
|
envelope_request_submitted = 0;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&wait_counter_mutex);
|
|
|
+ while (wait_counter != 0)
|
|
|
+ STARPU_PTHREAD_COND_WAIT(&wait_counter_cond, &wait_counter_mutex);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&wait_counter_mutex);
|
|
|
+
|
|
|
+ STARPU_PTHREAD_MUTEX_DESTROY(&wait_counter_mutex);
|
|
|
+ STARPU_PTHREAD_COND_DESTROY(&wait_counter_cond);
|
|
|
+
|
|
|
+ starpu_pthread_queue_unregister(&wait, &dontsleep);
|
|
|
+ starpu_pthread_queue_destroy(&dontsleep);
|
|
|
+ starpu_pthread_wait_destroy(&wait);
|
|
|
+#endif
|
|
|
+
|
|
|
STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(detached_requests), "List of detached requests not empty");
|
|
|
STARPU_MPI_ASSERT_MSG(_starpu_mpi_req_list_empty(ready_requests), "List of ready requests not empty");
|
|
|
STARPU_MPI_ASSERT_MSG(posted_requests == 0, "Number of posted request is not zero");
|
|
@@ -1628,6 +1737,11 @@ int _starpu_mpi_initialize(int *argc, char ***argv, int initialize_mpi, MPI_Comm
|
|
|
STARPU_PTHREAD_MUTEX_INIT(&mutex_posted_requests, NULL);
|
|
|
_starpu_mpi_comm = starpu_getenv("STARPU_MPI_COMM") != NULL;
|
|
|
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ STARPU_PTHREAD_MUTEX_INIT(&wait_counter_mutex, NULL);
|
|
|
+ STARPU_PTHREAD_COND_INIT(&wait_counter_cond, NULL);
|
|
|
+#endif
|
|
|
+
|
|
|
#ifdef STARPU_MPI_ACTIVITY
|
|
|
hookid = starpu_progression_hook_register(_starpu_mpi_progression_hook_func, NULL);
|
|
|
STARPU_MPI_ASSERT_MSG(hookid >= 0, "starpu_progression_hook_register failed");
|
|
@@ -1721,6 +1835,9 @@ int starpu_mpi_shutdown(void)
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mutex);
|
|
|
running = 0;
|
|
|
STARPU_PTHREAD_COND_BROADCAST(&cond_progression);
|
|
|
+#ifdef STARPU_SIMGRID
|
|
|
+ starpu_pthread_queue_signal(&dontsleep);
|
|
|
+#endif
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mutex);
|
|
|
|
|
|
#ifndef STARPU_SIMGRID
|
|
@@ -1740,7 +1857,7 @@ int starpu_mpi_shutdown(void)
|
|
|
_starpu_mpi_req_list_delete(detached_requests);
|
|
|
_starpu_mpi_req_list_delete(ready_requests);
|
|
|
|
|
|
- _starpu_mpi_comm_amounts_display(rank);
|
|
|
+ _starpu_mpi_comm_amounts_display(stderr, rank);
|
|
|
_starpu_mpi_comm_amounts_free();
|
|
|
_starpu_mpi_cache_free(world_size);
|
|
|
_starpu_mpi_tag_free();
|
|
@@ -1857,13 +1974,11 @@ void starpu_mpi_get_data_on_node(MPI_Comm comm, starpu_data_handle_t data_handle
|
|
|
tag = starpu_mpi_data_get_tag(data_handle);
|
|
|
if (rank == -1)
|
|
|
{
|
|
|
- fprintf(stderr,"StarPU needs to be told the MPI rank of this data, using starpu_mpi_data_register\n");
|
|
|
- STARPU_ABORT();
|
|
|
+ _STARPU_ERROR("StarPU needs to be told the MPI rank of this data, using starpu_mpi_data_register\n");
|
|
|
}
|
|
|
if (tag == -1)
|
|
|
{
|
|
|
- fprintf(stderr,"StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
|
|
|
- STARPU_ABORT();
|
|
|
+ _STARPU_ERROR("StarPU needs to be told the MPI tag of this data, using starpu_mpi_data_register\n");
|
|
|
}
|
|
|
starpu_mpi_comm_rank(comm, &me);
|
|
|
|
|
@@ -1968,4 +2083,3 @@ int starpu_mpi_wait_for_all(MPI_Comm comm)
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
|
-
|