  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <common/utils.h>
  19. #include <core/jobs.h>
  20. #include <core/task.h>
  21. #include <core/dependencies/cg.h>
  22. #include <core/dependencies/tags.h>
  23. void _starpu_cg_list_init0(struct _starpu_cg_list *list)
  24. {
  25. _starpu_spin_init(&list->lock);
  26. //list->ndeps = 0;
  27. //list->ndeps_completed = 0;
  28. #ifdef STARPU_DEBUG
  29. //list->deps = NULL;
  30. //list->done = NULL;
  31. #endif
  32. //list->terminated = 0;
  33. //list->nsuccs = 0;
  34. #ifdef STARPU_DYNAMIC_DEPS_SIZE
  35. /* this is a small initial default value ... may be changed */
  36. //list->succ_list_size = 0;
  37. //list->succ = NULL;
  38. #endif
  39. }
  40. void _starpu_cg_list_deinit(struct _starpu_cg_list *list)
  41. {
  42. unsigned id;
  43. for (id = 0; id < list->nsuccs; id++)
  44. {
  45. struct _starpu_cg *cg = list->succ[id];
  46. /* We remove the reference on the completion group, and free it
  47. * if there is no more reference. */
  48. unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
  49. if (ntags == 0)
  50. {
  51. #ifdef STARPU_DEBUG
  52. free(list->succ[id]->deps);
  53. free(list->succ[id]->done);
  54. #endif
  55. free(list->succ[id]);
  56. }
  57. }
  58. #ifdef STARPU_DYNAMIC_DEPS_SIZE
  59. free(list->succ);
  60. #endif
  61. #ifdef STARPU_DEBUG
  62. free(list->deps);
  63. free(list->done);
  64. #endif
  65. _starpu_spin_destroy(&list->lock);
  66. }
  67. /* Returns whether the completion was already terminated, and caller should
  68. * thus immediately proceed. */
  69. int _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg)
  70. {
  71. int ret;
  72. STARPU_ASSERT(cg);
  73. _starpu_spin_lock(&successors->lock);
  74. ret = successors->terminated;
  75. /* where should that cg should be put in the array ? */
  76. unsigned index = successors->nsuccs++;
  77. #ifdef STARPU_DYNAMIC_DEPS_SIZE
  78. if (index >= successors->succ_list_size)
  79. {
  80. /* the successor list is too small */
  81. if (successors->succ_list_size > 0)
  82. successors->succ_list_size *= 2;
  83. else
  84. successors->succ_list_size = 4;
  85. _STARPU_REALLOC(successors->succ, successors->succ_list_size*sizeof(struct _starpu_cg *));
  86. }
  87. #else
  88. STARPU_ASSERT(index < STARPU_NMAXDEPS);
  89. #endif
  90. successors->succ[index] = cg;
  91. _starpu_spin_unlock(&successors->lock);
  92. return ret;
  93. }
  94. int _starpu_list_task_successors_in_cg_list(struct _starpu_cg_list *successors, unsigned ndeps, struct starpu_task *task_array[])
  95. {
  96. unsigned i;
  97. unsigned n = 0;
  98. _starpu_spin_lock(&successors->lock);
  99. for (i = 0; i < successors->nsuccs; i++)
  100. {
  101. struct _starpu_cg *cg = successors->succ[i];
  102. if (cg->cg_type != STARPU_CG_TASK)
  103. continue;
  104. if (n < ndeps)
  105. {
  106. task_array[n] = cg->succ.job->task;
  107. n++;
  108. }
  109. }
  110. _starpu_spin_unlock(&successors->lock);
  111. return n;
  112. }
  113. int _starpu_list_task_scheduled_successors_in_cg_list(struct _starpu_cg_list *successors, unsigned ndeps, struct starpu_task *task_array[])
  114. {
  115. unsigned i;
  116. unsigned n = 0;
  117. _starpu_spin_lock(&successors->lock);
  118. for (i = 0; i < successors->nsuccs; i++)
  119. {
  120. struct _starpu_cg *cg = successors->succ[i];
  121. if (cg->cg_type != STARPU_CG_TASK)
  122. continue;
  123. if (n < ndeps)
  124. {
  125. struct starpu_task *task = cg->succ.job->task;
  126. if (task->cl == NULL || task->where == STARPU_NOWHERE || task->execute_on_a_specific_worker)
  127. /* will not be scheduled */
  128. continue;
  129. task_array[n] = task;
  130. n++;
  131. }
  132. }
  133. _starpu_spin_unlock(&successors->lock);
  134. return n;
  135. }
  136. int _starpu_list_tag_successors_in_cg_list(struct _starpu_cg_list *successors, unsigned ndeps, starpu_tag_t tag_array[])
  137. {
  138. unsigned i;
  139. unsigned n = 0;
  140. _starpu_spin_lock(&successors->lock);
  141. for (i = 0; i < successors->nsuccs; i++)
  142. {
  143. struct _starpu_cg *cg = successors->succ[i];
  144. if (cg->cg_type != STARPU_CG_TAG)
  145. continue;
  146. if (n < ndeps)
  147. {
  148. tag_array[n] = cg->succ.tag->id;
  149. n++;
  150. }
  151. }
  152. _starpu_spin_unlock(&successors->lock);
  153. return n;
  154. }
