123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520 |
- /* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2015 Mathieu Lirzin <mthl@openmailbox.org>
- * Copyright (C) 2016 Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
- #include <mpi.h>
- #include <core/workers.h>
- #include <core/perfmodel/perfmodel.h>
- #include <drivers/mp_common/source_common.h>
- #include "driver_mpi_common.h"
/* Number of messages exchanged in each timed measurement loop. */
#define NITER 32
/* Size, in bytes, of each message used by the bandwidth measurement loops. */
#define SIZE_BANDWIDTH (1024 * 1024)

/* MPI tags: SYNC_TAG marks blocking transfers, ASYNC_TAG non-blocking ones. */
#define SYNC_TAG 44
#define ASYNC_TAG 45

/* Rank used as master when STARPU_MPI_MASTER_NODE is unset or unusable. */
#define DRIVER_MPI_MASTER_NODE_DEFAULT 0

/* Non-zero once _starpu_mpi_common_mp_init() has been entered. */
static int mpi_initialized;
/* Filled by MPI_Initialized(): non-zero when MPI was already initialized
 * before _starpu_mpi_common_mp_init() ran. */
static int extern_initialized;
/* Rank, in MPI_COMM_WORLD, of the master (source) node. */
static int src_node_id;
- static void _starpu_mpi_set_src_node_id()
- {
- int node_id = starpu_get_env_number("STARPU_MPI_MASTER_NODE");
- if (node_id != -1)
- {
- int nb_proc, id_proc;
- MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- if (node_id < nb_proc)
- {
- src_node_id = node_id;
- return;
- }
- else if (id_proc == DRIVER_MPI_MASTER_NODE_DEFAULT)
- {
- /* Only one node prints the error message. */
- fprintf(stderr, "The node you specify to be the master is "
- "greater than the total number of nodes.\n"
- "Taking node %d by default...\n", DRIVER_MPI_MASTER_NODE_DEFAULT);
- }
- }
- /* Node by default. */
- src_node_id = DRIVER_MPI_MASTER_NODE_DEFAULT;
- }
- int _starpu_mpi_common_mp_init()
- {
- //Here we supposed the programmer called two times starpu_init.
- if (mpi_initialized)
- return -ENODEV;
- mpi_initialized = 1;
- if (MPI_Initialized(&extern_initialized) != MPI_SUCCESS)
- STARPU_ABORT_MSG("Cannot check if MPI is initialized or not !");
- //Here MPI_Init or MPI_Init_thread is already called
- if (!extern_initialized)
- {
- #if defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
- int required = MPI_THREAD_MULTIPLE;
- #else
- int required = MPI_THREAD_FUNNELED;
- #endif
- int thread_support;
- STARPU_ASSERT(MPI_Init_thread(_starpu_get_argc(), _starpu_get_argv(), required, &thread_support) == MPI_SUCCESS);
- if (thread_support != required)
- {
- if (required == MPI_THREAD_MULTIPLE)
- fprintf(stderr, "MPI doesn't support MPI_THREAD_MULTIPLE option. MPI Master-Slave can have problems if multiple slaves are launched. \n");
- if (required == MPI_THREAD_FUNNELED)
- fprintf(stderr, "MPI doesn't support MPI_THREAD_FUNNELED option. Many errors can occur. \n");
- }
- }
-
- /* Find which node is the master */
- _starpu_mpi_set_src_node_id();
- return 1;
- }
- void _starpu_mpi_common_mp_deinit()
- {
- if (!extern_initialized)
- MPI_Finalize();
- }
- int _starpu_mpi_common_is_src_node()
- {
- int id_proc;
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- return id_proc == src_node_id;
- }
- int _starpu_mpi_common_get_src_node()
- {
- return src_node_id;
- }
- int _starpu_mpi_common_is_mp_initialized()
- {
- return mpi_initialized;
- }
- /* common parts to initialize a source or a sink node */
- void _starpu_mpi_common_mp_initialize_src_sink(struct _starpu_mp_node *node)
- {
- struct _starpu_machine_topology *topology = &_starpu_get_machine_config()->topology;
- node->nb_cores = topology->nhwcpus;
- }
- int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
- {
- int res, source;
- int flag = 0;
- int id_proc;
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- if (id_proc == src_node_id)
- {
- /* Source has mp_node defined */
- source = mp_node->mp_connection.mpi_remote_nodeid;
- }
- else
- {
- /* Sink can have sink to sink message */
- source = MPI_ANY_SOURCE;
- }
- res = MPI_Iprobe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot test if we received a message !");
- return flag;
- }
- /* SEND to source node */
- void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event)
- {
- int res;
- int id_proc;
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- printf("envoi %d B to %d\n", len, node->mp_connection.mpi_remote_nodeid);
- if (event)
- {
- /* Asynchronous send */
- struct _starpu_async_channel * channel = event;
- channel->event.mpi_ms_event.is_sender = 1;
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
- if (channel->type == STARPU_UNUSED)
- channel->event.mpi_ms_event.requests = NULL;
- /* Initialize the list */
- if (channel->event.mpi_ms_event.requests == NULL)
- {
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
- }
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
- res = MPI_Isend(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
- channel->starpu_mp_common_finished_receiver++;
- channel->starpu_mp_common_finished_sender++;
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
- }
- else
- {
- /* Synchronous send */
- res = MPI_Send(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD);
- }
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
- }
/* Blocking send of `len` bytes from `msg` to the remote node of `node`:
 * forwards to _starpu_mpi_common_send() with no asynchronous event.
 */
void _starpu_mpi_common_mp_send(const struct _starpu_mp_node *node, void *msg, int len)
{
	void *no_event = NULL;

	_starpu_mpi_common_send(node, msg, len, no_event);
}
- /* RECV to source node */
- void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
- {
- int res;
- int id_proc;
- MPI_Status s;
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- printf("recv %d B from %d in %p\n", len, node->mp_connection.mpi_remote_nodeid, msg);
- if (event)
- {
- /* Asynchronous recv */
- struct _starpu_async_channel * channel = event;
- channel->event.mpi_ms_event.is_sender = 0;
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
- if (channel->type == STARPU_UNUSED)
- channel->event.mpi_ms_event.requests = NULL;
- /* Initialize the list */
- if (channel->event.mpi_ms_event.requests == NULL)
- {
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
- }
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
- res = MPI_Irecv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
- channel->starpu_mp_common_finished_receiver++;
- channel->starpu_mp_common_finished_sender++;
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
- }
- else
- {
- /* Synchronous recv */
- res = MPI_Recv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD, &s);
- int num_expected;
- MPI_Get_count(&s, MPI_BYTE, &num_expected);
- STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
- }
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
- }
/* Blocking receive of `len` bytes into `msg` from the remote node of `node`:
 * forwards to _starpu_mpi_common_recv() with no asynchronous event.
 */
