mp_common.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012, 2016 INRIA
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <datawizard/interfaces/data_interface.h>
#include <drivers/mp_common/mp_common.h>
#include <drivers/mp_common/sink_common.h>
#include <drivers/mic/driver_mic_common.h>
#include <drivers/mic/driver_mic_source.h>
#include <drivers/mic/driver_mic_sink.h>
#include <drivers/scc/driver_scc_common.h>
#include <drivers/scc/driver_scc_source.h>
#include <drivers/scc/driver_scc_sink.h>
#include <drivers/mpi/driver_mpi_common.h>
#include <drivers/mpi/driver_mpi_source.h>
#include <drivers/mpi/driver_mpi_sink.h>
#include <common/list.h>
  31. const char *_starpu_mp_common_command_to_string(const int command)
  32. {
  33. switch(command)
  34. {
  35. case STARPU_EXIT:
  36. return "EXIT";
  37. case STARPU_EXECUTE:
  38. return "EXECUTE";
  39. case STARPU_ERROR_EXECUTE:
  40. return "ERROR_EXECUTE";
  41. case STARPU_LOOKUP:
  42. return "LOOKUP";
  43. case STARPU_ANSWER_LOOKUP:
  44. return "ANSWER_LOOKUP";
  45. case STARPU_ERROR_LOOKUP:
  46. return "ERROR_LOOKUP";
  47. case STARPU_ALLOCATE:
  48. return "ALLOCATE";
  49. case STARPU_ANSWER_ALLOCATE:
  50. return "ANSWER_ALLOCATE";
  51. case STARPU_ERROR_ALLOCATE:
  52. return "ERROR_ALLOCATE";
  53. case STARPU_FREE:
  54. return "FREE";
  55. case STARPU_RECV_FROM_HOST:
  56. return "RECV_FROM_HOST";
  57. case STARPU_SEND_TO_HOST:
  58. return "SEND_TO_HOST";
  59. case STARPU_RECV_FROM_SINK:
  60. return "RECV_FROM_SINK";
  61. case STARPU_SEND_TO_SINK:
  62. return "SEND_TO_SINK";
  63. case STARPU_TRANSFER_COMPLETE:
  64. return "TRANSFER_COMPLETE";
  65. case STARPU_SINK_NBCORES:
  66. return "SINK_NBCORES";
  67. case STARPU_ANSWER_SINK_NBCORES:
  68. return "ANSWER_SINK_NBCORES";
  69. case STARPU_EXECUTION_SUBMITTED:
  70. return "EXECUTION_SUBMITTED";
  71. case STARPU_EXECUTION_COMPLETED:
  72. return "EXECUTION_COMPLETED";
  73. case STARPU_PRE_EXECUTION:
  74. return "PRE_EXECUTION";
  75. case STARPU_SYNC_WORKERS:
  76. return "SYNC_WORKERS";
  77. default:
  78. return "<invalid command code>";
  79. }
  80. }
  81. const char *_starpu_mp_common_node_kind_to_string(const int kind)
  82. {
  83. switch(kind)
  84. {
  85. case STARPU_MIC_SINK:
  86. return "MIC_SINK";
  87. case STARPU_MIC_SOURCE:
  88. return "MIC_SOURCE";
  89. case STARPU_SCC_SINK:
  90. return "SCC_SINK";
  91. case STARPU_SCC_SOURCE:
  92. return "SCC_SOURCE";
  93. case STARPU_MPI_SINK:
  94. return "MPI_SINK";
  95. case STARPU_MPI_SOURCE:
  96. return "MPI_SOURCE";
  97. default:
  98. return "<invalid command code>";
  99. }
  100. }
  101. /* Allocate and initialize the sink structure, when the function returns
  102. * all the pointer of functions are linked to the right ones.
  103. */
  104. struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
  105. _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
  106. int peer_id)
  107. {
  108. struct _starpu_mp_node *node;
  109. node = (struct _starpu_mp_node *) malloc(sizeof(struct _starpu_mp_node));
  110. node->kind = node_kind;
  111. node->peer_id = peer_id;
  112. switch(node->kind)
  113. {
  114. #ifdef STARPU_USE_MIC
  115. case STARPU_MIC_SOURCE:
  116. {
  117. node->nb_mp_sinks = starpu_mic_worker_get_count();
  118. node->devid = peer_id;
  119. node->init = _starpu_mic_src_init;
  120. node->launch_workers= NULL;
  121. node->deinit = _starpu_mic_src_deinit;
  122. node->report_error = _starpu_mic_src_report_scif_error;
  123. node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
  124. node->mp_send = _starpu_mic_common_send;
  125. node->mp_recv = _starpu_mic_common_recv;
  126. node->dt_send = _starpu_mic_common_dt_send;
  127. node->dt_recv = _starpu_mic_common_dt_recv;
  128. node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
  129. node->lookup = NULL;
  130. node->bind_thread = NULL;
  131. node->execute = NULL;
  132. node->allocate = NULL;
  133. node->free = NULL;
  134. }
  135. break;
  136. case STARPU_MIC_SINK:
  137. {
  138. node->devid = atoi(starpu_getenv("_STARPU_MIC_DEVID"));
  139. node->nb_mp_sinks = atoi(starpu_getenv("_STARPU_MIC_NB"));
  140. node->init = _starpu_mic_sink_init;
  141. node->launch_workers = _starpu_mic_sink_launch_workers;
  142. node->deinit = _starpu_mic_sink_deinit;
  143. node->report_error = _starpu_mic_sink_report_error;
  144. node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
  145. node->mp_send = _starpu_mic_common_send;
  146. node->mp_recv = _starpu_mic_common_recv;
  147. node->dt_send = _starpu_mic_common_dt_send;
  148. node->dt_recv = _starpu_mic_common_dt_recv;
  149. node->get_kernel_from_job = NULL;
  150. node->lookup = _starpu_mic_sink_lookup;
  151. node->bind_thread = _starpu_mic_sink_bind_thread;
  152. node->execute = _starpu_sink_common_execute;
  153. node->allocate = _starpu_mic_sink_allocate;
  154. node->free = _starpu_mic_sink_free;
  155. }
  156. break;
  157. #endif /* STARPU_USE_MIC */
  158. #ifdef STARPU_USE_SCC
  159. case STARPU_SCC_SOURCE:
  160. {
  161. node->init = _starpu_scc_src_init;
  162. node->deinit = NULL;
  163. node->report_error = _starpu_scc_common_report_rcce_error;
  164. node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
  165. node->mp_send = _starpu_scc_common_send;
  166. node->mp_recv = _starpu_scc_common_recv;
  167. node->dt_send = _starpu_scc_common_send;
  168. node->dt_recv = _starpu_scc_common_recv;
  169. node->dt_send_to_device = NULL;
  170. node->dt_recv_from_device = NULL;
  171. node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
  172. node->lookup = NULL;
  173. node->bind_thread = NULL;
  174. node->execute = NULL;
  175. node->allocate = NULL;
  176. node->free = NULL;
  177. }
  178. break;
  179. case STARPU_SCC_SINK:
  180. {
  181. node->init = _starpu_scc_sink_init;
  182. node->launch_workers = _starpu_scc_sink_launch_workers;
  183. node->deinit = _starpu_scc_sink_deinit;
  184. node->report_error = _starpu_scc_common_report_rcce_error;
  185. node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
  186. node->mp_send = _starpu_scc_common_send;
  187. node->mp_recv = _starpu_scc_common_recv;
  188. node->dt_send = _starpu_scc_common_send;
  189. node->dt_recv = _starpu_scc_common_recv;
  190. node->dt_send_to_device = _starpu_scc_sink_send_to_device;
  191. node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
  192. node->get_kernel_from_job = NULL;
  193. node->lookup = _starpu_scc_sink_lookup;
  194. node->bind_thread = _starpu_scc_sink_bind_thread;
  195. node->execute = _starpu_scc_sink_execute;
  196. node->allocate = _starpu_sink_common_allocate;
  197. node->free = _starpu_sink_common_free;
  198. }
  199. break;
  200. #endif /* STARPU_USE_SCC */
  201. #ifdef STARPU_USE_MPI_MASTER_SLAVE
  202. case STARPU_MPI_SOURCE:
  203. {
  204. /*
  205. node->nb_mp_sinks =
  206. node->devid =
  207. */
  208. node->mp_connection.mpi_remote_nodeid = peer_id+1;
  209. node->init = _starpu_mpi_source_init;
  210. node->launch_workers = NULL;
  211. node->deinit = _starpu_mpi_source_deinit;
  212. /* node->report_error = */
  213. node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
  214. node->mp_send = _starpu_mpi_common_send;
  215. node->mp_recv = _starpu_mpi_common_recv;
  216. node->dt_send = _starpu_mpi_common_send;
  217. node->dt_recv = _starpu_mpi_common_recv;
  218. node->dt_send_to_device = _starpu_mpi_common_send_to_device;
  219. node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
  220. node->get_kernel_from_job = _starpu_mpi_ms_src_get_kernel_from_job;
  221. node->lookup = NULL;
  222. node->bind_thread = NULL;
  223. node->execute = NULL;
  224. node->allocate = NULL;
  225. node->free = NULL;
  226. }
  227. break;
  228. case STARPU_MPI_SINK:
  229. {
  230. /*
  231. node->nb_mp_sinks =
  232. node->devid =
  233. */
  234. node->mp_connection.mpi_remote_nodeid = _starpu_mpi_common_get_src_node();
  235. node->init = _starpu_mpi_sink_init;
  236. node->launch_workers = _starpu_mpi_sink_launch_workers;
  237. node->deinit = _starpu_mpi_sink_deinit;
  238. /* node->report_error = */
  239. node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
  240. node->mp_send = _starpu_mpi_common_send;
  241. node->mp_recv = _starpu_mpi_common_recv;
  242. node->dt_send = _starpu_mpi_common_send;
  243. node->dt_recv = _starpu_mpi_common_recv;
  244. node->get_kernel_from_job = NULL;
  245. node->lookup = _starpu_mpi_sink_lookup;
  246. node->bind_thread = _starpu_mpi_sink_bind_thread;
  247. node->execute = _starpu_sink_common_execute;
  248. node->allocate = _starpu_sink_common_allocate;
  249. node->free = _starpu_sink_common_free;
  250. }
  251. break;
  252. #endif /* STARPU_USE_MPI_MASTER_SLAVE */
  253. default:
  254. STARPU_ASSERT(0);
  255. }
  256. /* Let's allocate the buffer, we want it to be big enough to contain
  257. * a command, an argument and the argument size */
  258. node->buffer = (void *) malloc(BUFFER_SIZE);
  259. if (node->init)
  260. node->init(node);
  261. mp_message_list_init(&node->message_queue);
  262. STARPU_PTHREAD_MUTEX_INIT(&node->message_queue_mutex,NULL);
  263. /* If the node is a sink then we must initialize some field */
  264. if(node->kind == STARPU_MIC_SINK || node->kind == STARPU_SCC_SINK || node->kind == STARPU_MPI_SINK)
  265. {
  266. int i;
  267. node->is_running = 1;
  268. node->run_table = malloc(sizeof(struct mp_task *)*node->nb_cores);
  269. node->sem_run_table = malloc(sizeof(sem_t)*node->nb_cores);
  270. for(i=0; i<node->nb_cores; i++)
  271. {
  272. node->run_table[i] = NULL;
  273. sem_init(&node->sem_run_table[i],0,0);
  274. }
  275. mp_barrier_list_init(&node->barrier_list);
  276. STARPU_PTHREAD_MUTEX_INIT(&node->barrier_mutex,NULL);
  277. STARPU_PTHREAD_BARRIER_INIT(&node->init_completed_barrier, NULL, node->nb_cores+1);
  278. node->launch_workers(node);
  279. }
  280. return node;
  281. }
  282. /* Deinitialize the sink structure and release the structure */
  283. void _starpu_mp_common_node_destroy(struct _starpu_mp_node *node)
  284. {
  285. if (node->deinit)
  286. node->deinit(node);
  287. STARPU_PTHREAD_MUTEX_DESTROY(&node->message_queue_mutex);
  288. /* If the node is a sink then we must destroy some field */
  289. if(node->kind == STARPU_MIC_SINK || node->kind == STARPU_SCC_SINK)
  290. {
  291. int i;
  292. for(i=0; i<node->nb_cores; i++)
  293. {
  294. sem_destroy(&node->sem_run_table[i]);
  295. }
  296. free(node->run_table);
  297. free(node->sem_run_table);
  298. STARPU_PTHREAD_MUTEX_DESTROY(&node->barrier_mutex);
  299. STARPU_PTHREAD_BARRIER_DESTROY(&node->init_completed_barrier);
  300. }
  301. free(node->buffer);
  302. free(node);
  303. }
  304. /* Send COMMAND to RECIPIENT, along with ARG if ARG_SIZE is non-zero */
  305. void _starpu_mp_common_send_command(const struct _starpu_mp_node *node,
  306. const enum _starpu_mp_command command,
  307. void *arg, int arg_size)
  308. {
  309. STARPU_ASSERT_MSG(arg_size <= BUFFER_SIZE, "Too much data (%d) for the static MIC buffer (%d), increase BUFFER_SIZE perhaps?", arg_size, BUFFER_SIZE);
  310. /* MIC and MPI sizes are given through a int */
  311. int command_size = sizeof(enum _starpu_mp_command);
  312. int arg_size_size = sizeof(int);
  313. /* Let's copy the data into the command line buffer */
  314. memcpy(node->buffer, &command, command_size);
  315. memcpy((void*) ((uintptr_t)node->buffer + command_size), &arg_size, arg_size_size);
  316. node->mp_send(node, node->buffer, command_size + arg_size_size);
  317. if (arg_size)
  318. node->mp_send(node, arg, arg_size);
  319. }
  320. /* Return the command received from SENDER. In case SENDER sent an argument
  321. * beside the command, an address to a copy of this argument is returns in arg.
  322. * There is no need to free this address as it's not allocated at this time.
  323. * However, the data pointed by arg shouldn't be relied on after a new call to
  324. * STARPU_MP_COMMON_RECV_COMMAND as it might corrupt it.
  325. */
  326. enum _starpu_mp_command _starpu_mp_common_recv_command(const struct _starpu_mp_node *node,
  327. void **arg, int *arg_size)
  328. {
  329. enum _starpu_mp_command command;
  330. /* MIC and MPI sizes are given through a int */
  331. int command_size = sizeof(enum _starpu_mp_command);
  332. int arg_size_size = sizeof(int);
  333. node->mp_recv(node, node->buffer, command_size + arg_size_size);
  334. command = *((enum _starpu_mp_command *) node->buffer);
  335. *arg_size = *((int *) ((uintptr_t)node->buffer + command_size));
  336. /* If there is no argument (ie. arg_size == 0),
  337. * let's return the command right now */
  338. if (!(*arg_size))
  339. {
  340. *arg = NULL;
  341. return command;
  342. }
  343. STARPU_ASSERT(*arg_size <= BUFFER_SIZE);
  344. node->mp_recv(node, node->buffer, *arg_size);
  345. *arg = node->buffer;
  346. return command;
  347. }