sink_common.c 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2012, 2016, 2017 INRIA
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <common/utils.h>
  19. #include <drivers/mp_common/mp_common.h>
  20. #include <drivers/mpi/driver_mpi_common.h>
  21. #include <datawizard/interfaces/data_interface.h>
  22. #include <common/barrier.h>
  23. #include <core/workers.h>
  24. #include <common/barrier_counter.h>
  25. #ifdef STARPU_USE_MIC
  26. #include <common/COISysInfo_common.h>
  27. #endif
  28. #include "sink_common.h"
  29. /* Return the sink kind of the running process, based on the value of the
  30. * STARPU_SINK environment variable.
  31. * If there is no valid value retrieved, return STARPU_INVALID_KIND
  32. */
  33. static enum _starpu_mp_node_kind _starpu_sink_common_get_kind(void)
  34. {
  35. /* Environment varible STARPU_SINK must be defined when running on sink
  36. * side : let's use it to get the kind of node we're running on */
  37. char *node_kind = starpu_getenv("STARPU_SINK");
  38. STARPU_ASSERT(node_kind);
  39. if (!strcmp(node_kind, "STARPU_MIC"))
  40. return STARPU_NODE_MIC_SINK;
  41. else if (!strcmp(node_kind, "STARPU_SCC"))
  42. return STARPU_NODE_SCC_SINK;
  43. else if (!strcmp(node_kind, "STARPU_MPI_MS"))
  44. return STARPU_NODE_MPI_SINK;
  45. else
  46. return STARPU_NODE_INVALID_KIND;
  47. }
  48. /* Send to host the number of cores of the sink device
  49. */
  50. static void _starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
  51. {
  52. // Process packet received from `_starpu_src_common_sink_cores'.
  53. _starpu_mp_common_send_command (node, STARPU_MP_COMMAND_ANSWER_SINK_NBCORES,
  54. &node->nb_cores, sizeof (int));
  55. }
  56. /* Send to host the address of the function given in parameter
  57. */
  58. static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
  59. char *func_name)
  60. {
  61. void (*func)(void);
  62. func = node->lookup(node,func_name);
  63. //_STARPU_DEBUG("Looked up %s, got %p\n", func_name, func);
  64. /* If we couldn't find the function, let's send an error to the host.
  65. * The user probably made a mistake in the name */
  66. if (func)
  67. _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_ANSWER_LOOKUP,
  68. &func, sizeof(func));
  69. else
  70. _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_ERROR_LOOKUP,
  71. NULL, 0);
  72. }
  73. /* Allocate a memory space and send the address of this space to the host
  74. */
  75. void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
  76. void *arg, int arg_size)
  77. {
  78. STARPU_ASSERT(arg_size == sizeof(size_t));
  79. void *addr;
  80. _STARPU_MALLOC(addr, *(size_t *)(arg));
  81. /* If the allocation fail, let's send an error to the host.
  82. */
  83. if (addr)
  84. _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ANSWER_ALLOCATE,
  85. &addr, sizeof(addr));
  86. else
  87. _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ERROR_ALLOCATE,
  88. NULL, 0);
  89. }
  90. void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED,
  91. void *arg, int arg_size)
  92. {
  93. STARPU_ASSERT(arg_size == sizeof(void *));
  94. free(*(void **)(arg));
  95. }
  96. static void _starpu_sink_common_copy_from_host_sync(const struct _starpu_mp_node *mp_node,
  97. void *arg, int arg_size)
  98. {
  99. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
  100. struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
  101. mp_node->dt_recv(mp_node, cmd->addr, cmd->size, NULL);
  102. }
  103. static void _starpu_sink_common_copy_from_host_async(struct _starpu_mp_node *mp_node,
  104. void *arg, int arg_size)
  105. {
  106. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
  107. struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
  108. /* For asynchronous transfers, we store events to test them later when they are finished */
  109. struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
  110. /* Save the command to send */
  111. sink_event->answer_cmd = STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED;
  112. sink_event->remote_event = cmd->event;
  113. /* Set the sender (host) ready because we don't want to wait its ack */
  114. struct _starpu_async_channel * async_channel = &sink_event->event;
  115. async_channel->type = STARPU_UNUSED;
  116. async_channel->starpu_mp_common_finished_sender = -1;
  117. async_channel->starpu_mp_common_finished_receiver = 0;
  118. async_channel->polling_node_receiver = NULL;
  119. async_channel->polling_node_sender = NULL;
  120. mp_node->dt_recv(mp_node, cmd->addr, cmd->size, &sink_event->event);
  121. /* Push event on the list */
  122. _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
  123. }
  124. static void _starpu_sink_common_copy_to_host_sync(const struct _starpu_mp_node *mp_node,
  125. void *arg, int arg_size)
  126. {
  127. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
  128. struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
  129. /* Save values before sending command to prevent the overwriting */
  130. size_t size = cmd->size;
  131. void * addr = cmd->addr;
  132. _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_SEND_TO_HOST, NULL, 0);
  133. mp_node->dt_send(mp_node, addr, size, NULL);
  134. }
  135. static void _starpu_sink_common_copy_to_host_async(struct _starpu_mp_node *mp_node,
  136. void *arg, int arg_size)
  137. {
  138. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
  139. struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
  140. /* For asynchronous transfers, we need to say dt_send that we are in async mode
  141. * but we don't push event on list because we don't need to know if it's finished
  142. */
  143. struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
  144. /* Save the command to send */
  145. sink_event->answer_cmd = STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED;
  146. sink_event->remote_event = cmd->event;
  147. /* Set the receiver (host) ready because we don't want to wait its ack */
  148. struct _starpu_async_channel * async_channel = &sink_event->event;
  149. async_channel->type = STARPU_UNUSED;
  150. async_channel->starpu_mp_common_finished_sender = 0;
  151. async_channel->starpu_mp_common_finished_receiver = -1;
  152. async_channel->polling_node_receiver = NULL;
  153. async_channel->polling_node_sender = NULL;
  154. mp_node->dt_send(mp_node, cmd->addr, cmd->size, &sink_event->event);
  155. /* Push event on the list */
  156. _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
  157. }
  158. static void _starpu_sink_common_copy_from_sink_sync(const struct _starpu_mp_node *mp_node,
  159. void *arg, int arg_size)
  160. {
  161. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
  162. struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
  163. mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size, NULL);
  164. _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_TRANSFER_COMPLETE, NULL, 0);
  165. }
  166. static void _starpu_sink_common_copy_from_sink_async(struct _starpu_mp_node *mp_node,
  167. void *arg, int arg_size)
  168. {
  169. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
  170. struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
  171. /* For asynchronous transfers, we store events to test them later when they are finished
  172. */
  173. struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
  174. /* Save the command to send */
  175. sink_event->answer_cmd = STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED;
  176. sink_event->remote_event = cmd->event;
  177. /* Set the sender ready because we don't want to wait its ack */
  178. struct _starpu_async_channel * async_channel = &sink_event->event;
  179. async_channel->type = STARPU_UNUSED;
  180. async_channel->starpu_mp_common_finished_sender = -1;
  181. async_channel->starpu_mp_common_finished_receiver = 0;
  182. async_channel->polling_node_receiver = NULL;
  183. async_channel->polling_node_sender = NULL;
  184. mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size, &sink_event->event);
  185. /* Push event on the list */
  186. _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
  187. }
  188. static void _starpu_sink_common_copy_to_sink_sync(const struct _starpu_mp_node *mp_node,
  189. void *arg, int arg_size)
  190. {
  191. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
  192. struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
  193. mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size, NULL);
  194. }
  195. static void _starpu_sink_common_copy_to_sink_async(struct _starpu_mp_node *mp_node,
  196. void *arg, int arg_size)
  197. {
  198. STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
  199. struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
  200. /* For asynchronous transfers, we need to say dt_send that we are in async mode
  201. * but we don't push event on list because we don't need to know if it's finished
  202. */
  203. struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
  204. /* Save the command to send */
  205. sink_event->answer_cmd = STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED;
  206. sink_event->remote_event = cmd->event;
  207. /* Set the receiver ready because we don't want to wait its ack */
  208. struct _starpu_async_channel * async_channel = &sink_event->event;
  209. async_channel->type = STARPU_UNUSED;
  210. async_channel->starpu_mp_common_finished_sender = 0;
  211. async_channel->starpu_mp_common_finished_receiver = -1;
  212. async_channel->polling_node_receiver = NULL;
  213. async_channel->polling_node_sender = NULL;
  214. mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size, &sink_event->event);
  215. /* Push event on the list */
  216. _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
  217. }
  218. /* Receive workers and combined workers and store them into the struct config
  219. */
  220. static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void *arg, int arg_size)
  221. {
  222. /* Retrieve information from the message */
  223. STARPU_ASSERT(arg_size == (sizeof(int)*5));
  224. uintptr_t arg_ptr = (uintptr_t) arg;
  225. int i;
  226. int nworkers = *(int *)arg_ptr;
  227. arg_ptr += sizeof(nworkers);
  228. int worker_size = *(int *)arg_ptr;
  229. arg_ptr += sizeof(worker_size);
  230. int combined_worker_size = *(int *)arg_ptr;
  231. arg_ptr += sizeof(combined_worker_size);
  232. int baseworkerid = *(int *)arg_ptr;
  233. arg_ptr += sizeof(baseworkerid);
  234. struct _starpu_machine_config *config = _starpu_get_machine_config();
  235. config->topology.nworkers = *(int *)arg_ptr;
  236. /* Retrieve workers */
  237. struct _starpu_worker * workers = &config->workers[baseworkerid];
  238. node->dt_recv(node,workers,worker_size, NULL);
  239. /* Update workers to have coherent field */
  240. for(i=0; i<nworkers; i++)
  241. {
  242. workers[i].config = config;
  243. STARPU_PTHREAD_MUTEX_INIT(&workers[i].mutex,NULL);
  244. STARPU_PTHREAD_MUTEX_DESTROY(&workers[i].mutex);
  245. STARPU_PTHREAD_COND_INIT(&workers[i].started_cond,NULL);
  246. STARPU_PTHREAD_COND_DESTROY(&workers[i].started_cond);
  247. STARPU_PTHREAD_COND_INIT(&workers[i].ready_cond,NULL);
  248. STARPU_PTHREAD_COND_DESTROY(&workers[i].ready_cond);
  249. STARPU_PTHREAD_MUTEX_INIT(&workers[i].sched_mutex,NULL);
  250. STARPU_PTHREAD_MUTEX_DESTROY(&workers[i].sched_mutex);
  251. STARPU_PTHREAD_COND_INIT(&workers[i].sched_cond,NULL);
  252. STARPU_PTHREAD_COND_DESTROY(&workers[i].sched_cond);
  253. workers[i].current_task = NULL;
  254. workers[i].set = NULL;
  255. }
  256. /* Retrieve combined workers */
  257. struct _starpu_combined_worker * combined_workers = config->combined_workers;
  258. node->dt_recv(node, combined_workers, combined_worker_size, NULL);
  259. node->baseworkerid = baseworkerid;
  260. STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
  261. }
  262. /* Function looping on the sink, waiting for tasks to execute.
  263. * If the caller is the host, don't do anything.
  264. */
  265. void _starpu_sink_common_worker(void)
  266. {
  267. struct _starpu_mp_node *node = NULL;
  268. enum _starpu_mp_command command = STARPU_MP_COMMAND_EXIT;
  269. int arg_size = 0;
  270. void *arg = NULL;
  271. int exit_starpu = 0;
  272. enum _starpu_mp_node_kind node_kind = _starpu_sink_common_get_kind();
  273. if (node_kind == STARPU_NODE_INVALID_KIND)
  274. _STARPU_ERROR("No valid sink kind retrieved, use the"
  275. "STARPU_SINK environment variable to specify"
  276. "this\n");
  277. /* Create and initialize the node */
  278. node = _starpu_mp_common_node_create(node_kind, -1);
  279. starpu_pthread_key_t worker_key;
  280. STARPU_PTHREAD_KEY_CREATE(&worker_key, NULL);
  281. while (!exit_starpu)
  282. {
  283. /* If we have received a message */
  284. if(node->mp_recv_is_ready(node))
  285. {
  286. command = _starpu_mp_common_recv_command(node, &arg, &arg_size);
  287. switch(command)
  288. {
  289. case STARPU_MP_COMMAND_EXIT:
  290. exit_starpu = 1;
  291. break;
  292. case STARPU_MP_COMMAND_EXECUTE_DETACHED:
  293. case STARPU_MP_COMMAND_EXECUTE:
  294. node->execute(node, arg, arg_size);
  295. break;
  296. case STARPU_MP_COMMAND_SINK_NBCORES:
  297. _starpu_sink_common_get_nb_cores(node);
  298. break;
  299. case STARPU_MP_COMMAND_LOOKUP:
  300. _starpu_sink_common_lookup(node, (char *) arg);
  301. break;
  302. case STARPU_MP_COMMAND_ALLOCATE:
  303. node->allocate(node, arg, arg_size);
  304. break;
  305. case STARPU_MP_COMMAND_FREE:
  306. node->free(node, arg, arg_size);
  307. break;
  308. case STARPU_MP_COMMAND_RECV_FROM_HOST:
  309. _starpu_sink_common_copy_from_host_sync(node, arg, arg_size);
  310. break;
  311. case STARPU_MP_COMMAND_SEND_TO_HOST:
  312. _starpu_sink_common_copy_to_host_sync(node, arg, arg_size);
  313. break;
  314. case STARPU_MP_COMMAND_RECV_FROM_SINK:
  315. _starpu_sink_common_copy_from_sink_sync(node, arg, arg_size);
  316. break;
  317. case STARPU_MP_COMMAND_SEND_TO_SINK:
  318. _starpu_sink_common_copy_to_sink_sync(node, arg, arg_size);
  319. break;
  320. case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC:
  321. _starpu_sink_common_copy_from_host_async(node, arg, arg_size);
  322. break;
  323. case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC:
  324. _starpu_sink_common_copy_to_host_async(node, arg, arg_size);
  325. break;
  326. case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC:
  327. _starpu_sink_common_copy_from_sink_async(node, arg, arg_size);
  328. break;
  329. case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC:
  330. _starpu_sink_common_copy_to_sink_async(node, arg, arg_size);
  331. break;
  332. case STARPU_MP_COMMAND_SYNC_WORKERS:
  333. _starpu_sink_common_recv_workers(node, arg, arg_size);
  334. break;
  335. default:
  336. _STARPU_MSG("Oops, command %x unrecognized\n", command);
  337. }
  338. }
  339. STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
  340. /* If the list is not empty */
  341. if(!mp_message_list_empty(&node->message_queue))
  342. {
  343. /* We pop a message and send it to the host */
  344. struct mp_message * message = mp_message_list_pop_back(&node->message_queue);
  345. STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
  346. //_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
  347. _starpu_mp_common_send_command(node, message->type,
  348. message->buffer, message->size);
  349. free(message->buffer);
  350. mp_message_delete(message);
  351. }
  352. else
  353. {
  354. STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
  355. }
  356. if(!_starpu_mp_event_list_empty(&node->event_list))
  357. {
  358. struct _starpu_mp_event * sink_event = _starpu_mp_event_list_pop_front(&node->event_list);
  359. if (node->dt_test(&sink_event->event))
  360. {
  361. /* send ACK to host */
  362. _starpu_mp_common_send_command(node, sink_event->answer_cmd , &sink_event->remote_event, sizeof(sink_event->remote_event));
  363. _starpu_mp_event_delete(sink_event);
  364. }
  365. else
  366. {
  367. /* try later */
  368. _starpu_mp_event_list_push_back(&node->event_list, sink_event);
  369. }
  370. }
  371. }
  372. STARPU_PTHREAD_KEY_DELETE(worker_key);
  373. /* Deinitialize the node and release it */
  374. _starpu_mp_common_node_destroy(node);
  375. #ifdef STARPU_USE_MPI_MASTER_SLAVE
  376. _starpu_mpi_common_mp_deinit();
  377. #endif
  378. exit(0);
  379. }
  380. /* Search for the mp_barrier correspondind to the specified combined worker
  381. * and create it if it doesn't exist
  382. */
  383. static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
  384. {
  385. struct mp_barrier * b = NULL;
  386. STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
  387. /* Search if the barrier already exist */
  388. for(b = mp_barrier_list_begin(&node->barrier_list);
  389. b != mp_barrier_list_end(&node->barrier_list) && b->id != cb_workerid;
  390. b = mp_barrier_list_next(b));
  391. /* If we found the barrier */
  392. if(b != NULL)
  393. {
  394. STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
  395. return b;
  396. }
  397. else
  398. {
  399. /* Else we create, initialize and add it to the list*/
  400. b = mp_barrier_new();
  401. b->id = cb_workerid;
  402. STARPU_PTHREAD_BARRIER_INIT(&b->before_work_barrier,NULL,cb_workersize);
  403. STARPU_PTHREAD_BARRIER_INIT(&b->after_work_barrier,NULL,cb_workersize);
  404. mp_barrier_list_push_back(&node->barrier_list,b);
  405. STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
  406. return b;
  407. }
  408. }
  409. /* Erase for the mp_barrier correspondind to the specified combined worker
  410. */
  411. static void _starpu_sink_common_erase_barrier(struct _starpu_mp_node * node, struct mp_barrier *barrier)
  412. {
  413. STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
  414. mp_barrier_list_erase(&node->barrier_list,barrier);
  415. STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
  416. }
  417. /* Append the message given in parameter to the message list
  418. */
  419. static void _starpu_sink_common_append_message(struct _starpu_mp_node *node, struct mp_message * message)
  420. {
  421. STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
  422. mp_message_list_push_front(&node->message_queue,message);
  423. STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
  424. }
  425. /* Append to the message list a "STARPU_PRE_EXECUTION" message
  426. */
  427. static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *node, struct mp_task *task)
  428. {
  429. /* Init message to tell the sink that the execution has begun */
  430. struct mp_message * message = mp_message_new();
  431. message->type = STARPU_MP_COMMAND_PRE_EXECUTION;
  432. _STARPU_MALLOC(message->buffer, sizeof(int));
  433. *(int *) message->buffer = task->combined_workerid;
  434. message->size = sizeof(int);
  435. /* Append the message to the queue */
  436. _starpu_sink_common_append_message(node, message);
  437. }
  438. /* Append to the message list a "STARPU_EXECUTION_COMPLETED" message
  439. */
  440. static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_node *node, struct mp_task *task)
  441. {
  442. /* Init message to tell the sink that the execution is completed */
  443. struct mp_message * message = mp_message_new();
  444. if (task->detached)
  445. message->type = STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED;
  446. else
  447. message->type = STARPU_MP_COMMAND_EXECUTION_COMPLETED;
  448. _STARPU_MALLOC(message->buffer, sizeof(int));
  449. *(int*) message->buffer = task->coreid;
  450. message->size = sizeof(int);
  451. /* Append the message to the queue */
  452. _starpu_sink_common_append_message(node, message);
  453. }
  454. /* Bind the thread which is running on the specified core to the combined worker */
  455. static void _starpu_sink_common_bind_to_combined_worker(struct _starpu_mp_node *node, int coreid, struct _starpu_combined_worker * combined_worker)
  456. {
  457. int i;
  458. int * bind_set;
  459. _STARPU_MALLOC(bind_set, sizeof(int)*combined_worker->worker_size);
  460. for(i=0;i<combined_worker->worker_size;i++)
  461. bind_set[i] = combined_worker->combined_workerid[i] - node->baseworkerid;
  462. node->bind_thread(node, coreid, bind_set, combined_worker->worker_size);
  463. }
  464. /* Get the current rank of the worker in the combined worker
  465. */
  466. static int _starpu_sink_common_get_current_rank(int workerid, struct _starpu_combined_worker * combined_worker)
  467. {
  468. int i;
  469. for(i=0; i<combined_worker->worker_size; i++)
  470. if(workerid == combined_worker->combined_workerid[i])
  471. return i;
  472. STARPU_ASSERT(0);
  473. return -1;
  474. }
  475. /* Execute the task
  476. */
  477. static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int coreid, struct _starpu_worker * worker, int detached)
  478. {
  479. struct _starpu_combined_worker * combined_worker = NULL;
  480. struct mp_task* task;
  481. if (detached)
  482. task = node->run_table_detached[coreid];
  483. else
  484. task = node->run_table[coreid];
  485. /* If it's a parallel task */
  486. if(task->is_parallel_task)
  487. {
  488. combined_worker = _starpu_get_combined_worker_struct(task->combined_workerid);
  489. worker->current_rank = _starpu_sink_common_get_current_rank(worker->workerid, combined_worker);
  490. worker->combined_workerid = task->combined_workerid;
  491. worker->worker_size = combined_worker->worker_size;
  492. /* Synchronize with others threads of the combined worker*/
  493. STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->before_work_barrier);
  494. /* The first thread of the combined worker */
  495. if(worker->current_rank == 0)
  496. {
  497. /* tell the sink that the execution has begun */
  498. _starpu_sink_common_pre_execution_message(node,task);
  499. /* If the mode is FORKJOIN,
  500. * the first thread binds himself
  501. * on all core of the combined worker*/
  502. if(task->type == STARPU_FORKJOIN)
  503. {
  504. _starpu_sink_common_bind_to_combined_worker(node, coreid, combined_worker);
  505. }
  506. }
  507. }
  508. else
  509. {
  510. worker->current_rank = 0;
  511. worker->combined_workerid = 0;
  512. worker->worker_size = 1;
  513. }
  514. if(task->type != STARPU_FORKJOIN || worker->current_rank == 0)
  515. {
  516. if (_starpu_get_disable_kernels() <= 0)
  517. {
  518. /* execute the task */
  519. task->kernel(task->interfaces,task->cl_arg);
  520. }
  521. }
  522. /* If it's a parallel task */
  523. if(task->is_parallel_task)
  524. {
  525. /* Synchronize with others threads of the combined worker*/
  526. STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->after_work_barrier);
  527. /* The fisrt thread of the combined */
  528. if(worker->current_rank == 0)
  529. {
  530. /* Erase the barrier from the list */
  531. _starpu_sink_common_erase_barrier(node,task->mp_barrier);
  532. /* If the mode is FORKJOIN,
  533. * the first thread rebinds himself on his own core */
  534. if(task->type == STARPU_FORKJOIN)
  535. node->bind_thread(node, coreid, &coreid, 1);
  536. }
  537. }
  538. if (detached)
  539. node->run_table_detached[coreid] = NULL;
  540. else
  541. node->run_table[coreid] = NULL;
  542. /* tell the sink that the execution is completed */
  543. _starpu_sink_common_execution_completed_message(node,task);
  544. /*free the task*/
  545. unsigned i;
  546. for (i = 0; i < task->nb_interfaces; i++)
  547. free(task->interfaces[i]);
  548. free(task->interfaces);
  549. if (task->cl_arg != NULL)
  550. free(task->cl_arg);
  551. free(task);
  552. }
  553. /* The main function executed by the thread
  554. * thread_arg is a structure containing the information needed by the thread
  555. */
  556. void* _starpu_sink_thread(void * thread_arg)
  557. {
  558. /* Retrieve the information from the structure */
  559. struct _starpu_mp_node *node = ((struct arg_sink_thread *)thread_arg)->node;
  560. int coreid =((struct arg_sink_thread *)thread_arg)->coreid;
  561. /* free the structure */
  562. free(thread_arg);
  563. STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
  564. struct _starpu_worker *worker = &_starpu_get_machine_config()->workers[node->baseworkerid + coreid];
  565. node->bind_thread(node, coreid, &coreid, 1);
  566. _starpu_set_local_worker_key(worker);
  567. while(node->is_running)
  568. {
  569. /*Wait there is a task available */
  570. sem_wait(&node->sem_run_table[coreid]);
  571. if (node->run_table_detached[coreid] != NULL)
  572. _starpu_sink_common_execute_kernel(node, coreid, worker, 1);
  573. if (node->run_table[coreid] != NULL)
  574. _starpu_sink_common_execute_kernel(node, coreid, worker, 0);
  575. }
  576. starpu_pthread_exit(NULL);
  577. }
  578. /* Add the task to the specific thread and wake him up
  579. */
  580. static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
  581. {
  582. int detached = task->detached;
  583. /* Add the task to the specific thread */
  584. if (detached)
  585. node->run_table_detached[task->coreid] = task;
  586. else
  587. node->run_table[task->coreid] = task;
  588. /* Unlock the mutex to wake up the thread which will execute the task */
  589. sem_post(&node->sem_run_table[task->coreid]);
  590. }
  591. /* Receive paquet from _starpu_src_common_execute_kernel in the form below :
  592. * [Function pointer on sink, number of interfaces, interfaces
  593. * (union _starpu_interface), cl_arg]
  594. * Then call the function given, passing as argument an array containing the
  595. * addresses of the received interfaces
  596. */
  597. void _starpu_sink_common_execute(struct _starpu_mp_node *node,
  598. void *arg, int arg_size)
  599. {
  600. unsigned i;
  601. uintptr_t arg_ptr = (uintptr_t) arg;
  602. struct mp_task *task;
  603. _STARPU_MALLOC(task, sizeof(struct mp_task));
  604. task->kernel = *(void(**)(void **, void *)) arg_ptr;
  605. arg_ptr += sizeof(task->kernel);
  606. task->type = *(enum starpu_codelet_type *) arg_ptr;
  607. arg_ptr += sizeof(task->type);
  608. task->is_parallel_task = *(int *) arg_ptr;
  609. arg_ptr += sizeof(task->is_parallel_task);
  610. if(task->is_parallel_task)
  611. {
  612. task->combined_workerid= *(int *) arg_ptr;
  613. arg_ptr += sizeof(task->combined_workerid);
  614. task->mp_barrier = _starpu_sink_common_get_barrier(node,task->combined_workerid,_starpu_get_combined_worker_struct(task->combined_workerid)->worker_size);
  615. }
  616. task->coreid = *(unsigned *) arg_ptr;
  617. arg_ptr += sizeof(task->coreid);
  618. task->nb_interfaces = *(unsigned *) arg_ptr;
  619. arg_ptr += sizeof(task->nb_interfaces);
  620. task->detached = *(int *) arg_ptr;
  621. arg_ptr += sizeof(task->detached);
  622. _STARPU_MALLOC(task->interfaces, task->nb_interfaces * sizeof(*task->interfaces));
  623. /* The function needs an array pointing to each interface it needs
  624. * during execution. As in sink-side there is no mean to know which
  625. * kind of interface to expect, the array is composed of unions of
  626. * interfaces, thus we expect the same size anyway */
  627. for (i = 0; i < task->nb_interfaces; i++)
  628. {
  629. union _starpu_interface * interface;
  630. _STARPU_MALLOC(interface, sizeof(union _starpu_interface));
  631. memcpy(interface, (void*) arg_ptr, sizeof(union _starpu_interface));
  632. task->interfaces[i] = interface;
  633. arg_ptr += sizeof(union _starpu_interface);
  634. }
  635. /* Was cl_arg sent ? */
  636. if (arg_size > arg_ptr - (uintptr_t) arg)
  637. {
  638. /* Copy cl_arg to prevent overwriting by an other task */
  639. unsigned cl_arg_size = arg_size - (arg_ptr - (uintptr_t) arg);
  640. _STARPU_MALLOC(task->cl_arg, cl_arg_size);
  641. memcpy(task->cl_arg, (void *) arg_ptr, cl_arg_size);
  642. }
  643. else
  644. task->cl_arg = NULL;
  645. //_STARPU_DEBUG("telling host that we have submitted the task %p.\n", task->kernel);
  646. if (task->detached)
  647. _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED,
  648. NULL, 0);
  649. else
  650. _starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_SUBMITTED,
  651. NULL, 0);
  652. //_STARPU_DEBUG("executing the task %p\n", task->kernel);
  653. _starpu_sink_common_execute_thread(node, task);
  654. }