graph.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2016-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. /*
 * This stores the task graph structure, to be used by the schedulers which need
  18. * it. We do not always enable it since it is costly. To avoid interfering
  19. * too much with execution, it may be a bit outdated, i.e. still contain jobs
  20. * which have completed very recently.
  21. *
  22. * This is because we drop nodes lazily: when a job terminates, we just add the
  23. * node to the dropped list (to avoid having to take the mutex on the whole
  24. * graph). The graph gets updated whenever the graph mutex becomes available.
  25. */
  26. #include <starpu.h>
  27. #include <core/jobs.h>
  28. #include <common/graph.h>
  29. #include <core/workers.h>
  30. /* Protects the whole task graph except the dropped list */
  31. static starpu_pthread_rwlock_t graph_lock;
  32. /* Whether we should enable recording the task graph */
  33. int _starpu_graph_record;
  34. /* This list contains all nodes without incoming dependency */
  35. struct _starpu_graph_node_multilist_top top;
  36. /* This list contains all nodes without outgoing dependency */
  37. struct _starpu_graph_node_multilist_bottom bottom;
  38. /* This list contains all nodes */
  39. struct _starpu_graph_node_multilist_all all;
  40. /* Protects the dropped list, always taken before graph lock */
  41. static starpu_pthread_mutex_t dropped_lock;
/* This list contains all dropped nodes, i.e. the job has terminated but the corresponding node is still in the graph */
  43. struct _starpu_graph_node_multilist_dropped dropped;
  44. void _starpu_graph_init(void)
  45. {
  46. STARPU_PTHREAD_RWLOCK_INIT(&graph_lock, NULL);
  47. _starpu_graph_node_multilist_head_init_top(&top);
  48. _starpu_graph_node_multilist_head_init_bottom(&bottom);
  49. _starpu_graph_node_multilist_head_init_all(&all);
  50. STARPU_PTHREAD_MUTEX_INIT(&dropped_lock, NULL);
  51. _starpu_graph_node_multilist_head_init_dropped(&dropped);
  52. }
  53. /* LockWR the graph lock */
/* LockWR the graph lock */
void _starpu_graph_wrlock(void)
{
	/* NOTE(review): relax_on/relax_off presumably mark this worker as
	 * blocked so the rest of the runtime can make progress while we wait
	 * on the lock — confirm against core/workers.c. The ordering
	 * (relax_on, blocking lock, relax_off) is deliberate. */
	starpu_worker_relax_on();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&graph_lock);
	starpu_worker_relax_off();
}
void _starpu_graph_drop_node(struct _starpu_graph_node *node);
/* This flushes the list of nodes to be dropped. Both the dropped_lock and
 * graph_lock mutexes have to be held on entry, and are released. */
void _starpu_graph_drop_dropped_nodes(void)
{
	struct _starpu_graph_node_multilist_dropped dropping;
	/* Pick up (steal) the whole list of dropped nodes, then release
	 * dropped_lock immediately so terminating jobs can keep queuing new
	 * nodes while we process this batch. */
	_starpu_graph_node_multilist_move_dropped(&dropped, &dropping);
	STARPU_PTHREAD_MUTEX_UNLOCK(&dropped_lock);
	/* And now process it if it's not empty. */
	if (!_starpu_graph_node_multilist_empty_dropped(&dropping))
	{
		struct _starpu_graph_node *node, *next;
		for (node = _starpu_graph_node_multilist_begin_dropped(&dropping);
			node != _starpu_graph_node_multilist_end_dropped(&dropping);
			node = next)
		{
			/* Fetch the successor first: _starpu_graph_drop_node
			 * frees the node. */
			next = _starpu_graph_node_multilist_next_dropped(node);
			_starpu_graph_drop_node(node);
		}
	}
	/* Dropping nodes mutates the graph, so graph_lock (write) was held
	 * throughout; release it last. */
	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
}
  83. /* UnlockWR the graph lock */
