|
@@ -32,385 +32,385 @@ static int src_node_id;
|
|
|
|
|
|
static void _starpu_mpi_set_src_node_id()
|
|
|
{
|
|
|
- int node_id = starpu_get_env_number("STARPU_MPI_MASTER_NODE");
|
|
|
+ int node_id = starpu_get_env_number("STARPU_MPI_MASTER_NODE");
|
|
|
|
|
|
- if (node_id != -1)
|
|
|
- {
|
|
|
- int nb_proc, id_proc;
|
|
|
- MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
+ if (node_id != -1)
|
|
|
+ {
|
|
|
+ int nb_proc, id_proc;
|
|
|
+ MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
|
|
|
- if (node_id < nb_proc)
|
|
|
- {
|
|
|
- src_node_id = node_id;
|
|
|
- return;
|
|
|
- }
|
|
|
- else if (id_proc == DRIVER_MPI_MASTER_NODE_DEFAULT)
|
|
|
- {
|
|
|
- /* Only one node prints the error message. */
|
|
|
- _STARPU_DISP("The node you specify to be the master is "
|
|
|
- "greater than the total number of nodes.\n"
|
|
|
- "Taking node %d by default...\n", DRIVER_MPI_MASTER_NODE_DEFAULT);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Node by default. */
|
|
|
- src_node_id = DRIVER_MPI_MASTER_NODE_DEFAULT;
|
|
|
+ if (node_id < nb_proc)
|
|
|
+ {
|
|
|
+ src_node_id = node_id;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ else if (id_proc == DRIVER_MPI_MASTER_NODE_DEFAULT)
|
|
|
+ {
|
|
|
+ /* Only one node prints the error message. */
|
|
|
+ _STARPU_DISP("The node you specify to be the master is "
|
|
|
+ "greater than the total number of nodes.\n"
|
|
|
+ "Taking node %d by default...\n", DRIVER_MPI_MASTER_NODE_DEFAULT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Node by default. */
|
|
|
+ src_node_id = DRIVER_MPI_MASTER_NODE_DEFAULT;
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_common_mp_init()
|
|
|
{
|
|
|
- //Here we supposed the programmer called two times starpu_init.
|
|
|
- if (mpi_initialized)
|
|
|
- return -ENODEV;
|
|
|
+ //Here we supposed the programmer called two times starpu_init.
|
|
|
+ if (mpi_initialized)
|
|
|
+ return -ENODEV;
|
|
|
|
|
|
- mpi_initialized = 1;
|
|
|
+ mpi_initialized = 1;
|
|
|
|
|
|
- if (MPI_Initialized(&extern_initialized) != MPI_SUCCESS)
|
|
|
- STARPU_ABORT_MSG("Cannot check if MPI is initialized or not !");
|
|
|
+ if (MPI_Initialized(&extern_initialized) != MPI_SUCCESS)
|
|
|
+ STARPU_ABORT_MSG("Cannot check if MPI is initialized or not !");
|
|
|
|
|
|
- //Here MPI_Init or MPI_Init_thread is already called
|
|
|
- if (!extern_initialized)
|
|
|
- {
|
|
|
+ //Here MPI_Init or MPI_Init_thread is already called
|
|
|
+ if (!extern_initialized)
|
|
|
+ {
|
|
|
|
|
|
#if defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
|
|
|
- int required = MPI_THREAD_MULTIPLE;
|
|
|
+ int required = MPI_THREAD_MULTIPLE;
|
|
|
#else
|
|
|
- int required = MPI_THREAD_FUNNELED;
|
|
|
+ int required = MPI_THREAD_FUNNELED;
|
|
|
#endif
|
|
|
|
|
|
- int thread_support;
|
|
|
- STARPU_ASSERT(MPI_Init_thread(_starpu_get_argc(), _starpu_get_argv(), required, &thread_support) == MPI_SUCCESS);
|
|
|
+ int thread_support;
|
|
|
+ STARPU_ASSERT(MPI_Init_thread(_starpu_get_argc(), _starpu_get_argv(), required, &thread_support) == MPI_SUCCESS);
|
|
|
|
|
|
- if (thread_support != required)
|
|
|
- {
|
|
|
- if (required == MPI_THREAD_MULTIPLE)
|
|
|
- _STARPU_DISP("MPI doesn't support MPI_THREAD_MULTIPLE option. MPI Master-Slave can have problems if multiple slaves are launched. \n");
|
|
|
- if (required == MPI_THREAD_FUNNELED)
|
|
|
- _STARPU_DISP("MPI doesn't support MPI_THREAD_FUNNELED option. Many errors can occur. \n");
|
|
|
- }
|
|
|
+ if (thread_support != required)
|
|
|
+ {
|
|
|
+ if (required == MPI_THREAD_MULTIPLE)
|
|
|
+ _STARPU_DISP("MPI doesn't support MPI_THREAD_MULTIPLE option. MPI Master-Slave can have problems if multiple slaves are launched. \n");
|
|
|
+ if (required == MPI_THREAD_FUNNELED)
|
|
|
+ _STARPU_DISP("MPI doesn't support MPI_THREAD_FUNNELED option. Many errors can occur. \n");
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/* Find which node is the master */
|
|
|
_starpu_mpi_set_src_node_id();
|
|
|
|
|
|
return 1;
|
|
|
- }
|
|
|
+}
|
|
|
|
|
|
void _starpu_mpi_common_mp_deinit()
|
|
|
{
|
|
|
- if (!extern_initialized)
|
|
|
- MPI_Finalize();
|
|
|
+ if (!extern_initialized)
|
|
|
+ MPI_Finalize();
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_common_is_src_node()
|
|
|
{
|
|
|
- int id_proc;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
- return id_proc == src_node_id;
|
|
|
+ int id_proc;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
+ return id_proc == src_node_id;
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_common_get_src_node()
|
|
|
{
|
|
|
- return src_node_id;
|
|
|
+ return src_node_id;
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_common_is_mp_initialized()
|
|
|
{
|
|
|
- return mpi_initialized;
|
|
|
+ return mpi_initialized;
|
|
|
}
|
|
|
|
|
|
/* common parts to initialize a source or a sink node */
|
|
|
void _starpu_mpi_common_mp_initialize_src_sink(struct _starpu_mp_node *node)
|
|
|
{
|
|
|
- struct _starpu_machine_topology *topology = &_starpu_get_machine_config()->topology;
|
|
|
+ struct _starpu_machine_topology *topology = &_starpu_get_machine_config()->topology;
|
|
|
|
|
|
- node->nb_cores = topology->nhwcpus;
|
|
|
+ node->nb_cores = topology->nhwcpus;
|
|
|
}
|
|
|
|
|
|
int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
|
|
|
{
|
|
|
- int res, source;
|
|
|
- int flag = 0;
|
|
|
- int id_proc;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
-
|
|
|
- if (id_proc == src_node_id)
|
|
|
- {
|
|
|
- /* Source has mp_node defined */
|
|
|
- source = mp_node->mp_connection.mpi_remote_nodeid;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* Sink can have sink to sink message */
|
|
|
- source = MPI_ANY_SOURCE;
|
|
|
- }
|
|
|
-
|
|
|
- res = MPI_Iprobe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
|
|
|
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot test if we received a message !");
|
|
|
-
|
|
|
- return flag;
|
|
|
+ int res, source;
|
|
|
+ int flag = 0;
|
|
|
+ int id_proc;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
+
|
|
|
+ if (id_proc == src_node_id)
|
|
|
+ {
|
|
|
+ /* Source has mp_node defined */
|
|
|
+ source = mp_node->mp_connection.mpi_remote_nodeid;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* Sink can have sink to sink message */
|
|
|
+ source = MPI_ANY_SOURCE;
|
|
|
+ }
|
|
|
+
|
|
|
+ res = MPI_Iprobe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
|
|
|
+ STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot test if we received a message !");
|
|
|
+
|
|
|
+ return flag;
|
|
|
}
|
|
|
|
|
|
/* SEND to source node */
|
|
|
void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event)
|
|
|
{
|
|
|
- int res;
|
|
|
- int id_proc;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
+ int res;
|
|
|
+ int id_proc;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
|
|
|
- printf("envoi %d B to %d\n", len, node->mp_connection.mpi_remote_nodeid);
|
|
|
+ //printf("envoi %d B to %d\n", len, node->mp_connection.mpi_remote_nodeid);
|
|
|
|
|
|
- if (event)
|
|
|
- {
|
|
|
- /* Asynchronous send */
|
|
|
- struct _starpu_async_channel * channel = event;
|
|
|
- channel->event.mpi_ms_event.is_sender = 1;
|
|
|
+ if (event)
|
|
|
+ {
|
|
|
+ /* Asynchronous send */
|
|
|
+ struct _starpu_async_channel * channel = event;
|
|
|
+ channel->event.mpi_ms_event.is_sender = 1;
|
|
|
|
|
|
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
- if (channel->type == STARPU_UNUSED)
|
|
|
- channel->event.mpi_ms_event.requests = NULL;
|
|
|
+ /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
+ if (channel->type == STARPU_UNUSED)
|
|
|
+ channel->event.mpi_ms_event.requests = NULL;
|
|
|
|
|
|
- /* Initialize the list */
|
|
|
- if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
- {
|
|
|
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
- }
|
|
|
+ /* Initialize the list */
|
|
|
+ if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
+ {
|
|
|
+ channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
+ _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
+ }
|
|
|
|
|
|
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
+ struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
|
|
|
- res = MPI_Isend(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
+ res = MPI_Isend(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
|
|
|
- channel->starpu_mp_common_finished_receiver++;
|
|
|
- channel->starpu_mp_common_finished_sender++;
|
|
|
+ channel->starpu_mp_common_finished_receiver++;
|
|
|
+ channel->starpu_mp_common_finished_sender++;
|
|
|
|
|
|
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* Synchronous send */
|
|
|
- res = MPI_Send(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD);
|
|
|
- }
|
|
|
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
+ _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* Synchronous send */
|
|
|
+ res = MPI_Send(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD);
|
|
|
+ }
|
|
|
+ STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
}
|
|
|
|
|
|
void _starpu_mpi_common_mp_send(const struct _starpu_mp_node *node, void *msg, int len)
|
|
|
{
|
|
|
- _starpu_mpi_common_send(node, msg, len, NULL);
|
|
|
+ _starpu_mpi_common_send(node, msg, len, NULL);
|
|
|
}
|
|
|
|
|
|
|
|
|
/* RECV to source node */
|
|
|
void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
|
|
|
{
|
|
|
- int res;
|
|
|
- int id_proc;
|
|
|
- MPI_Status s;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
+ int res;
|
|
|
+ int id_proc;
|
|
|
+ MPI_Status s;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
|
|
|
- printf("recv %d B from %d in %p\n", len, node->mp_connection.mpi_remote_nodeid, msg);
|
|
|
+ //printf("recv %d B from %d in %p\n", len, node->mp_connection.mpi_remote_nodeid, msg);
|
|
|
|
|
|
- if (event)
|
|
|
- {
|
|
|
- /* Asynchronous recv */
|
|
|
- struct _starpu_async_channel * channel = event;
|
|
|
- channel->event.mpi_ms_event.is_sender = 0;
|
|
|
+ if (event)
|
|
|
+ {
|
|
|
+ /* Asynchronous recv */
|
|
|
+ struct _starpu_async_channel * channel = event;
|
|
|
+ channel->event.mpi_ms_event.is_sender = 0;
|
|
|
|
|
|
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
- if (channel->type == STARPU_UNUSED)
|
|
|
- channel->event.mpi_ms_event.requests = NULL;
|
|
|
+ /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
+ if (channel->type == STARPU_UNUSED)
|
|
|
+ channel->event.mpi_ms_event.requests = NULL;
|
|
|
|
|
|
- /* Initialize the list */
|
|
|
- if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
- {
|
|
|
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
- }
|
|
|
+ /* Initialize the list */
|
|
|
+ if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
+ {
|
|
|
+ channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
+ _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
+ }
|
|
|
|
|
|
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
+ struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
|
|
|
- res = MPI_Irecv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
+ res = MPI_Irecv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
|
|
|
- channel->starpu_mp_common_finished_receiver++;
|
|
|
- channel->starpu_mp_common_finished_sender++;
|
|
|
+ channel->starpu_mp_common_finished_receiver++;
|
|
|
+ channel->starpu_mp_common_finished_sender++;
|
|
|
|
|
|
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* Synchronous recv */
|
|
|
- res = MPI_Recv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD, &s);
|
|
|
- int num_expected;
|
|
|
- MPI_Get_count(&s, MPI_BYTE, &num_expected);
|
|
|
+ _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* Synchronous recv */
|
|
|
+ res = MPI_Recv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD, &s);
|
|
|
+ int num_expected;
|
|
|
+ MPI_Get_count(&s, MPI_BYTE, &num_expected);
|
|
|
|
|
|
- STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
|
|
|
- }
|
|
|
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
+ STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
|
|
|
+ }
|
|
|
+ STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
}
|
|
|
|
|
|
void _starpu_mpi_common_mp_recv(const struct _starpu_mp_node *node, void *msg, int len)
|
|
|
{
|
|
|
- _starpu_mpi_common_recv(node, msg, len, NULL);
|
|
|
+ _starpu_mpi_common_recv(node, msg, len, NULL);
|
|
|
}
|
|
|
|
|
|
/* SEND to any node */
|
|
|
void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len, void * event)
|
|
|
{
|
|
|
- int res;
|
|
|
- int id_proc;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
+ int res;
|
|
|
+ int id_proc;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
|
|
|
- printf("S_to_D send %d bytes from %d from %p\n", len, dst_devid, msg);
|
|
|
+ //printf("S_to_D send %d bytes from %d from %p\n", len, dst_devid, msg);
|
|
|
|
|
|
- if (event)
|
|
|
- {
|
|
|
- /* Asynchronous send */
|
|
|
- struct _starpu_async_channel * channel = event;
|
|
|
- channel->event.mpi_ms_event.is_sender = 1;
|
|
|
+ if (event)
|
|
|
+ {
|
|
|
+ /* Asynchronous send */
|
|
|
+ struct _starpu_async_channel * channel = event;
|
|
|
+ channel->event.mpi_ms_event.is_sender = 1;
|
|
|
|
|
|
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
- if (channel->type == STARPU_UNUSED)
|
|
|
- channel->event.mpi_ms_event.requests = NULL;
|
|
|
+ /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
+ if (channel->type == STARPU_UNUSED)
|
|
|
+ channel->event.mpi_ms_event.requests = NULL;
|
|
|
|
|
|
- /* Initialize the list */
|
|
|
- if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
- {
|
|
|
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
- }
|
|
|
+ /* Initialize the list */
|
|
|
+ if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
+ {
|
|
|
+ channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
+ _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
+ }
|
|
|
|
|
|
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
+ struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
|
|
|
- res = MPI_Isend(msg, len, MPI_BYTE, dst_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
+ res = MPI_Isend(msg, len, MPI_BYTE, dst_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
|
|
|
- channel->starpu_mp_common_finished_receiver++;
|
|
|
- channel->starpu_mp_common_finished_sender++;
|
|
|
+ channel->starpu_mp_common_finished_receiver++;
|
|
|
+ channel->starpu_mp_common_finished_sender++;
|
|
|
|
|
|
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* Synchronous send */
|
|
|
- res = MPI_Send(msg, len, MPI_BYTE, dst_devid, SYNC_TAG, MPI_COMM_WORLD);
|
|
|
- }
|
|
|
+ _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* Synchronous send */
|
|
|
+ res = MPI_Send(msg, len, MPI_BYTE, dst_devid, SYNC_TAG, MPI_COMM_WORLD);
|
|
|
+ }
|
|
|
|
|
|
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
+ STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
}
|
|
|
|
|
|
/* RECV to any node */
|
|
|
void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len, void * event)
|
|
|
{
|
|
|
- int res;
|
|
|
- int id_proc;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
-
|
|
|
- printf("R_to_D nop recv %d bytes from %d\n", len, src_devid);
|
|
|
-
|
|
|
- if (event)
|
|
|
- {
|
|
|
- /* Asynchronous recv */
|
|
|
- struct _starpu_async_channel * channel = event;
|
|
|
- channel->event.mpi_ms_event.is_sender = 0;
|
|
|
+ int res;
|
|
|
+ int id_proc;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
|
|
|
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
- if (channel->type == STARPU_UNUSED)
|
|
|
- channel->event.mpi_ms_event.requests = NULL;
|
|
|
+ //printf("R_to_D nop recv %d bytes from %d\n", len, src_devid);
|
|
|
|
|
|
- /* Initialize the list */
|
|
|
- if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
+ if (event)
|
|
|
{
|
|
|
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
- }
|
|
|
+ /* Asynchronous recv */
|
|
|
+ struct _starpu_async_channel * channel = event;
|
|
|
+ channel->event.mpi_ms_event.is_sender = 0;
|
|
|
|
|
|
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
+ /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
|
|
|
+ if (channel->type == STARPU_UNUSED)
|
|
|
+ channel->event.mpi_ms_event.requests = NULL;
|
|
|
|
|
|
- res = MPI_Irecv(msg, len, MPI_BYTE, src_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
+ /* Initialize the list */
|
|
|
+ if (channel->event.mpi_ms_event.requests == NULL)
|
|
|
+ {
|
|
|
+ channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
|
|
|
+ _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
|
|
|
+ }
|
|
|
|
|
|
- channel->starpu_mp_common_finished_receiver++;
|
|
|
- channel->starpu_mp_common_finished_sender++;
|
|
|
+ struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
|
|
|
|
|
|
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- /* Synchronous recv */
|
|
|
- MPI_Status s;
|
|
|
- res = MPI_Recv(msg, len, MPI_BYTE, src_devid, SYNC_TAG, MPI_COMM_WORLD, &s);
|
|
|
- int num_expected;
|
|
|
- MPI_Get_count(&s, MPI_BYTE, &num_expected);
|
|
|
+ res = MPI_Irecv(msg, len, MPI_BYTE, src_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
|
|
|
|
|
|
- STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
|
|
|
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
- }
|
|
|
+ channel->starpu_mp_common_finished_receiver++;
|
|
|
+ channel->starpu_mp_common_finished_sender++;
|
|
|
+
|
|
|
+ _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* Synchronous recv */
|
|
|
+ MPI_Status s;
|
|
|
+ res = MPI_Recv(msg, len, MPI_BYTE, src_devid, SYNC_TAG, MPI_COMM_WORLD, &s);
|
|
|
+ int num_expected;
|
|
|
+ MPI_Get_count(&s, MPI_BYTE, &num_expected);
|
|
|
+
|
|
|
+ STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
|
|
|
+ STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static void _starpu_mpi_common_polling_node(struct _starpu_mp_node * node)
|
|
|
{
|
|
|
- /* poll the asynchronous messages.*/
|
|
|
- if (node != NULL)
|
|
|
- {
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
|
|
|
- while(node->mp_recv_is_ready(node))
|
|
|
+ /* poll the asynchronous messages.*/
|
|
|
+ if (node != NULL)
|
|
|
{
|
|
|
- enum _starpu_mp_command answer;
|
|
|
- void *arg;
|
|
|
- int arg_size;
|
|
|
- answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
|
|
|
- if(!_starpu_src_common_store_message(node,arg,arg_size,answer))
|
|
|
- {
|
|
|
- printf("incorrect commande: unknown command or sync command");
|
|
|
- STARPU_ASSERT(0);
|
|
|
- }
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
|
|
|
+ while(node->mp_recv_is_ready(node))
|
|
|
+ {
|
|
|
+ enum _starpu_mp_command answer;
|
|
|
+ void *arg;
|
|
|
+ int arg_size;
|
|
|
+ answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
|
|
|
+ if(!_starpu_src_common_store_message(node,arg,arg_size,answer))
|
|
|
+ {
|
|
|
+ printf("incorrect commande: unknown command or sync command");
|
|
|
+ STARPU_ASSERT(0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
|
|
|
}
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- /* - In device to device communications, the first ack received by host
|
|
|
+/* - In device to device communications, the first ack received by host
|
|
|
* is considered as the sender (but it cannot be, in fact, the sender)
|
|
|
*/
|
|
|
int _starpu_mpi_common_test_event(struct _starpu_async_channel * event)
|
|
|
{
|
|
|
- if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
|
|
|
- {
|
|
|
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
|
|
|
- struct _starpu_mpi_ms_event_request * req_next;
|
|
|
-
|
|
|
- while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
|
|
|
+ if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
|
|
|
{
|
|
|
- req_next = _starpu_mpi_ms_event_request_list_next(req);
|
|
|
-
|
|
|
- int flag = 0;
|
|
|
- MPI_Test(&req->request, &flag, MPI_STATUS_IGNORE);
|
|
|
- if (flag)
|
|
|
- {
|
|
|
- _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
|
|
|
- _starpu_mpi_ms_event_request_delete(req);
|
|
|
-
|
|
|
- if (event->event.mpi_ms_event.is_sender)
|
|
|
- event->starpu_mp_common_finished_sender--;
|
|
|
- else
|
|
|
- event->starpu_mp_common_finished_receiver--;
|
|
|
-
|
|
|
- }
|
|
|
- req = req_next;
|
|
|
- }
|
|
|
+ struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
|
|
|
+ struct _starpu_mpi_ms_event_request * req_next;
|
|
|
|
|
|
- /* When the list is empty, we finished to wait each request */
|
|
|
- if (_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
|
|
|
- {
|
|
|
- /* Destroy the list */
|
|
|
- _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
|
|
|
- event->event.mpi_ms_event.requests = NULL;
|
|
|
+ while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
|
|
|
+ {
|
|
|
+ req_next = _starpu_mpi_ms_event_request_list_next(req);
|
|
|
+
|
|
|
+ int flag = 0;
|
|
|
+ MPI_Test(&req->request, &flag, MPI_STATUS_IGNORE);
|
|
|
+ if (flag)
|
|
|
+ {
|
|
|
+ _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
|
|
|
+ _starpu_mpi_ms_event_request_delete(req);
|
|
|
+
|
|
|
+ if (event->event.mpi_ms_event.is_sender)
|
|
|
+ event->starpu_mp_common_finished_sender--;
|
|
|
+ else
|
|
|
+ event->starpu_mp_common_finished_receiver--;
|
|
|
+
|
|
|
+ }
|
|
|
+ req = req_next;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* When the list is empty, we finished to wait each request */
|
|
|
+ if (_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
|
|
|
+ {
|
|
|
+ /* Destroy the list */
|
|
|
+ _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
|
|
|
+ event->event.mpi_ms_event.requests = NULL;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- _starpu_mpi_common_polling_node(event->polling_node_sender);
|
|
|
- _starpu_mpi_common_polling_node(event->polling_node_receiver);
|
|
|
-
|
|
|
- return !event->starpu_mp_common_finished_sender && !event->starpu_mp_common_finished_receiver;
|
|
|
+ _starpu_mpi_common_polling_node(event->polling_node_sender);
|
|
|
+ _starpu_mpi_common_polling_node(event->polling_node_receiver);
|
|
|
+
|
|
|
+ return !event->starpu_mp_common_finished_sender && !event->starpu_mp_common_finished_receiver;
|
|
|
}
|
|
|
|
|
|
|
|
@@ -419,48 +419,48 @@ int _starpu_mpi_common_test_event(struct _starpu_async_channel * event)
|
|
|
*/
|
|
|
void _starpu_mpi_common_wait_event(struct _starpu_async_channel * event)
|
|
|
{
|
|
|
- if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
|
|
|
- {
|
|
|
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
|
|
|
- struct _starpu_mpi_ms_event_request * req_next;
|
|
|
-
|
|
|
- while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
|
|
|
+ if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
|
|
|
{
|
|
|
- req_next = _starpu_mpi_ms_event_request_list_next(req);
|
|
|
+ struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
|
|
|
+ struct _starpu_mpi_ms_event_request * req_next;
|
|
|
|
|
|
- MPI_Wait(&req->request, MPI_STATUS_IGNORE);
|
|
|
- _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
|
|
|
+ while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
|
|
|
+ {
|
|
|
+ req_next = _starpu_mpi_ms_event_request_list_next(req);
|
|
|
|
|
|
- _starpu_mpi_ms_event_request_delete(req);
|
|
|
- req = req_next;
|
|
|
+ MPI_Wait(&req->request, MPI_STATUS_IGNORE);
|
|
|
+ _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
|
|
|
|
|
|
- if (event->event.mpi_ms_event.is_sender)
|
|
|
- event->starpu_mp_common_finished_sender--;
|
|
|
- else
|
|
|
- event->starpu_mp_common_finished_receiver--;
|
|
|
+ _starpu_mpi_ms_event_request_delete(req);
|
|
|
+ req = req_next;
|
|
|
|
|
|
- }
|
|
|
+ if (event->event.mpi_ms_event.is_sender)
|
|
|
+ event->starpu_mp_common_finished_sender--;
|
|
|
+ else
|
|
|
+ event->starpu_mp_common_finished_receiver--;
|
|
|
|
|
|
- STARPU_ASSERT_MSG(_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests), "MPI Request list is not empty after a wait_event !");
|
|
|
+ }
|
|
|
|
|
|
- /* Destroy the list */
|
|
|
- _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
|
|
|
- event->event.mpi_ms_event.requests = NULL;
|
|
|
- }
|
|
|
+ STARPU_ASSERT_MSG(_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests), "MPI Request list is not empty after a wait_event !");
|
|
|
|
|
|
- //incoming ack from devices
|
|
|
- while(event->starpu_mp_common_finished_sender > 0 || event->starpu_mp_common_finished_receiver > 0)
|
|
|
- {
|
|
|
- _starpu_mpi_common_polling_node(event->polling_node_sender);
|
|
|
- _starpu_mpi_common_polling_node(event->polling_node_receiver);
|
|
|
- }
|
|
|
+ /* Destroy the list */
|
|
|
+ _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
|
|
|
+ event->event.mpi_ms_event.requests = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ //incoming ack from devices
|
|
|
+ while(event->starpu_mp_common_finished_sender > 0 || event->starpu_mp_common_finished_receiver > 0)
|
|
|
+ {
|
|
|
+ _starpu_mpi_common_polling_node(event->polling_node_sender);
|
|
|
+ _starpu_mpi_common_polling_node(event->polling_node_receiver);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void _starpu_mpi_common_barrier(void)
|
|
|
{
|
|
|
- MPI_Barrier(MPI_COMM_WORLD);
|
|
|
+ MPI_Barrier(MPI_COMM_WORLD);
|
|
|
}
|
|
|
|
|
|
/* Compute bandwidth and latency between source and sink nodes
|
|
@@ -468,91 +468,91 @@ void _starpu_mpi_common_barrier(void)
|
|
|
*/
|
|
|
void _starpu_mpi_common_measure_bandwidth_latency(double bandwidth_dtod[STARPU_MAXMPIDEVS][STARPU_MAXMPIDEVS], double latency_dtod[STARPU_MAXMPIDEVS][STARPU_MAXMPIDEVS])
|
|
|
{
|
|
|
- int ret;
|
|
|
- unsigned iter;
|
|
|
+ int ret;
|
|
|
+ unsigned iter;
|
|
|
|
|
|
- int nb_proc, id_proc;
|
|
|
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
- MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
|
|
|
+ int nb_proc, id_proc;
|
|
|
+ MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
|
|
|
+ MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
|
|
|
|
|
|
- char * buf;
|
|
|
- _STARPU_MALLOC(buf, SIZE_BANDWIDTH);
|
|
|
- memset(buf, 0, SIZE_BANDWIDTH);
|
|
|
+ char * buf;
|
|
|
+ _STARPU_MALLOC(buf, SIZE_BANDWIDTH);
|
|
|
+ memset(buf, 0, SIZE_BANDWIDTH);
|
|
|
|
|
|
- unsigned sender, receiver;
|
|
|
- for(sender = 0; sender < nb_proc; sender++)
|
|
|
- {
|
|
|
- for(receiver = 0; receiver < nb_proc; receiver++)
|
|
|
+ unsigned sender, receiver;
|
|
|
+ for(sender = 0; sender < nb_proc; sender++)
|
|
|
{
|
|
|
- MPI_Barrier(MPI_COMM_WORLD);
|
|
|
-
|
|
|
- //Node can't be a sender and a receiver
|
|
|
- if(sender == receiver)
|
|
|
- continue;
|
|
|
-
|
|
|
- if(id_proc == sender)
|
|
|
- {
|
|
|
- double start, end;
|
|
|
-
|
|
|
- /* measure bandwidth sender to receiver */
|
|
|
- start = starpu_timing_now();
|
|
|
- for (iter = 0; iter < NITER; iter++)
|
|
|
+ for(receiver = 0; receiver < nb_proc; receiver++)
|
|
|
{
|
|
|
- ret = MPI_Send(buf, SIZE_BANDWIDTH, MPI_BYTE, receiver, 42, MPI_COMM_WORLD);
|
|
|
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
|
|
|
+ MPI_Barrier(MPI_COMM_WORLD);
|
|
|
+
|
|
|
+ //Node can't be a sender and a receiver
|
|
|
+ if(sender == receiver)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ if(id_proc == sender)
|
|
|
+ {
|
|
|
+ double start, end;
|
|
|
+
|
|
|
+ /* measure bandwidth sender to receiver */
|
|
|
+ start = starpu_timing_now();
|
|
|
+ for (iter = 0; iter < NITER; iter++)
|
|
|
+ {
|
|
|
+ ret = MPI_Send(buf, SIZE_BANDWIDTH, MPI_BYTE, receiver, 42, MPI_COMM_WORLD);
|
|
|
+ STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
|
|
|
+ }
|
|
|
+ end = starpu_timing_now();
|
|
|
+ bandwidth_dtod[sender][receiver] = (NITER*1000000)/(end - start);
|
|
|
+
|
|
|
+ /* measure latency sender to receiver */
|
|
|
+ start = starpu_timing_now();
|
|
|
+ for (iter = 0; iter < NITER; iter++)
|
|
|
+ {
|
|
|
+ ret = MPI_Send(buf, 1, MPI_BYTE, receiver, 42, MPI_COMM_WORLD);
|
|
|
+ STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Latency of MPI Master/Slave cannot be measured !");
|
|
|
+ }
|
|
|
+ end = starpu_timing_now();
|
|
|
+ latency_dtod[sender][receiver] = (end - start)/NITER;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (id_proc == receiver)
|
|
|
+ {
|
|
|
+ /* measure bandwidth sender to receiver*/
|
|
|
+ for (iter = 0; iter < NITER; iter++)
|
|
|
+ {
|
|
|
+ ret = MPI_Recv(buf, SIZE_BANDWIDTH, MPI_BYTE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
+ STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
|
|
|
+ }
|
|
|
+
|
|
|
+ /* measure latency sender to receiver */
|
|
|
+ for (iter = 0; iter < NITER; iter++)
|
|
|
+ {
|
|
|
+ ret = MPI_Recv(buf, 1, MPI_BYTE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
+ STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- end = starpu_timing_now();
|
|
|
- bandwidth_dtod[sender][receiver] = (NITER*1000000)/(end - start);
|
|
|
|
|
|
- /* measure latency sender to receiver */
|
|
|
- start = starpu_timing_now();
|
|
|
- for (iter = 0; iter < NITER; iter++)
|
|
|
- {
|
|
|
- ret = MPI_Send(buf, 1, MPI_BYTE, receiver, 42, MPI_COMM_WORLD);
|
|
|
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Latency of MPI Master/Slave cannot be measured !");
|
|
|
- }
|
|
|
- end = starpu_timing_now();
|
|
|
- latency_dtod[sender][receiver] = (end - start)/NITER;
|
|
|
- }
|
|
|
-
|
|
|
- if (id_proc == receiver)
|
|
|
- {
|
|
|
- /* measure bandwidth sender to receiver*/
|
|
|
- for (iter = 0; iter < NITER; iter++)
|
|
|
+ /* When a sender finished its work, it has to send its results to the master */
|
|
|
+
|
|
|
+ /* Sender doesn't need to send to itself its data */
|
|
|
+ if (sender == src_node_id)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ /* if we are the sender, we send the data */
|
|
|
+ if (sender == id_proc)
|
|
|
{
|
|
|
- ret = MPI_Recv(buf, SIZE_BANDWIDTH, MPI_BYTE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
|
|
|
+ MPI_Send(bandwidth_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, src_node_id, 42, MPI_COMM_WORLD);
|
|
|
+ MPI_Send(latency_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, src_node_id, 42, MPI_COMM_WORLD);
|
|
|
}
|
|
|
|
|
|
- /* measure latency sender to receiver */
|
|
|
- for (iter = 0; iter < NITER; iter++)
|
|
|
+ /* the master node receives the data */
|
|
|
+ if (src_node_id == id_proc)
|
|
|
{
|
|
|
- ret = MPI_Recv(buf, 1, MPI_BYTE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
|
|
|
+ MPI_Recv(bandwidth_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
+ MPI_Recv(latency_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* When a sender finished its work, it has to send its results to the master */
|
|
|
-
|
|
|
- /* Sender doesn't need to send to itself its data */
|
|
|
- if (sender == src_node_id)
|
|
|
- continue;
|
|
|
-
|
|
|
- /* if we are the sender, we send the data */
|
|
|
- if (sender == id_proc)
|
|
|
- {
|
|
|
- MPI_Send(bandwidth_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, src_node_id, 42, MPI_COMM_WORLD);
|
|
|
- MPI_Send(latency_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, src_node_id, 42, MPI_COMM_WORLD);
|
|
|
- }
|
|
|
|
|
|
- /* the master node receives the data */
|
|
|
- if (src_node_id == id_proc)
|
|
|
- {
|
|
|
- MPI_Recv(bandwidth_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
- MPI_Recv(latency_dtod[sender], STARPU_MAXMPIDEVS, MPI_DOUBLE, sender, 42, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
|
|
|
}
|
|
|
-
|
|
|
- }
|
|
|
- free(buf);
|
|
|
+ free(buf);
|
|
|
}
|