/* Notify the completion group cg that one of its dependencies (pred) has
 * terminated. When the last pending dependency completes, the group fires:
 * an application waiter is woken, a tag is possibly made ready, or a task's
 * dependency processing is resumed.
 * Note: in case of a tag, it must be already locked */
void _starpu_notify_cg(void *pred STARPU_ATTRIBUTE_UNUSED, struct _starpu_cg *cg)
{
	STARPU_ASSERT(cg);
	/* Atomically consume one pending dependency. */
	unsigned remaining = STARPU_ATOMIC_ADD(&cg->remaining, -1);
	ANNOTATE_HAPPENS_BEFORE(&cg->remaining);
	if (remaining == 0)
	{
		ANNOTATE_HAPPENS_AFTER(&cg->remaining);
		/* Note: This looks racy to helgrind when the tasks are not
		 * autoregenerated, since they then unsubscribe from the
		 * completion group in parallel, thus decreasing ntags. This is
		 * however not a problem since it means we will not reuse this
		 * cg, and remaining will not be used, so a bogus value won't
		 * hurt.
		 */
		/* Reset the counter so the group can be reused. */
		cg->remaining = cg->ntags;
		/* the group is now completed */
		switch (cg->cg_type)
		{
			case STARPU_CG_APPS:
			{
				/* this is a cg for an application waiting on a set of
				 * tags, wake the thread */
				STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
				cg->succ.succ_apps.completed = 1;
				STARPU_PTHREAD_COND_SIGNAL(&cg->succ.succ_apps.cg_cond);
				STARPU_PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
				break;
			}
			case STARPU_CG_TAG:
			{
				struct _starpu_cg_list *tag_successors;
				struct _starpu_tag *tag;
				tag = cg->succ.tag;
				tag_successors = &tag->tag_successors;
				tag_successors->ndeps_completed++;
				/* Note: the tag is already locked by the
				 * caller. */
				if ((tag->state == STARPU_BLOCKED) &&
					(tag_successors->ndeps == tag_successors->ndeps_completed))
				{
					/* reset the counter so that we can reuse the completion group */
					tag_successors->ndeps_completed = 0;
					_starpu_tag_set_ready(tag);
				}
				break;
			}
			case STARPU_CG_TASK:
			{
				struct _starpu_cg_list *job_successors;
				struct _starpu_job *j;
				j = cg->succ.job;
				STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
				job_successors = &j->job_successors;
#ifdef STARPU_DEBUG
				if (!j->task->regenerate)
				{
					unsigned i;
					/* Remove backward cg pointers for easier debugging */
					/* Mark this cg as done in the job's dependency list. */
					if (job_successors->deps)
					{
						for (i = 0; i < job_successors->ndeps; i++)
							if (job_successors->deps[i] == cg)
								break;
						STARPU_ASSERT(i < job_successors->ndeps);
						job_successors->done[i] = 1;
					}
					/* Mark pred as done in the cg's own dependency list. */
					if (cg->deps)
					{
						for (i = 0; i < cg->ndeps; i++)
							if (cg->deps[i] == pred)
								break;
						STARPU_ASSERT(i < cg->ndeps);
						cg->done[i] = 1;
					}
				}
#endif
				unsigned ndeps_completed =
					STARPU_ATOMIC_ADD(&job_successors->ndeps_completed, 1);
				STARPU_ASSERT(job_successors->ndeps >= ndeps_completed);
				/* Need to atomically test submitted and check
				 * dependencies, since this is concurrent with
				 * _starpu_submit_job */
				if (j->submitted && job_successors->ndeps == ndeps_completed &&
					j->task->status == STARPU_TASK_BLOCKED_ON_TASK)
				{
					/* That task has already passed tag checks,
					 * do not do them again since the tag has been cleared! */
					/* NOTE(review): j->sync_mutex is not unlocked on this
					 * path — presumably released inside
					 * _starpu_enforce_deps_starting_from_task; confirm. */
					_starpu_enforce_deps_starting_from_task(j);
				}
				else
					STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
				break;
			}
			default:
				STARPU_ABORT();
		}
	}
}
  255. /* Called when a job has just started, so we can notify tasks which were waiting
  256. * only for this one when they can expect to start */
  257. /* Note: in case of a tag, it must be already locked */
  258. void _starpu_notify_job_ready_soon_cg(void *pred STARPU_ATTRIBUTE_UNUSED, struct _starpu_cg *cg, _starpu_notify_job_start_data *data)
  259. {
  260. STARPU_ASSERT(cg);
  261. if (cg->remaining == 1)
  262. {
  263. /* the group is to be completed */
  264. switch (cg->cg_type)
  265. {
  266. case STARPU_CG_APPS:
  267. /* Not a task */
  268. break;
  269. case STARPU_CG_TAG:
  270. {
  271. struct _starpu_cg_list *tag_successors;
  272. struct _starpu_tag *tag;
  273. tag = cg->succ.tag;
  274. tag_successors = &tag->tag_successors;
  275. /* Note: the tag is already locked by the
  276. * caller. */
  277. if ((tag->state == STARPU_BLOCKED) &&
  278. (tag_successors->ndeps == tag_successors->ndeps_completed + 1))
  279. {
  280. /* This is to be ready */
  281. _starpu_enforce_deps_notify_job_ready_soon(tag->job, data, 1);
  282. }
  283. break;
  284. }
  285. case STARPU_CG_TASK:
  286. {
  287. struct _starpu_cg_list *job_successors;
  288. struct _starpu_job *j;
  289. j = cg->succ.job;
  290. job_successors = &j->job_successors;
  291. if (job_successors->ndeps == job_successors->ndeps_completed + 1 &&
  292. j->task->status == STARPU_TASK_BLOCKED_ON_TASK)
  293. {
  294. /* This is to be ready */
  295. _starpu_enforce_deps_notify_job_ready_soon(j, data, 0);
  296. }
  297. break;
  298. }
  299. default:
  300. STARPU_ABORT();
  301. }
  302. }
  303. }