/* UnlockWR the graph lock */
void _starpu_graph_wrunlock(void)
{
	/* We hold the graph write lock; take dropped_lock as well (always
	 * taken before releasing graph_lock, see dropped_lock's comment) so
	 * we can flush the pending dropped nodes. */
	starpu_worker_relax_on();
	STARPU_PTHREAD_MUTEX_LOCK(&dropped_lock);
	starpu_worker_relax_off();
	/* Releases both dropped_lock and graph_lock. */
	_starpu_graph_drop_dropped_nodes();
}
  91. /* LockRD the graph lock */
/* LockRD the graph lock */
void _starpu_graph_rdlock(void)
{
	/* Same relax window as _starpu_graph_wrlock: flag the worker as
	 * blocked while waiting on the (possibly contended) lock. */
	starpu_worker_relax_on();
	STARPU_PTHREAD_RWLOCK_RDLOCK(&graph_lock);
	starpu_worker_relax_off();
}
  98. /* UnlockRD the graph lock */
/* UnlockRD the graph lock */
void _starpu_graph_rdunlock(void)
{
	STARPU_PTHREAD_RWLOCK_UNLOCK(&graph_lock);
	/* Take the opportunity to try to take it WR: if nobody holds the
	 * lock we can flush lazily-dropped nodes right now, without ever
	 * blocking on this path. */
	if (STARPU_PTHREAD_RWLOCK_TRYWRLOCK(&graph_lock) == 0)
		/* Good, flush dropped nodes (this releases the WR lock). */
		_starpu_graph_wrunlock();
}
  107. static void __starpu_graph_foreach(void (*func)(void *data, struct _starpu_graph_node *node), void *data)
  108. {
  109. struct _starpu_graph_node *node;
  110. for (node = _starpu_graph_node_multilist_begin_all(&all);
  111. node != _starpu_graph_node_multilist_end_all(&all);
  112. node = _starpu_graph_node_multilist_next_all(node))
  113. func(data, node);
  114. }
  115. /* Add a node to the graph */
  116. void _starpu_graph_add_job(struct _starpu_job *job)
  117. {
  118. struct _starpu_graph_node *node;
  119. _STARPU_CALLOC(node, 1, sizeof(*node));
  120. node->job = job;
  121. job->graph_node = node;
  122. STARPU_PTHREAD_MUTEX_INIT0(&node->mutex, NULL);
  123. _starpu_graph_wrlock();
  124. /* It does not have any dependency yet, add to all lists */
  125. _starpu_graph_node_multilist_push_back_top(&top, node);
  126. _starpu_graph_node_multilist_push_back_bottom(&bottom, node);
  127. _starpu_graph_node_multilist_push_back_all(&all, node);
  128. _starpu_graph_wrunlock();
  129. }
  130. /* Add a node to an array of nodes */
  131. static unsigned add_node(struct _starpu_graph_node *node, struct _starpu_graph_node ***nodes, unsigned *n_nodes, unsigned *alloc_nodes, unsigned **slot)
  132. {
  133. unsigned ret;
  134. if (*n_nodes == *alloc_nodes)
  135. {
  136. if (*alloc_nodes)
  137. *alloc_nodes *= 2;
  138. else
  139. *alloc_nodes = 4;
  140. _STARPU_REALLOC(*nodes, *alloc_nodes * sizeof(**nodes));
  141. if (slot)
  142. {
  143. _STARPU_REALLOC(*slot, *alloc_nodes * sizeof(**slot));
  144. }
  145. }
  146. ret = (*n_nodes)++;
  147. (*nodes)[ret] = node;
  148. return ret;
  149. }
  150. /* Add a dependency between nodes */
/* Add a dependency between nodes: prev_job must complete before job runs.
 * Each edge is recorded on both endpoints, and each endpoint remembers its
 * index ("slot") in the other endpoint's array, so that either side can be
 * unlinked in O(1) when a node is dropped (see _starpu_graph_drop_node). */
