@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2010-2019  CNRS
- * Copyright (C) 2009-2019  Université de Bordeaux
+ * Copyright (C) 2009-2020  Université de Bordeaux
  * Copyright (C) 2012,2013,2016,2017  Inria
  * Copyright (C) 2017  Guillaume Beauchamp
  *
@@ -19,6 +19,10 @@
 
 #include <stdlib.h>
 #include <limits.h>
+#include <common/config.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
 #include <starpu_mpi.h>
 #include <starpu_mpi_datatype.h>
 #include <starpu_mpi_private.h>
@@ -33,7 +37,6 @@
 #include <mpi/starpu_mpi_tag.h>
 #include <mpi/starpu_mpi_comm.h>
 #include <starpu_mpi_init.h>
-#include <common/config.h>
 #include <common/thread.h>
 #include <datawizard/interfaces/data_interface.h>
 #include <datawizard/coherency.h>
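The two hunks above belong together: HAVE_UNISTD_H is an autoconf macro, so
<common/config.h> has to be included before the #ifdef that tests it. Left in
its old place further down, the guard would always be false and <unistd.h>,
needed for the gethostname() call added near the end of this patch, would
silently never be pulled in. The ordering the patch establishes (this
fragment only compiles inside the StarPU tree, since common/config.h is a
project-internal header):

	#include <common/config.h>   /* defines HAVE_UNISTD_H (autoconf) */
	#ifdef HAVE_UNISTD_H
	#include <unistd.h>          /* declares gethostname() */
	#endif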
@@ -325,7 +328,7 @@ static void _starpu_mpi_simgrid_wait_req_func(void* arg)
 	STARPU_MPI_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(ret));
 
 	*(sim_req->done) = 1;
-	starpu_pthread_queue_signal(sim_req->queue);
+	starpu_pthread_queue_broadcast(sim_req->queue);
 
 	free(sim_req);
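Waking with broadcast rather than signal matters here: with the SIMGRID wait
and test paths introduced below, more than one waiter can be registered on a
request's queue by the time the simulated MPI_Wait completes, and a signal
would release only one of them. A minimal self-contained illustration of the
hazard with plain pthreads (hypothetical stand-in names, not the
starpu_pthread_queue API):

	#include <pthread.h>
	#include <stdio.h>

	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
	static int done = 0;

	static void *waiter(void *arg)
	{
		pthread_mutex_lock(&lock);
		while (!done)
			pthread_cond_wait(&cond, &lock);
		pthread_mutex_unlock(&lock);
		printf("waiter %ld woke up\n", (long) arg);
		return NULL;
	}

	int main(void)
	{
		pthread_t t[2];
		long i;
		for (i = 0; i < 2; i++)
			pthread_create(&t[i], NULL, waiter, (void *) i);
		pthread_mutex_lock(&lock);
		done = 1;
		/* pthread_cond_signal() here would wake a single waiter and
		 * leave the other blocked forever; broadcast releases both */
		pthread_cond_broadcast(&cond);
		pthread_mutex_unlock(&lock);
		for (i = 0; i < 2; i++)
			pthread_join(t[i], NULL);
		return 0;
	}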
@@ -501,10 +504,10 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 	{
 		_STARPU_MPI_COMM_FROM_DEBUG(req, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_DATA, req->node_tag.data_tag, req->node_tag.node.comm);
 		req->ret = MPI_Irecv(req->ptr, req->count, req->datatype, req->node_tag.node.rank, _STARPU_MPI_TAG_DATA, req->node_tag.node.comm, &req->backend->data_request);
+	}
 #ifdef STARPU_SIMGRID
-		_starpu_mpi_simgrid_wait_req(&req->backend->data_request, &req->status_store, &req->queue, &req->done);
+	_starpu_mpi_simgrid_wait_req(&req->backend->data_request, &req->status_store, &req->queue, &req->done);
 #endif
-	}
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_IRecv returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 
 	_STARPU_MPI_TRACE_IRECV_SUBMIT_END(req->node_tag.node.rank, req->node_tag.data_tag);
@@ -526,6 +529,7 @@ void _starpu_mpi_irecv_size_func(struct _starpu_mpi_req *req)
 /*                                                      */
 /********************************************************/
 
+#ifndef STARPU_SIMGRID
 void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 {
 	_STARPU_MPI_LOG_IN();
@@ -535,10 +539,6 @@ void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
 	if (req->backend->data_request != MPI_REQUEST_NULL)
 	{
-		// TODO: Fix for STARPU_SIMGRID
-#ifdef STARPU_SIMGRID
-		STARPU_MPI_ASSERT_MSG(0, "Implement this in STARPU_SIMGRID");
-#endif
 		req->ret = MPI_Wait(&req->backend->data_request, waiting_req->status);
 		STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Wait returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 	}
@@ -548,15 +548,36 @@ void _starpu_mpi_wait_func(struct _starpu_mpi_req *waiting_req)
 
 	_STARPU_MPI_LOG_OUT();
 }
+#endif
 
 int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 {
 	int ret;
 	struct _starpu_mpi_req *req = *public_req;
-	struct _starpu_mpi_req *waiting_req;
 
 	_STARPU_MPI_LOG_IN();
 
+#ifdef STARPU_SIMGRID
+	_STARPU_MPI_TRACE_UWAIT_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
+	starpu_pthread_wait_t wait;
+	starpu_pthread_wait_init(&wait);
+	starpu_pthread_queue_register(&wait, &req->queue);
+	while (1)
+	{
+		starpu_pthread_wait_reset(&wait);
+		if (req->done)
+			break;
+		starpu_pthread_wait_wait(&wait);
+	}
+	starpu_pthread_queue_unregister(&wait, &req->queue);
+	starpu_pthread_wait_destroy(&wait);
+	_STARPU_MPI_TRACE_UWAIT_END(req->node_tag.node.rank, req->node_tag.data_tag);
+
+	if (status)
+		*status = req->status_store;
+	_starpu_mpi_handle_request_termination(req);
+#else
+	struct _starpu_mpi_req *waiting_req;
 	/* We cannot try to complete a MPI request that was not actually posted
 	 * to MPI yet. */
 	STARPU_PTHREAD_MUTEX_LOCK(&(req->backend->req_mutex));
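The SIMGRID branch above sidesteps the cond/mutex handshake used in the MPI
case: the caller registers itself on the request's queue and loops, re-arming
with starpu_pthread_wait_reset() before testing req->done and only then
sleeping in starpu_pthread_wait_wait(). Arming before the test is what makes
the loop race-free: a broadcast that fires between the test and the sleep is
latched by the armed waiter, so the wait returns immediately instead of being
lost. A self-contained sketch of the same arm-then-check discipline, using a
counting semaphore as a stand-in for the StarPU wait object (hypothetical
names, not the real API):

	#include <pthread.h>
	#include <semaphore.h>
	#include <stdatomic.h>
	#include <stdio.h>
	#include <unistd.h>

	static sem_t wake;       /* stands in for the request's wait queue */
	static atomic_int done;  /* stands in for req->done */

	static void *completion(void *arg)
	{
		(void) arg;
		usleep(10000);          /* the simulated transfer finishes... */
		atomic_store(&done, 1); /* ...the flag is set first... */
		sem_post(&wake);        /* ...then the waiter is woken */
		return NULL;
	}

	int main(void)
	{
		pthread_t t;
		sem_init(&wake, 0, 0);
		pthread_create(&t, NULL, completion, NULL);
		/* Same shape as the loop in _starpu_mpi_wait() above */
		while (1)
		{
			if (atomic_load(&done))
				break;
			/* a post that raced with the test above is counted by
			 * the semaphore, so this returns instead of hanging */
			sem_wait(&wake);
		}
		pthread_join(t, NULL);
		sem_destroy(&wake);
		puts("request completed");
		return 0;
	}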
@@ -580,16 +601,17 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 		STARPU_PTHREAD_COND_WAIT(&req->backend->req_cond, &req->backend->req_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
 
-	ret = req->ret;
-
 	/* The internal request structure was automatically allocated */
+	_starpu_mpi_request_destroy(waiting_req);
+#endif
+
 	*public_req = NULL;
 	if (req->backend->internal_req)
 	{
 		_starpu_mpi_request_destroy(req->backend->internal_req);
 	}
+	ret = req->ret;
 	_starpu_mpi_request_destroy(req);
-	_starpu_mpi_request_destroy(waiting_req);
 
 	_STARPU_MPI_LOG_OUT();
 	return ret;
@@ -601,6 +623,7 @@ int _starpu_mpi_wait(starpu_mpi_req *public_req, MPI_Status *status)
 /*                                                      */
 /********************************************************/
 
+#ifndef STARPU_SIMGRID
 void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 {
 	_STARPU_MPI_LOG_IN();
@@ -613,12 +636,7 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 
 	_STARPU_MPI_TRACE_UTESTING_BEGIN(req->node_tag.node.rank, req->node_tag.data_tag);
 
-#ifdef STARPU_SIMGRID
-	req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, testing_req->flag);
-	memcpy(testing_req->status, &req->status_store, sizeof(*testing_req->status));
-#else
 	req->ret = MPI_Test(&req->backend->data_request, testing_req->flag, testing_req->status);
-#endif
 
 	STARPU_MPI_ASSERT_MSG(req->ret == MPI_SUCCESS, "MPI_Test returning %s", _starpu_mpi_get_mpi_error_code(req->ret));
 
@@ -636,6 +654,7 @@ void _starpu_mpi_test_func(struct _starpu_mpi_req *testing_req)
 	STARPU_PTHREAD_MUTEX_UNLOCK(&testing_req->backend->req_mutex);
 	_STARPU_MPI_LOG_OUT();
 }
+#endif
 
 int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 {
@@ -648,6 +667,15 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 	STARPU_MPI_ASSERT_MSG(!req->detached, "MPI_Test cannot be called on a detached request");
 
+#ifdef STARPU_SIMGRID
+	ret = req->ret = _starpu_mpi_simgrid_mpi_test(&req->done, flag);
+	if (*flag)
+	{
+		if (status)
+			*status = req->status_store;
+		_starpu_mpi_handle_request_termination(req);
+	}
+#else
 	STARPU_PTHREAD_MUTEX_LOCK(&req->backend->req_mutex);
 	unsigned submitted = req->submitted;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&req->backend->req_mutex);
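In SIMGRID mode the test becomes a plain poll: _starpu_mpi_simgrid_mpi_test()
reports through *flag whether the simulated request has completed, and on
completion the stored status is copied out and the request torn down right
here, with no round trip through a worker-executed testing request. The shape
of such a nonblocking probe, as a stand-alone analog (fake_req and fake_test
are illustrative names, not StarPU's):

	#include <stdatomic.h>
	#include <stdio.h>

	struct fake_req
	{
		atomic_int done;  /* set by the completion side, like req->done */
		int status_store; /* filled in once the operation finished */
	};

	/* Poll the request: never blocks, reports completion through *flag */
	static int fake_test(struct fake_req *req, int *flag, int *status)
	{
		*flag = atomic_load(&req->done);
		if (*flag && status)
			*status = req->status_store;
		return 0; /* the MPI_SUCCESS analog */
	}

	int main(void)
	{
		struct fake_req req = { 0, 0 };
		int flag, status = 0;
		fake_test(&req, &flag, &status);
		printf("first poll: flag=%d\n", flag);
		req.status_store = 42;      /* completion happens... */
		atomic_store(&req.done, 1); /* ...and is published last */
		fake_test(&req, &flag, &status);
		printf("second poll: flag=%d status=%d\n", flag, status);
		return 0;
	}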
@@ -676,25 +704,26 @@ int _starpu_mpi_test(starpu_mpi_req *public_req, int *flag, MPI_Status *status)
 
 		ret = testing_req->ret;
 
-		if (*(testing_req->flag))
-		{
-			/* The request was completed so we free the internal
-			 * request structure which was automatically allocated
-			 * */
-			*public_req = NULL;
-			if (req->backend->internal_req)
-			{
-				_starpu_mpi_request_destroy(req->backend->internal_req);
-			}
-			_starpu_mpi_request_destroy(req);
-		}
-
 		_starpu_mpi_request_destroy(testing_req);
 	}
 	else
 	{
 		*flag = 0;
 	}
+#endif
+
+	if (*flag)
+	{
+		/* The request was completed so we free the internal
+		 * request structure which was automatically allocated
+		 * */
+		*public_req = NULL;
+		if (req->backend->internal_req)
+		{
+			_starpu_mpi_request_destroy(req->backend->internal_req);
+		}
+		_starpu_mpi_request_destroy(req);
+	}
 
 	_STARPU_MPI_LOG_OUT();
 	return ret;
@@ -930,6 +959,9 @@ static void _starpu_mpi_early_data_cb(void* arg)
 		args->req->submitted = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&args->req->backend->req_cond);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&args->req->backend->req_mutex);
+#ifdef STARPU_SIMGRID
+		args->req->done = 1;
+#endif
 	}
 }
 
@@ -1133,7 +1165,9 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 
 	if (starpu_bind_thread_on(_starpu_mpi_thread_cpuid, STARPU_THREAD_ACTIVE, "MPI") < 0)
 	{
-		_STARPU_DISP("No core was available for the MPI thread. You should use STARPU_RESERVE_NCPU to leave one core available for MPI, or specify one core less in STARPU_NCPU\n");
+		char hostname[65];
+		gethostname(hostname, sizeof(hostname));
+		_STARPU_DISP("[%s] No core was available for the MPI thread. You should use STARPU_RESERVE_NCPU to leave one core available for MPI, or specify one core less in STARPU_NCPU\n", hostname);
 	}
 	_starpu_mpi_do_initialize(argc_argv);
 	if (_starpu_mpi_thread_cpuid >= 0)
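Prefixing the message with the host name makes this diagnostic actionable on
multi-node runs, where only some machines may be short a core; hence the
guarded <unistd.h> include added at the top of the file, which is where
gethostname() is declared. One portability detail the fixed-size buffer
relies on: POSIX allows gethostname() to leave the buffer unterminated when
the name is truncated, so a defensive caller terminates it itself. A minimal
self-contained version of the same fetch:

	#include <stdio.h>
	#include <unistd.h>

	int main(void)
	{
		char hostname[65]; /* common HOST_NAME_MAX of 64, plus NUL */
		if (gethostname(hostname, sizeof(hostname)) != 0)
			hostname[0] = '\0';            /* fall back to empty */
		hostname[sizeof(hostname) - 1] = '\0'; /* force termination */
		printf("[%s] example diagnostic\n", hostname);
		return 0;
	}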
@@ -1150,13 +1184,15 @@ static void *_starpu_mpi_progress_thread_func(void *arg)
 	int i;
 	for (i = 0; i < *(argc_argv->argc); i++)
 		argv_cpy[i] = strdup((*(argc_argv->argv))[i]);
+	void **tsd;
+	_STARPU_CALLOC(tsd, MAX_TSD + 1, sizeof(void*));
 #ifdef HAVE_SG_ACTOR_DATA
 	_starpu_simgrid_actor_create("main", smpi_simulated_main_, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
+	/* And set TSD for us */
+	sg_actor_data_set(sg_actor_self(), tsd);
 #else
 	MSG_process_create_with_arguments("main", smpi_simulated_main_, NULL, _starpu_simgrid_get_host_by_name("MAIN"), *(argc_argv->argc), argv_cpy);
 	/* And set TSD for us */
-	void **tsd;
-	_STARPU_CALLOC(tsd, MAX_TSD + 1, sizeof(void*));
 	if (!smpi_process_set_user_data)
 	{
 		_STARPU_ERROR("Your version of simgrid does not provide smpi_process_set_user_data, we can not continue without it\n");
|