/* Notify every completion group registered in the successors list that pred
 * has terminated, then mark the list as terminated.
 * Caller just has to promise that the list will not disappear.
 * _starpu_notify_cg_list protects the list itself.
 * No job lock should be held, since we might want to immediately call the callback of an empty task.
 */
void _starpu_notify_cg_list(void *pred, struct _starpu_cg_list *successors)
{
	unsigned succ;
	_starpu_spin_lock(&successors->lock);
	/* Note: some thread might be concurrently adding other items */
	for (succ = 0; succ < successors->nsuccs; succ++)
	{
		struct _starpu_cg *cg = successors->succ[succ];
		STARPU_ASSERT(cg);
		/* Save the type: cg may be freed once we drop our reference. */
		unsigned cg_type = cg->cg_type;
		if (cg_type == STARPU_CG_APPS)
		{
			/* Remove the temporary ref to the cg */
			/* Compact the array over the removed slot; succ-- compensates
			 * the loop increment so the shifted entry is not skipped. */
			memmove(&successors->succ[succ], &successors->succ[succ+1], (successors->nsuccs-(succ+1)) * sizeof(successors->succ[succ]));
			succ--;
			successors->nsuccs--;
		}
		/* Drop the list lock before notifying: _starpu_notify_cg may
		 * take other locks (application mutex, job sync_mutex). */
		_starpu_spin_unlock(&successors->lock);
		struct _starpu_tag *cgtag = NULL;
		if (cg_type == STARPU_CG_TAG)
		{
			/* _starpu_notify_cg requires the tag to be locked. */
			cgtag = cg->succ.tag;
			STARPU_ASSERT(cgtag);
			_starpu_spin_lock(&cgtag->lock);
		}
		_starpu_notify_cg(pred, cg);
		if (cg_type == STARPU_CG_TAG)
			_starpu_spin_unlock(&cgtag->lock);
		/* Re-acquire the list lock; nsuccs is re-read at the top of the
		 * loop since the list may have changed meanwhile. */
		_starpu_spin_lock(&successors->lock);
	}
	successors->terminated = 1;
	_starpu_spin_unlock(&successors->lock);
}
/* Called when a job has just started, so we can notify tasks which were waiting
 * only for this one when they can expect to start */
/* Caller just has to promise that the list will not disappear.
 * _starpu_notify_cg_list protects the list itself.
 * No job lock should be held, since we might want to immediately call the callback of an empty task.
 */
void _starpu_notify_job_start_cg_list(void *pred, struct _starpu_cg_list *successors, _starpu_notify_job_start_data *data)
{
	unsigned succ;
	_starpu_spin_lock(&successors->lock);
	/* Note: some thread might be concurrently adding other items */
	for (succ = 0; succ < successors->nsuccs; succ++)
	{
		struct _starpu_cg *cg = successors->succ[succ];
		/* Drop the list lock before notifying: the notification may take
		 * other locks (tag lock). */
		_starpu_spin_unlock(&successors->lock);
		STARPU_ASSERT(cg);
		unsigned cg_type = cg->cg_type;
		struct _starpu_tag *cgtag = NULL;
		if (cg_type == STARPU_CG_TAG)
		{
			/* _starpu_notify_job_ready_soon_cg requires the tag to be
			 * locked. */
			cgtag = cg->succ.tag;
			STARPU_ASSERT(cgtag);
			_starpu_spin_lock(&cgtag->lock);
		}
		_starpu_notify_job_ready_soon_cg(pred, cg, data);
		if (cg_type == STARPU_CG_TAG)
			_starpu_spin_unlock(&cgtag->lock);
		/* Re-acquire the list lock; nsuccs is re-read at the top of the
		 * loop since the list may have changed meanwhile. */
		_starpu_spin_lock(&successors->lock);
	}
	_starpu_spin_unlock(&successors->lock);
}
  371. }