void _starpu_graph_add_job_dep(struct _starpu_job *job, struct _starpu_job *prev_job)
{
	unsigned rank_incoming, rank_outgoing;
	_starpu_graph_wrlock();
	struct _starpu_graph_node *node = job->graph_node;
	struct _starpu_graph_node *prev_node = prev_job->graph_node;
	if (!node || !prev_node)
	{
		/* Already gone: one of the jobs terminated and its node was
		 * dropped, nothing to record. */
		_starpu_graph_wrunlock();
		return;
	}
	if (_starpu_graph_node_multilist_queued_bottom(prev_node))
		/* Previous node gains an outgoing edge: not at bottom any more */
		_starpu_graph_node_multilist_erase_bottom(&bottom, prev_node);
	if (_starpu_graph_node_multilist_queued_top(node))
		/* Next node gains an incoming edge: not at top any more */
		_starpu_graph_node_multilist_erase_top(&top, node);
	/* Record the edge in both adjacency arrays... */
	rank_incoming = add_node(prev_node, &node->incoming, &node->n_incoming, &node->alloc_incoming, &node->incoming_slot);
	rank_outgoing = add_node(node, &prev_node->outgoing, &prev_node->n_outgoing, &prev_node->alloc_outgoing, &prev_node->outgoing_slot);
	/* ...and cross-record each side's index in the other's array. */
	prev_node->outgoing_slot[rank_outgoing] = rank_incoming;
	node->incoming_slot[rank_incoming] = rank_outgoing;
	_starpu_graph_wrunlock();
}
  175. /* Drop a node, and thus its dependencies */
/* Drop a node, and thus its dependencies: unlink it from every list and
 * from both sides of every edge, then free it. Caller must hold the graph
 * write lock, and the node's job pointer must already have been cleared
 * (by _starpu_graph_drop_job). */
void _starpu_graph_drop_node(struct _starpu_graph_node *node)
{
	unsigned i;
	STARPU_ASSERT(!node->job);
	if (_starpu_graph_node_multilist_queued_bottom(node))
		_starpu_graph_node_multilist_erase_bottom(&bottom, node);
	if (_starpu_graph_node_multilist_queued_top(node))
		_starpu_graph_node_multilist_erase_top(&top, node);
	if (_starpu_graph_node_multilist_queued_all(node))
		_starpu_graph_node_multilist_erase_all(&all, node);
	/* Drop ourself from the incoming part of the outgoing nodes.
	 * outgoing_slot[i] is our index in next->incoming (recorded by
	 * _starpu_graph_add_job_dep). */
	for (i = 0; i < node->n_outgoing; i++)
	{
		struct _starpu_graph_node *next = node->outgoing[i];
		if (next)
			next->incoming[node->outgoing_slot[i]] = NULL;
	}
	/* Drop ourself from the outgoing part of the incoming nodes,
	 * in case we happen to get dropped before it. */
	for (i = 0; i < node->n_incoming; i++)
	{
		struct _starpu_graph_node *prev = node->incoming[i];
		if (prev)
			prev->outgoing[node->incoming_slot[i]] = NULL;
	}
	/* Release the adjacency arrays and the node itself. */
	node->n_outgoing = 0;
	free(node->outgoing);
	node->outgoing = NULL;
	free(node->outgoing_slot);
	node->outgoing_slot = NULL;
	node->alloc_outgoing = 0;
	node->n_incoming = 0;
	free(node->incoming);
	node->incoming = NULL;
	free(node->incoming_slot);
	node->incoming_slot = NULL;
	node->alloc_incoming = 0;
	free(node);
}
  215. /* Drop a job */
/* Drop a job: detach it from its graph node and queue the node for lazy
 * removal from the graph. The node is dropped immediately only if the
 * graph write lock happens to be free; otherwise it stays on the dropped
 * list until the next flush (see the file-top comment). */
