driver_mpi_common.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2015 Mathieu Lirzin <mthl@openmailbox.org>
  4. * Copyright (C) 2016 Inria
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <mpi.h>
  18. #include <core/workers.h>
  19. #include <core/perfmodel/perfmodel.h>
  20. #include <drivers/mp_common/source_common.h>
  21. #include "driver_mpi_common.h"
  /* Tunables and MPI tags of the MPI master-slave driver. */
  enum
  {
      /* Number of messages exchanged per bandwidth or latency measurement. */
      NITER = 32,
      /* Size in bytes of each message used to measure bandwidth (1 MiB). */
      SIZE_BANDWIDTH = 1024 * 1024,
      /* Tag of blocking (synchronous) messages. */
      SYNC_TAG = 44,
      /* Tag of non-blocking (asynchronous) transfers. */
      ASYNC_TAG = 45,
      /* Master rank used when STARPU_MPI_MASTER_NODE is unset or unusable. */
      DRIVER_MPI_MASTER_NODE_DEFAULT = 0
  };

  /* Set once _starpu_mpi_common_mp_init() has been called. */
  static int mpi_initialized;
  /* Non-zero when MPI was already initialized before _starpu_mpi_common_mp_init(). */
  static int extern_initialized;
  /* Rank (in MPI_COMM_WORLD) of the master / source node. */
  static int src_node_id;
  30. static void _starpu_mpi_set_src_node_id()
  31. {
  32. int node_id = starpu_get_env_number("STARPU_MPI_MASTER_NODE");
  33. if (node_id != -1)
  34. {
  35. int nb_proc, id_proc;
  36. MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
  37. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  38. if (node_id < nb_proc)
  39. {
  40. src_node_id = node_id;
  41. return;
  42. }
  43. else if (id_proc == DRIVER_MPI_MASTER_NODE_DEFAULT)
  44. {
  45. /* Only one node prints the error message. */
  46. fprintf(stderr, "The node you specify to be the master is "
  47. "greater than the total number of nodes.\n"
  48. "Taking node %d by default...\n", DRIVER_MPI_MASTER_NODE_DEFAULT);
  49. }
  50. }
  51. /* Node by default. */
  52. src_node_id = DRIVER_MPI_MASTER_NODE_DEFAULT;
  53. }
  54. int _starpu_mpi_common_mp_init()
  55. {
  56. //Here we supposed the programmer called two times starpu_init.
  57. if (mpi_initialized)
  58. return -ENODEV;
  59. mpi_initialized = 1;
  60. if (MPI_Initialized(&extern_initialized) != MPI_SUCCESS)
  61. STARPU_ABORT_MSG("Cannot check if MPI is initialized or not !");
  62. //Here MPI_Init or MPI_Init_thread is already called
  63. if (!extern_initialized)
  64. {
  65. #if defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
  66. int required = MPI_THREAD_MULTIPLE;
  67. #else
  68. int required = MPI_THREAD_FUNNELED;
  69. #endif
  70. int thread_support;
  71. STARPU_ASSERT(MPI_Init_thread(_starpu_get_argc(), _starpu_get_argv(), required, &thread_support) == MPI_SUCCESS);
  72. if (thread_support != required)
  73. {
  74. if (required == MPI_THREAD_MULTIPLE)
  75. fprintf(stderr, "MPI doesn't support MPI_THREAD_MULTIPLE option. MPI Master-Slave can have problems if multiple slaves are launched. \n");
  76. if (required == MPI_THREAD_FUNNELED)
  77. fprintf(stderr, "MPI doesn't support MPI_THREAD_FUNNELED option. Many errors can occur. \n");
  78. }
  79. }
  80. /* Find which node is the master */
  81. _starpu_mpi_set_src_node_id();
  82. return 1;
  83. }
  84. void _starpu_mpi_common_mp_deinit()
  85. {
  86. if (!extern_initialized)
  87. MPI_Finalize();
  88. }
  89. int _starpu_mpi_common_is_src_node()
  90. {
  91. int id_proc;
  92. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  93. return id_proc == src_node_id;
  94. }
  95. int _starpu_mpi_common_get_src_node()
  96. {
  97. return src_node_id;
  98. }
  99. int _starpu_mpi_common_is_mp_initialized()
  100. {
  101. return mpi_initialized;
  102. }
  103. /* common parts to initialize a source or a sink node */
  104. void _starpu_mpi_common_mp_initialize_src_sink(struct _starpu_mp_node *node)
  105. {
  106. struct _starpu_machine_topology *topology = &_starpu_get_machine_config()->topology;
  107. node->nb_cores = topology->nhwcpus;
  108. }
  109. int _starpu_mpi_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
  110. {
  111. int res, source;
  112. int flag = 0;
  113. int id_proc;
  114. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  115. if (id_proc == src_node_id)
  116. {
  117. /* Source has mp_node defined */
  118. source = mp_node->mp_connection.mpi_remote_nodeid;
  119. }
  120. else
  121. {
  122. /* Sink can have sink to sink message */
  123. source = MPI_ANY_SOURCE;
  124. }
  125. res = MPI_Iprobe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
  126. STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot test if we received a message !");
  127. return flag;
  128. }
  129. /* SEND to source node */
  130. void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int len, void * event)
  131. {
  132. int res;
  133. int id_proc;
  134. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  135. printf("envoi %d B to %d\n", len, node->mp_connection.mpi_remote_nodeid);
  136. if (event)
  137. {
  138. /* Asynchronous send */
  139. struct _starpu_async_channel * channel = event;
  140. channel->event.mpi_ms_event.is_sender = 1;
  141. /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
  142. if (channel->type == STARPU_UNUSED)
  143. channel->event.mpi_ms_event.requests = NULL;
  144. /* Initialize the list */
  145. if (channel->event.mpi_ms_event.requests == NULL)
  146. {
  147. channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
  148. _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
  149. }
  150. struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
  151. res = MPI_Isend(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
  152. channel->starpu_mp_common_finished_receiver++;
  153. channel->starpu_mp_common_finished_sender++;
  154. _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
  155. }
  156. else
  157. {
  158. /* Synchronous send */
  159. res = MPI_Send(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD);
  160. }
  161. STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
  162. }
  163. void _starpu_mpi_common_mp_send(const struct _starpu_mp_node *node, void *msg, int len)
  164. {
  165. _starpu_mpi_common_send(node, msg, len, NULL);
  166. }
  167. /* RECV to source node */
  168. void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
  169. {
  170. int res;
  171. int id_proc;
  172. MPI_Status s;
  173. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  174. printf("recv %d B from %d in %p\n", len, node->mp_connection.mpi_remote_nodeid, msg);
  175. if (event)
  176. {
  177. /* Asynchronous recv */
  178. struct _starpu_async_channel * channel = event;
  179. channel->event.mpi_ms_event.is_sender = 0;
  180. /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
  181. if (channel->type == STARPU_UNUSED)
  182. channel->event.mpi_ms_event.requests = NULL;
  183. /* Initialize the list */
  184. if (channel->event.mpi_ms_event.requests == NULL)
  185. {
  186. channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
  187. _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
  188. }
  189. struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
  190. res = MPI_Irecv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
  191. channel->starpu_mp_common_finished_receiver++;
  192. channel->starpu_mp_common_finished_sender++;
  193. _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
  194. }
  195. else
  196. {
  197. /* Synchronous recv */
  198. res = MPI_Recv(msg, len, MPI_BYTE, node->mp_connection.mpi_remote_nodeid, SYNC_TAG, MPI_COMM_WORLD, &s);
  199. int num_expected;
  200. MPI_Get_count(&s, MPI_BYTE, &num_expected);
  201. STARPU_ASSERT_MSG(num_expected == len, "MPI Master/Slave received a msg with a size of %d Bytes (expected %d Bytes) !", num_expected, len);
  202. }
  203. STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
  204. }
  205. void _starpu_mpi_common_mp_recv(const struct _starpu_mp_node *node, void *msg, int len)
  206. {
  207. _starpu_mpi_common_recv(node, msg, len, NULL);
  208. }
  209. /* SEND to any node */
  210. void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len, void * event)
  211. {
  212. int res;
  213. int id_proc;
  214. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  215. printf("send %d bytes from %d from %p\n", len, dst_devid, msg);
  216. if (event)
  217. {
  218. /* Asynchronous send */
  219. struct _starpu_async_channel * channel = event;
  220. channel->event.mpi_ms_event.is_sender = 1;
  221. /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
  222. if (channel->type == STARPU_UNUSED)
  223. channel->event.mpi_ms_event.requests = NULL;
  224. /* Initialize the list */
  225. if (channel->event.mpi_ms_event.requests == NULL)
  226. {
  227. channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
  228. _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
  229. }
  230. struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
  231. res = MPI_Isend(msg, len, MPI_BYTE, dst_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
  232. channel->starpu_mp_common_finished_receiver++;
  233. channel->starpu_mp_common_finished_sender++;
  234. _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
  235. }
  236. else
  237. {
  238. /* Synchronous send */
  239. res = MPI_Send(msg, len, MPI_BYTE, dst_devid, SYNC_TAG, MPI_COMM_WORLD);
  240. }
  241. STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
  242. }
  243. /* RECV to any node */
  244. void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len, void * event)
  245. {
  246. int res;
  247. int id_proc;
  248. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  249. printf("nop recv %d bytes from %d\n", len, src_devid);
  250. if (event)
  251. {
  252. /* Asynchronous recv */
  253. struct _starpu_async_channel * channel = event;
  254. channel->event.mpi_ms_event.is_sender = 0;
  255. /* call by sink, we need to initialize some parts, for host it's done in data_request.c */
  256. if (channel->type == STARPU_UNUSED)
  257. channel->event.mpi_ms_event.requests = NULL;
  258. /* Initialize the list */
  259. if (channel->event.mpi_ms_event.requests == NULL)
  260. {
  261. channel->event.mpi_ms_event.requests = _starpu_mpi_ms_event_request_list_new();
  262. _starpu_mpi_ms_event_request_list_init(channel->event.mpi_ms_event.requests);
  263. }
  264. struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_new();
  265. res = MPI_Irecv(msg, len, MPI_BYTE, src_devid, ASYNC_TAG, MPI_COMM_WORLD, &req->request);
  266. channel->starpu_mp_common_finished_receiver++;
  267. channel->starpu_mp_common_finished_sender++;
  268. _starpu_mpi_ms_event_request_list_push_back(channel->event.mpi_ms_event.requests, req);
  269. }
  270. else
  271. {
  272. /* Synchronous recv */
  273. res = MPI_Recv(msg, len, MPI_BYTE, src_devid, SYNC_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  274. STARPU_ASSERT_MSG(res == MPI_SUCCESS, "MPI Master/Slave cannot receive a msg with a size of %d Bytes !", len);
  275. }
  276. }
  277. /* - In device to device communications, the first ack received by host
  278. * is considered as the sender (but it cannot be, in fact, the sender)
  279. */
  280. int _starpu_mpi_common_test_event(struct _starpu_async_channel * event)
  281. {
  282. if (event->event.mpi_ms_event.requests != NULL && !_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
  283. {
  284. struct _starpu_mpi_ms_event_request * req = _starpu_mpi_ms_event_request_list_begin(event->event.mpi_ms_event.requests);
  285. struct _starpu_mpi_ms_event_request * req_next;
  286. while (req != _starpu_mpi_ms_event_request_list_end(event->event.mpi_ms_event.requests))
  287. {
  288. req_next = _starpu_mpi_ms_event_request_list_next(req);
  289. int flag = 0;
  290. MPI_Test(&req->request, &flag, MPI_STATUS_IGNORE);
  291. if (flag)
  292. {
  293. _starpu_mpi_ms_event_request_list_erase(event->event.mpi_ms_event.requests, req);
  294. _starpu_mpi_ms_event_request_delete(req);
  295. if (event->event.mpi_ms_event.is_sender)
  296. event->starpu_mp_common_finished_sender--;
  297. else
  298. event->starpu_mp_common_finished_receiver--;
  299. }
  300. req = req_next;
  301. }
  302. /* When the list is empty, we finished to wait each request */
  303. if (_starpu_mpi_ms_event_request_list_empty(event->event.mpi_ms_event.requests))
  304. {
  305. /* Destroy the list */
  306. _starpu_mpi_ms_event_request_list_delete(event->event.mpi_ms_event.requests);
  307. event->event.mpi_ms_event.requests = NULL;
  308. }
  309. }
  310. /* poll the asynchronous messages.*/
  311. if (event->polling_node != NULL)
  312. {
  313. while(event->polling_node->mp_recv_is_ready(event->polling_node))
  314. {
  315. enum _starpu_mp_command answer;
  316. void *arg;
  317. int arg_size;
  318. answer = _starpu_mp_common_recv_command(event->polling_node, &arg, &arg_size);
  319. if(!_starpu_src_common_store_message(event->polling_node,arg,arg_size,answer))
  320. {
  321. printf("incorrect commande: unknown command or sync command");
  322. STARPU_ASSERT(0);
  323. }
  324. }
  325. }
  326. return !event->starpu_mp_common_finished_sender && !event->starpu_mp_common_finished_receiver;
  327. }
  328. void _starpu_mpi_common_barrier(void)
  329. {
  330. MPI_Barrier(MPI_COMM_WORLD);
  331. }
  332. /* Compute bandwidth and latency between source and sink nodes
  333. * Source node has to have the entire set of times at the end
  334. */
  335. void _starpu_mpi_common_measure_bandwidth_latency(double * bandwidth_htod, double * bandwidth_dtoh, double * latency_htod, double * latency_dtoh)
  336. {
  337. int ret;
  338. unsigned iter;
  339. int nb_proc, id_proc;
  340. MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
  341. MPI_Comm_size(MPI_COMM_WORLD, &nb_proc);
  342. char * buf;
  343. _STARPU_MALLOC(buf, SIZE_BANDWIDTH);
  344. memset(buf, 0, SIZE_BANDWIDTH);
  345. unsigned node;
  346. unsigned id = 0;
  347. for(node = 0; node < nb_proc; node++)
  348. {
  349. MPI_Barrier(MPI_COMM_WORLD);
  350. //Don't measure link master <-> master
  351. if(node == src_node_id)
  352. continue;
  353. if(_starpu_mpi_common_is_src_node())
  354. {
  355. double start, end;
  356. /* measure bandwidth host to device */
  357. start = starpu_timing_now();
  358. for (iter = 0; iter < NITER; iter++)
  359. {
  360. ret = MPI_Send(buf, SIZE_BANDWIDTH, MPI_BYTE, node, node, MPI_COMM_WORLD);
  361. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
  362. }
  363. end = starpu_timing_now();
  364. bandwidth_htod[id] = (NITER*1000000)/(end - start);
  365. /* measure bandwidth device to host */
  366. start = starpu_timing_now();
  367. for (iter = 0; iter < NITER; iter++)
  368. {
  369. ret = MPI_Recv(buf, SIZE_BANDWIDTH, MPI_BYTE, node, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  370. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
  371. }
  372. end = starpu_timing_now();
  373. bandwidth_dtoh[id] = (NITER*1000000)/(end - start);
  374. /* measure latency host to device */
  375. start = starpu_timing_now();
  376. for (iter = 0; iter < NITER; iter++)
  377. {
  378. ret = MPI_Send(buf, 1, MPI_BYTE, node, node, MPI_COMM_WORLD);
  379. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Latency of MPI Master/Slave cannot be measured !");
  380. }
  381. end = starpu_timing_now();
  382. latency_htod[id] = (end - start)/NITER;
  383. /* measure latency device to host */
  384. start = starpu_timing_now();
  385. for (iter = 0; iter < NITER; iter++)
  386. {
  387. ret = MPI_Recv(buf, 1, MPI_BYTE, node, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  388. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
  389. }
  390. end = starpu_timing_now();
  391. latency_dtoh[id] = (end - start)/NITER;
  392. }
  393. else if (node == id_proc) /* if we are the sink node evaluated */
  394. {
  395. /* measure bandwidth host to device */
  396. for (iter = 0; iter < NITER; iter++)
  397. {
  398. ret = MPI_Recv(buf, SIZE_BANDWIDTH, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  399. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
  400. }
  401. /* measure bandwidth device to host */
  402. for (iter = 0; iter < NITER; iter++)
  403. {
  404. ret = MPI_Send(buf, SIZE_BANDWIDTH, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD);
  405. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
  406. }
  407. /* measure latency host to device */
  408. for (iter = 0; iter < NITER; iter++)
  409. {
  410. ret = MPI_Recv(buf, 1, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  411. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Bandwidth of MPI Master/Slave cannot be measured !");
  412. }
  413. /* measure latency device to host */
  414. for (iter = 0; iter < NITER; iter++)
  415. {
  416. ret = MPI_Send(buf, 1, MPI_BYTE, src_node_id, node, MPI_COMM_WORLD);
  417. STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "Latency of MPI Master/Slave cannot be measured !");
  418. }
  419. }
  420. id++;
  421. }
  422. free(buf);
  423. }