mp_common.c 14 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012,2016,2017 Inria
  4. * Copyright (C) 2013-2017, 2019 CNRS
  5. * Copyright (C) 2013,2015 Université de Bordeaux
  6. * Copyright (C) 2013 Thibaut Lambert
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <stdlib.h>
  20. #include <datawizard/interfaces/data_interface.h>
  21. #include <drivers/mp_common/mp_common.h>
  22. #include <drivers/mp_common/sink_common.h>
  23. #include <drivers/mic/driver_mic_common.h>
  24. #include <drivers/mic/driver_mic_source.h>
  25. #include <drivers/mic/driver_mic_sink.h>
  26. #include <drivers/scc/driver_scc_common.h>
  27. #include <drivers/scc/driver_scc_source.h>
  28. #include <drivers/scc/driver_scc_sink.h>
  29. #include <drivers/mpi/driver_mpi_common.h>
  30. #include <drivers/mpi/driver_mpi_source.h>
  31. #include <drivers/mpi/driver_mpi_sink.h>
  32. #include <common/list.h>
  33. const char *_starpu_mp_common_command_to_string(const int command)
  34. {
  35. switch(command)
  36. {
  37. case STARPU_MP_COMMAND_EXIT:
  38. return "EXIT";
  39. case STARPU_MP_COMMAND_EXECUTE:
  40. return "EXECUTE";
  41. case STARPU_MP_COMMAND_EXECUTE_DETACHED:
  42. return "EXECUTE_DETACHED";
  43. case STARPU_MP_COMMAND_ERROR_EXECUTE:
  44. return "ERROR_EXECUTE";
  45. case STARPU_MP_COMMAND_ERROR_EXECUTE_DETACHED:
  46. return "ERROR_EXECUTE_DETACHED";
  47. case STARPU_MP_COMMAND_LOOKUP:
  48. return "LOOKUP";
  49. case STARPU_MP_COMMAND_ANSWER_LOOKUP:
  50. return "ANSWER_LOOKUP";
  51. case STARPU_MP_COMMAND_ERROR_LOOKUP:
  52. return "ERROR_LOOKUP";
  53. case STARPU_MP_COMMAND_ALLOCATE:
  54. return "ALLOCATE";
  55. case STARPU_MP_COMMAND_ANSWER_ALLOCATE:
  56. return "ANSWER_ALLOCATE";
  57. case STARPU_MP_COMMAND_ERROR_ALLOCATE:
  58. return "ERROR_ALLOCATE";
  59. case STARPU_MP_COMMAND_FREE:
  60. return "FREE";
  61. /* Synchronous send */
  62. case STARPU_MP_COMMAND_RECV_FROM_HOST:
  63. return "RECV_FROM_HOST";
  64. case STARPU_MP_COMMAND_SEND_TO_HOST:
  65. return "SEND_TO_HOST";
  66. case STARPU_MP_COMMAND_RECV_FROM_SINK:
  67. return "RECV_FROM_SINK";
  68. case STARPU_MP_COMMAND_SEND_TO_SINK:
  69. return "SEND_TO_SINK";
  70. /* Asynchronous send */
  71. case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC:
  72. return "RECV_FROM_HOST_ASYNC";
  73. case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
  74. return "RECV_FROM_HOST_ASYNC_COMPLETED";
  75. case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC:
  76. return "SEND_TO_HOST_ASYNC";
  77. case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
  78. return "SEND_TO_HOST_ASYNC_COMPLETED";
  79. case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC:
  80. return "RECV_FROM_SINK_ASYNC";
  81. case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
  82. return "RECV_FROM_SINK_ASYNC_COMPLETED";
  83. case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC:
  84. return "SEND_TO_SINK_ASYNC";
  85. case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
  86. return "SEND_TO_SINK_ASYNC_COMPLETED";
  87. case STARPU_MP_COMMAND_TRANSFER_COMPLETE:
  88. return "TRANSFER_COMPLETE";
  89. case STARPU_MP_COMMAND_SINK_NBCORES:
  90. return "SINK_NBCORES";
  91. case STARPU_MP_COMMAND_ANSWER_SINK_NBCORES:
  92. return "ANSWER_SINK_NBCORES";
  93. case STARPU_MP_COMMAND_EXECUTION_SUBMITTED:
  94. return "EXECUTION_SUBMITTED";
  95. case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
  96. return "EXECUTION_COMPLETED";
  97. case STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED:
  98. return "EXECUTION_SUBMITTED_DETACHED";
  99. case STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED:
  100. return "EXECUTION_DETACHED_COMPLETED";
  101. case STARPU_MP_COMMAND_PRE_EXECUTION:
  102. return "PRE_EXECUTION";
  103. case STARPU_MP_COMMAND_SYNC_WORKERS:
  104. return "SYNC_WORKERS";
  105. default:
  106. return "<invalid command code>";
  107. }
  108. }
  109. const char *_starpu_mp_common_node_kind_to_string(const int kind)
  110. {
  111. switch(kind)
  112. {
  113. case STARPU_NODE_MIC_SINK:
  114. return "MIC_SINK";
  115. case STARPU_NODE_MIC_SOURCE:
  116. return "MIC_SOURCE";
  117. case STARPU_NODE_SCC_SINK:
  118. return "SCC_SINK";
  119. case STARPU_NODE_SCC_SOURCE:
  120. return "SCC_SOURCE";
  121. case STARPU_NODE_MPI_SINK:
  122. return "MPI_SINK";
  123. case STARPU_NODE_MPI_SOURCE:
  124. return "MPI_SOURCE";
  125. default:
  126. return "<invalid command code>";
  127. }
  128. }
  129. /* Allocate and initialize the sink structure, when the function returns
  130. * all the pointer of functions are linked to the right ones.
  131. */
  132. struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
  133. _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
  134. int peer_id)
  135. {
  136. struct _starpu_mp_node *node;
  137. _STARPU_MALLOC(node, sizeof(struct _starpu_mp_node));
  138. node->kind = node_kind;
  139. node->peer_id = peer_id;
  140. switch(node->kind)
  141. {
  142. #ifdef STARPU_USE_MIC
  143. case STARPU_NODE_MIC_SOURCE:
  144. {
  145. node->nb_mp_sinks = starpu_mic_worker_get_count();
  146. node->devid = peer_id;
  147. node->init = _starpu_mic_src_init;
  148. node->launch_workers= NULL;
  149. node->deinit = _starpu_mic_src_deinit;
  150. node->report_error = _starpu_mic_src_report_scif_error;
  151. node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
  152. node->mp_send = _starpu_mic_common_send;
  153. node->mp_recv = _starpu_mic_common_recv;
  154. node->dt_send = _starpu_mic_common_dt_send;
  155. node->dt_recv = _starpu_mic_common_dt_recv;
  156. node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
  157. node->lookup = NULL;
  158. node->bind_thread = NULL;
  159. node->execute = NULL;
  160. node->allocate = NULL;
  161. node->free = NULL;
  162. }
  163. break;
  164. case STARPU_NODE_MIC_SINK:
  165. {
  166. node->devid = atoi(starpu_getenv("_STARPU_MIC_DEVID"));
  167. node->nb_mp_sinks = atoi(starpu_getenv("_STARPU_MIC_NB"));
  168. node->init = _starpu_mic_sink_init;
  169. node->launch_workers = _starpu_mic_sink_launch_workers;
  170. node->deinit = _starpu_mic_sink_deinit;
  171. node->report_error = _starpu_mic_sink_report_error;
  172. node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
  173. node->mp_send = _starpu_mic_common_send;
  174. node->mp_recv = _starpu_mic_common_recv;
  175. node->dt_send = _starpu_mic_common_dt_send;
  176. node->dt_recv = _starpu_mic_common_dt_recv;
  177. node->dt_test = NULL; /* Not used now */
  178. node->get_kernel_from_job = NULL;
  179. node->lookup = _starpu_mic_sink_lookup;
  180. node->bind_thread = _starpu_mic_sink_bind_thread;
  181. node->execute = _starpu_sink_common_execute;
  182. node->allocate = _starpu_mic_sink_allocate;
  183. node->free = _starpu_mic_sink_free;
  184. }
  185. break;
  186. #endif /* STARPU_USE_MIC */
  187. #ifdef STARPU_USE_SCC
  188. case STARPU_NODE_SCC_SOURCE:
  189. {
  190. node->init = _starpu_scc_src_init;
  191. node->deinit = NULL;
  192. node->report_error = _starpu_scc_common_report_rcce_error;
  193. node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
  194. node->mp_send = _starpu_scc_common_send;
  195. node->mp_recv = _starpu_scc_common_recv;
  196. node->dt_send = _starpu_scc_common_send;
  197. node->dt_recv = _starpu_scc_common_recv;
  198. node->dt_send_to_device = NULL;
  199. node->dt_recv_from_device = NULL;
  200. node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
  201. node->lookup = NULL;
  202. node->bind_thread = NULL;
  203. node->execute = NULL;
  204. node->allocate = NULL;
  205. node->free = NULL;
  206. }
  207. break;
  208. case STARPU_NODE_SCC_SINK:
  209. {
  210. node->init = _starpu_scc_sink_init;
  211. node->launch_workers = _starpu_scc_sink_launch_workers;
  212. node->deinit = _starpu_scc_sink_deinit;
  213. node->report_error = _starpu_scc_common_report_rcce_error;
  214. node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
  215. node->mp_send = _starpu_scc_common_send;
  216. node->mp_recv = _starpu_scc_common_recv;
  217. node->dt_send = _starpu_scc_common_send;
  218. node->dt_recv = _starpu_scc_common_recv;
  219. node->dt_send_to_device = _starpu_scc_sink_send_to_device;
  220. node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
  221. node->dt_test = NULL /* not used now */
  222. node->get_kernel_from_job = NULL;
  223. node->lookup = _starpu_scc_sink_lookup;
  224. node->bind_thread = _starpu_scc_sink_bind_thread;
  225. node->execute = _starpu_scc_sink_execute;
  226. node->allocate = _starpu_sink_common_allocate;
  227. node->free = _starpu_sink_common_free;
  228. }
  229. break;
  230. #endif /* STARPU_USE_SCC */
  231. #ifdef STARPU_USE_MPI_MASTER_SLAVE
  232. case STARPU_NODE_MPI_SOURCE:
  233. {
  234. /*
  235. node->nb_mp_sinks =
  236. node->devid =
  237. */
  238. node->peer_id = (_starpu_mpi_common_get_src_node() <= peer_id ? peer_id+1 : peer_id);
  239. node->mp_connection.mpi_remote_nodeid = node->peer_id;
  240. node->init = _starpu_mpi_source_init;
  241. node->launch_workers = NULL;
  242. node->deinit = _starpu_mpi_source_deinit;
  243. /* node->report_error = */
  244. node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
  245. node->mp_send = _starpu_mpi_common_mp_send;
  246. node->mp_recv = _starpu_mpi_common_mp_recv;
  247. node->dt_send = _starpu_mpi_common_send;
  248. node->dt_recv = _starpu_mpi_common_recv;
  249. node->dt_send_to_device = _starpu_mpi_common_send_to_device;
  250. node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
  251. node->get_kernel_from_job = _starpu_mpi_ms_src_get_kernel_from_job;
  252. node->lookup = NULL;
  253. node->bind_thread = NULL;
  254. node->execute = NULL;
  255. node->allocate = NULL;
  256. node->free = NULL;
  257. }
  258. break;
  259. case STARPU_NODE_MPI_SINK:
  260. {
  261. /*
  262. node->nb_mp_sinks =
  263. node->devid =
  264. */
  265. node->mp_connection.mpi_remote_nodeid = _starpu_mpi_common_get_src_node();
  266. node->init = _starpu_mpi_sink_init;
  267. node->launch_workers = _starpu_mpi_sink_launch_workers;
  268. node->deinit = _starpu_mpi_sink_deinit;
  269. /* node->report_error = */
  270. node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
  271. node->mp_send = _starpu_mpi_common_mp_send;
  272. node->mp_recv = _starpu_mpi_common_mp_recv;
  273. node->dt_send = _starpu_mpi_common_send;
  274. node->dt_recv = _starpu_mpi_common_recv;
  275. node->dt_send_to_device = _starpu_mpi_common_send_to_device;
  276. node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
  277. node->dt_test = _starpu_mpi_common_test_event;
  278. node->get_kernel_from_job = NULL;
  279. node->lookup = _starpu_mpi_sink_lookup;
  280. node->bind_thread = _starpu_mpi_sink_bind_thread;
  281. node->execute = _starpu_sink_common_execute;
  282. node->allocate = _starpu_sink_common_allocate;
  283. node->free = _starpu_sink_common_free;
  284. }
  285. break;
  286. #endif /* STARPU_USE_MPI_MASTER_SLAVE */
  287. default:
  288. STARPU_ASSERT(0);
  289. }
  290. /* Let's allocate the buffer, we want it to be big enough to contain
  291. * a command, an argument and the argument size */
  292. _STARPU_MALLOC(node->buffer, BUFFER_SIZE);
  293. if (node->init)
  294. node->init(node);
  295. mp_message_list_init(&node->message_queue);
  296. STARPU_PTHREAD_MUTEX_INIT(&node->message_queue_mutex,NULL);
  297. STARPU_PTHREAD_MUTEX_INIT(&node->connection_mutex, NULL);
  298. _starpu_mp_event_list_init(&node->event_list);
  299. /* If the node is a sink then we must initialize some field */
  300. if(node->kind == STARPU_NODE_MIC_SINK || node->kind == STARPU_NODE_SCC_SINK || node->kind == STARPU_NODE_MPI_SINK)
  301. {
  302. int i;
  303. node->is_running = 1;
  304. _STARPU_MALLOC(node->run_table, sizeof(struct mp_task *)*node->nb_cores);
  305. _STARPU_MALLOC(node->run_table_detached, sizeof(struct mp_task *)*node->nb_cores);
  306. _STARPU_MALLOC(node->sem_run_table, sizeof(sem_t)*node->nb_cores);
  307. for(i=0; i<node->nb_cores; i++)
  308. {
  309. node->run_table[i] = NULL;
  310. node->run_table_detached[i] = NULL;
  311. sem_init(&node->sem_run_table[i],0,0);
  312. }
  313. mp_barrier_list_init(&node->barrier_list);
  314. STARPU_PTHREAD_MUTEX_INIT(&node->barrier_mutex,NULL);
  315. STARPU_PTHREAD_BARRIER_INIT(&node->init_completed_barrier, NULL, node->nb_cores+1);
  316. node->launch_workers(node);
  317. }
  318. return node;
  319. }
  320. /* Deinitialize the sink structure and release the structure */
  321. void _starpu_mp_common_node_destroy(struct _starpu_mp_node *node)
  322. {
  323. if (node->deinit)
  324. node->deinit(node);
  325. STARPU_PTHREAD_MUTEX_DESTROY(&node->message_queue_mutex);
  326. /* If the node is a sink then we must destroy some field */
  327. if(node->kind == STARPU_NODE_MIC_SINK || node->kind == STARPU_NODE_SCC_SINK || node->kind == STARPU_NODE_MPI_SINK)
  328. {
  329. int i;
  330. for(i=0; i<node->nb_cores; i++)
  331. {
  332. sem_destroy(&node->sem_run_table[i]);
  333. }
  334. free(node->run_table);
  335. free(node->run_table_detached);
  336. free(node->sem_run_table);
  337. STARPU_PTHREAD_MUTEX_DESTROY(&node->barrier_mutex);
  338. STARPU_PTHREAD_BARRIER_DESTROY(&node->init_completed_barrier);
  339. }
  340. free(node->buffer);
  341. free(node);
  342. }
  343. /* Send COMMAND to RECIPIENT, along with ARG if ARG_SIZE is non-zero */
  344. void _starpu_mp_common_send_command(const struct _starpu_mp_node *node, const enum _starpu_mp_command command, void *arg, int arg_size)
  345. {
  346. STARPU_ASSERT_MSG(arg_size <= BUFFER_SIZE, "Too much data (%d) for the static MIC buffer (%d), increase BUFFER_SIZE perhaps?", arg_size, BUFFER_SIZE);
  347. //printf("SEND CMD : %d - arg_size %d by %lu \n", command, arg_size, starpu_pthread_self());
  348. /* MIC and MPI sizes are given through a int */
  349. int command_size = sizeof(enum _starpu_mp_command);
  350. int arg_size_size = sizeof(int);
  351. /* Let's copy the data into the command line buffer */
  352. memcpy(node->buffer, &command, command_size);
  353. memcpy((void*) ((uintptr_t)node->buffer + command_size), &arg_size, arg_size_size);
  354. node->mp_send(node, node->buffer, command_size + arg_size_size);
  355. if (arg_size)
  356. node->mp_send(node, arg, arg_size);
  357. }
  358. /* Return the command received from SENDER. In case SENDER sent an argument
  359. * beside the command, an address to a copy of this argument is returns in arg.
  360. * There is no need to free this address as it's not allocated at this time.
  361. * However, the data pointed by arg shouldn't be relied on after a new call to
  362. * STARPU_MP_COMMON_RECV_COMMAND as it might corrupt it.
  363. */
  364. enum _starpu_mp_command _starpu_mp_common_recv_command(const struct _starpu_mp_node *node, void **arg, int *arg_size)
  365. {
  366. enum _starpu_mp_command command;
  367. /* MIC and MPI sizes are given through a int */
  368. int command_size = sizeof(enum _starpu_mp_command);
  369. int arg_size_size = sizeof(int);
  370. node->mp_recv(node, node->buffer, command_size + arg_size_size);
  371. command = *((enum _starpu_mp_command *) node->buffer);
  372. *arg_size = *((int *) ((uintptr_t)node->buffer + command_size));
  373. //printf("RECV command : %d - arg_size %d by %lu \n", command, *arg_size, starpu_pthread_self());
  374. /* If there is no argument (ie. arg_size == 0),
  375. * let's return the command right now */
  376. if (!(*arg_size))
  377. {
  378. *arg = NULL;
  379. return command;
  380. }
  381. STARPU_ASSERT(*arg_size <= BUFFER_SIZE);
  382. node->mp_recv(node, node->buffer, *arg_size);
  383. *arg = node->buffer;
  384. return command;
  385. }