void _starpu_graph_drop_job(struct _starpu_job *job)
{
	struct _starpu_graph_node *node = job->graph_node;
	job->graph_node = NULL;
	if (!node)
		return;
	/* Clear node->job under the node's own mutex, so readers of the node
	 * can no longer reach the (about to disappear) job. */
	starpu_worker_relax_on();
	STARPU_PTHREAD_MUTEX_LOCK(&node->mutex);
	starpu_worker_relax_off();
	/* Will not be able to use the job any more */
	node->job = NULL;
	STARPU_PTHREAD_MUTEX_UNLOCK(&node->mutex);
	starpu_worker_relax_on();
	STARPU_PTHREAD_MUTEX_LOCK(&dropped_lock);
	starpu_worker_relax_off();
	/* Queue for removal when lock becomes available */
	_starpu_graph_node_multilist_push_back_dropped(&dropped, node);
	if (STARPU_PTHREAD_RWLOCK_TRYWRLOCK(&graph_lock) == 0)
	{
		/* Graph wrlock is available, drop nodes immediately.
		 * This releases both dropped_lock and graph_lock. */
		_starpu_graph_drop_dropped_nodes();
	}
	else
		/* Could not get the write lock without blocking: leave the node
		 * on the dropped list and just release dropped_lock. */
		STARPU_PTHREAD_MUTEX_UNLOCK(&dropped_lock);
}
  241. static void _starpu_graph_set_n(void *data, struct _starpu_graph_node *node)
  242. {
  243. int value = (intptr_t) data;
  244. node->graph_n = value;
  245. }
  246. /* Call func for each vertex of the task graph, from bottom to top, in topological order */
/* Call func for each edge of the task graph, from bottom to top, in
 * topological order: func(next, prev, data) is called once per edge, and a
 * node is only put in the frontier once all of its outgoing edges have been
 * processed. graph_n is reused as a per-node counter of processed outgoing
 * edges. Caller must hold the graph write lock. */
static void _starpu_graph_compute_bottom_up(void (*func)(struct _starpu_graph_node *next_node, struct _starpu_graph_node *prev_node, void *data), void *data)
{
	struct _starpu_graph_node *node, *node2;
	struct _starpu_graph_node **current_set = NULL, **next_set = NULL, **swap_set;
	unsigned current_n, next_n, i, j;
	unsigned current_alloc = 0, next_alloc = 0, swap_alloc;
	/* Classical flow algorithm: start from bottom, and propagate depths to top */
	/* Set number of processed outgoing edges to 0 for each node */
	__starpu_graph_foreach(_starpu_graph_set_n, (void*) 0);
	/* Start with the bottom of the graph: nodes with no outgoing edges */
	current_n = 0;
	for (node = _starpu_graph_node_multilist_begin_bottom(&bottom);
		node != _starpu_graph_node_multilist_end_bottom(&bottom);
		node = _starpu_graph_node_multilist_next_bottom(node))
		add_node(node, &current_set, &current_n, &current_alloc, NULL);
	/* Now propagate to top as long as we have current nodes */
	while (current_n)
	{
		/* Next set is initially empty */
		next_n = 0;
		/* For each node in the current set */
		for (i = 0; i < current_n; i++)
		{
			node = current_set[i];
			/* For each parent of this node */
			for (j = 0; j < node->n_incoming; j++)
			{
				node2 = node->incoming[j];
				if (!node2)
					/* Edge endpoint was dropped */
					continue;
				node2->graph_n++;
				func(node, node2, data);
				if ((unsigned) node2->graph_n == node2->n_outgoing)
					/* All outgoing edges were processed, can now add to next set */
					add_node(node2, &next_set, &next_n, &next_alloc, NULL);
			}
		}
		/* Swap next set with current set, recycling the arrays
		 * (and their capacities) instead of reallocating. */
		swap_set = next_set;
		swap_alloc = next_alloc;
		next_set = current_set;
		next_alloc = current_alloc;
		current_set = swap_set;
		current_alloc = swap_alloc;
		current_n = next_n;
	}
	free(current_set);
	free(next_set);
}
  296. static void compute_depth(struct _starpu_graph_node *next_node, struct _starpu_graph_node *prev_node, void *data)
  297. {
  298. (void)data;
  299. if (prev_node->depth < next_node->depth + 1)
  300. prev_node->depth = next_node->depth + 1;
  301. }
  302. void _starpu_graph_compute_depths(void)
  303. {
  304. struct _starpu_graph_node *node;
  305. _starpu_graph_wrlock();
  306. /* The bottom of the graph has depth 0 */
  307. for (node = _starpu_graph_node_multilist_begin_bottom(&bottom);
  308. node != _starpu_graph_node_multilist_end_bottom(&bottom);
  309. node = _starpu_graph_node_multilist_next_bottom(node))
  310. node->depth = 0;
  311. _starpu_graph_compute_bottom_up(compute_depth, NULL);
  312. _starpu_graph_wrunlock();
  313. }