void _starpu_mpi_common_mp_recv(const struct _starpu_mp_node *node, void *msg, int len)
{
	void *no_event = NULL;

	_starpu_mpi_common_recv(node, msg, len, no_event);
}
- /* SEND to any node */
- void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len, void * event)
- {
- int res;
- int id_proc;
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- printf("send %d bytes from %d from %p\n", len, dst_devid, msg);
- if (event)
- {
- /* Asynchronous send */
- struct _starpu_async_channel * channel = event;
- channel->event.mpi_ms_event.is_sender = 1;
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
- if (channel->type == STARPU_UNUSED)
- channel->event.mpi_ms_event.requests = NULL;
- /* Initialize the list */
- if (channel->event.mpi_ms_event.requests == NULL)
- {
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
- }
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
- res = MPI_Isend(msg, len, MPI_BYTE, dst_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
- channel->starpu_mp_common_finished_receiver++;
- channel->starpu_mp_common_finished_sender++;
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
- }
- else
- {
- /* Synchronous send */
- res = MPI_Send(msg, len, MPI_BYTE, dst_devid, SYNC_TAG, MPI_COMM_WORLD);
- }
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
- }
- /* RECV to any node */
- void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len, void * event)
- {
- int res;
- int id_proc;
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- printf("nop recv %d bytes from %d\n", len, src_devid);
- if (event)
- {
- /* Asynchronous recv */
- struct _starpu_async_channel * channel = event;
- channel->event.mpi_ms_event.is_sender = 0;
- /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
- if (channel->type == STARPU_UNUSED)
- channel->event.mpi_ms_event.requests = NULL;
- /* Initialize the list */
- if (channel->event.mpi_ms_event.requests == NULL)
- {
- channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
- _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
- }
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
- res = MPI_Irecv(msg, len, MPI_BYTE, src_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
- channel->starpu_mp_common_finished_receiver++;
- channel->starpu_mp_common_finished_sender++;
- _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
- }
- else
- {
- /* Synchronous recv */
- res = MPI_Recv(msg, len, MPI_BYTE, src_devid, SYNC_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
- }
- }
- /* - In device to device communications, the first ack received by host
- * is considered as the sender (but it cannot be, in fact, the sender)
- */
- int _starpu_mpi_common_test_event(struct _starpu_async_channel * event)
- {
- if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
- {
- struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
- struct _starpu_mpi_ms_event_request * req_next;
- while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
- {
- req_next = _starpu_mpi_ms_event_request_list_next(req);
- int flag = 0;
- MPI_Test(&req->request, &flag, MPI_STATUS_IGNORE);
- if (flag)
- {
- _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
- _starpu_mpi_ms_event_request_delete(req);
- if (event->event.mpi_ms_event.is_sender)
- event->starpu_mp_common_finished_sender--;
- else
- event->starpu_mp_common_finished_receiver--;
- }
- req = req_next;
- }
- /* When the list is empty, we finished to wait each request */
- if (_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
- {
- /* Destroy the list */
- _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
- event->event.mpi_ms_event.requests = NULL;
- }
- }
- /* poll the asynchronous messages.*/
- if (event->polling_node != NULL)
- {
- while(event->polling_node->mp_recv_is_ready(event->polling_node))
- {
- enum _starpu_mp_command answer;
- void *arg;
- int arg_size;
- answer = _starpu_mp_common_recv_command(event->polling_node, &arg, &arg_size);
- if(!_starpu_src_common_store_message(event->polling_node,arg,arg_size,answer))
- {
- printf("incorrect commande: unknown command or sync command");
- STARPU_ASSERT(0);
- }
- }
- }
- return !event->starpu_mp_common_finished_sender && !event->starpu_mp_common_finished_receiver;
- }
- void _starpu_mpi_common_barrier(void)
- {
- MPI_Barrier(MPI_COMM_WORLD);
- }
- /* Compute bandwidth and latency between source and sink nodes
- * Source node has to have the entire set of times at the end
- */
- void _starpu_mpi_common_measure_bandwidth_latency(double * bandwidth_htod, double * bandwidth_dtoh, double * latency_htod, double * latency_dtoh)
- {
- int ret;
- unsigned iter;
- int nb_proc, id_proc;
- MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
- MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
- char * buf;
- _STARPU_MALLOC(buf, SIZE_BANDWIDTH);
- memset(buf, 0, SIZE_BANDWIDTH);
- unsigned node;
- unsigned id = 0;
- for(node = 0; node < nb_proc; node++)
- {
- MPI_Barrier(MPI_COMM_WORLD);
- //Don't measure link master <-> master
- if(node == src_node_id)
- continue;
- if(_starpu_mpi_common_is_src_node())
- {
- double start, end;
- /* measure bandwidth host to device */
- start = starpu_timing_now();
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Send(buf, SIZE_BANDWIDTH, MPI_BYTE, node, node, MPI_COMM_WORLD);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
- }
- end = starpu_timing_now();
- bandwidth_htod[id] = (NITER*1000000)/(end - start);
- /* measure bandwidth device to host */
- start = starpu_timing_now();
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Recv(buf, SIZE_BANDWIDTH, MPI_BYTE, node, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
- }
- end = starpu_timing_now();
- bandwidth_dtoh[id] = (NITER*1000000)/(end - start);
- /* measure latency host to device */
- start = starpu_timing_now();
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Send(buf, 1, MPI_BYTE, node, node, MPI_COMM_WORLD);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Latency of MPI Master/Slave cannot be measured !");
- }
- end = starpu_timing_now();
- latency_htod[id] = (end - start)/NITER;
- /* measure latency device to host */
- start = starpu_timing_now();
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Recv(buf, 1, MPI_BYTE, node, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
- }
- end = starpu_timing_now();
- latency_dtoh[id] = (end - start)/NITER;
- }
- else if (node == id_proc) /* if we are the sink node evaluated */
- {
- /* measure bandwidth host to device */
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Recv(buf, SIZE_BANDWIDTH, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
- }
- /* measure bandwidth device to host */
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Send(buf, SIZE_BANDWIDTH, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
- }
- /* measure latency host to device */
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Recv(buf, 1, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
- }
- /* measure latency device to host */
- for (iter = 0; iter < NITER; iter++)
- {
- ret = MPI_Send(buf, 1, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD);
- STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Latency of MPI Master/Slave cannot be measured !");
- }
- }
- id++;
- }
- free(buf);
- }
|