/* Recompute node->descendants (number of distinct reachable successors) for
 * every node, via one breadth-first traversal per node. graph_n is reused
 * as the "already seen" mark of the current traversal. */
void _starpu_graph_compute_descendants(void)
{
	struct _starpu_graph_node *node, *node2, *node3;
	struct _starpu_graph_node **current_set = NULL, **next_set = NULL, **swap_set;
	unsigned current_n, next_n, i, j;
	unsigned current_alloc = 0, next_alloc = 0, swap_alloc;
	_starpu_graph_wrlock();
	/* Yes, this is O(|V|.(|V|+|E|)) :( */
	/* We could get O(|V|.|E|) by doing a topological sort first.
	 *
	 * |E| is usually O(|V|), though (bounded number of data dependencies,
	 * and we use synchronization tasks) */
	for (node = _starpu_graph_node_multilist_begin_all(&all);
		node != _starpu_graph_node_multilist_end_all(&all);
		node = _starpu_graph_node_multilist_next_all(node))
	{
		unsigned descendants;
		/* Mark all nodes as unseen */
		for (node2 = _starpu_graph_node_multilist_begin_all(&all);
			node2 != _starpu_graph_node_multilist_end_all(&all);
			node2 = _starpu_graph_node_multilist_next_all(node2))
			node2->graph_n = 0;
		/* Start with the node we want to compute the number of descendants of */
		current_n = 0;
		add_node(node, &current_set, &current_n, &current_alloc, NULL);
		/* Mark the start node as seen so it is not counted as its own
		 * descendant if the traversal loops back to it. */
		node->graph_n = 1;
		descendants = 0;
		/* While we have descendants, count their descendants */
		while (current_n)
		{
			/* Next set is initially empty */
			next_n = 0;
			/* For each node in the current set */
			for (i = 0; i < current_n; i++)
			{
				node2 = current_set[i];
				/* For each child of this node2 */
				for (j = 0; j < node2->n_outgoing; j++)
				{
					node3 = node2->outgoing[j];
					if (!node3)
						/* Edge endpoint was dropped */
						continue;
					if (node3->graph_n)
						/* Already seen */
						continue;
					/* Add this node */
					node3->graph_n = 1;
					descendants++;
					add_node(node3, &next_set, &next_n, &next_alloc, NULL);
				}
			}
			/* Swap next set with current set, recycling the arrays
			 * (and their capacities) across iterations and nodes. */
			swap_set = next_set;
			swap_alloc = next_alloc;
			next_set = current_set;
			next_alloc = current_alloc;
			current_set = swap_set;
			current_alloc = swap_alloc;
			current_n = next_n;
		}
		node->descendants = descendants;
	}
	_starpu_graph_wrunlock();
	free(current_set);
	free(next_set);
}
/* Public foreach: apply func(data, node) to every node of the graph,
 * taking the graph write lock around the traversal (write rather than
 * read, so func is allowed to modify the nodes). */
void _starpu_graph_foreach(void (*func)(void *data, struct _starpu_graph_node *node), void *data)
{
	_starpu_graph_wrlock();
	__starpu_graph_foreach(func, data);
	_starpu_graph_wrunlock();
}