tags.c 13 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2013, 2016 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013 CNRS
  5. * Copyright (C) 2016 Inria
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/dependencies/tags.h>
  22. #include <core/jobs.h>
  23. #include <core/sched_policy.h>
  24. #include <core/dependencies/data_concurrency.h>
  25. #include <profiling/bound.h>
  26. #include <common/uthash.h>
  27. #include <core/debug.h>
  28. #define STARPU_AYUDAME_OFFSET 4000000000000000000ULL
  29. struct _starpu_tag_table
  30. {
  31. UT_hash_handle hh;
  32. starpu_tag_t id;
  33. struct _starpu_tag *tag;
  34. };
  35. #define HASH_ADD_UINT64_T(head,field,add) HASH_ADD(hh,head,field,sizeof(uint64_t),add)
  36. #define HASH_FIND_UINT64_T(head,find,out) HASH_FIND(hh,head,find,sizeof(uint64_t),out)
  37. static struct _starpu_tag_table *tag_htbl = NULL;
  38. static starpu_pthread_rwlock_t tag_global_rwlock;
  39. static struct _starpu_cg *create_cg_apps(unsigned ntags)
  40. {
  41. struct _starpu_cg *cg = (struct _starpu_cg *) malloc(sizeof(struct _starpu_cg));
  42. STARPU_ASSERT(cg);
  43. cg->ntags = ntags;
  44. cg->remaining = ntags;
  45. cg->cg_type = STARPU_CG_APPS;
  46. cg->succ.succ_apps.completed = 0;
  47. STARPU_PTHREAD_MUTEX_INIT(&cg->succ.succ_apps.cg_mutex, NULL);
  48. STARPU_PTHREAD_COND_INIT(&cg->succ.succ_apps.cg_cond, NULL);
  49. return cg;
  50. }
  51. static struct _starpu_cg *create_cg_tag(unsigned ntags, struct _starpu_tag *tag)
  52. {
  53. struct _starpu_cg *cg = (struct _starpu_cg *) malloc(sizeof(struct _starpu_cg));
  54. STARPU_ASSERT(cg);
  55. cg->ntags = ntags;
  56. cg->remaining = ntags;
  57. cg->cg_type = STARPU_CG_TAG;
  58. cg->succ.tag = tag;
  59. tag->tag_successors.ndeps++;
  60. return cg;
  61. }
  62. static struct _starpu_tag *_starpu_tag_init(starpu_tag_t id)
  63. {
  64. struct _starpu_tag *tag;
  65. tag = (struct _starpu_tag *) malloc(sizeof(struct _starpu_tag));
  66. STARPU_ASSERT(tag);
  67. tag->job = NULL;
  68. tag->is_assigned = 0;
  69. tag->is_submitted = 0;
  70. tag->id = id;
  71. tag->state = STARPU_INVALID_STATE;
  72. _starpu_cg_list_init(&tag->tag_successors);
  73. _starpu_spin_init(&tag->lock);
  74. return tag;
  75. }
  76. static void _starpu_tag_free(void *_tag)
  77. {
  78. struct _starpu_tag *tag = (struct _starpu_tag *) _tag;
  79. if (tag)
  80. {
  81. _starpu_spin_lock(&tag->lock);
  82. unsigned nsuccs = tag->tag_successors.nsuccs;
  83. unsigned succ;
  84. for (succ = 0; succ < nsuccs; succ++)
  85. {
  86. struct _starpu_cg *cg = tag->tag_successors.succ[succ];
  87. unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
  88. unsigned remaining STARPU_ATTRIBUTE_UNUSED = STARPU_ATOMIC_ADD(&cg->remaining, -1);
  89. if (!ntags && (cg->cg_type == STARPU_CG_TAG))
  90. /* Last tag this cg depends on, cg becomes unreferenced */
  91. free(cg);
  92. }
  93. #ifdef STARPU_DYNAMIC_DEPS_SIZE
  94. free(tag->tag_successors.succ);
  95. #endif
  96. _starpu_spin_unlock(&tag->lock);
  97. _starpu_spin_destroy(&tag->lock);
  98. free(tag);
  99. }
  100. }
  101. /*
  102. * Staticly initializing tag_global_rwlock seems to lead to weird errors
  103. * on Darwin, so we do it dynamically.
  104. */
  105. void _starpu_init_tags(void)
  106. {
  107. STARPU_PTHREAD_RWLOCK_INIT(&tag_global_rwlock, NULL);
  108. }
  109. void starpu_tag_remove(starpu_tag_t id)
  110. {
  111. struct _starpu_tag_table *entry;
  112. STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
  113. STARPU_AYU_REMOVETASK(id + STARPU_AYUDAME_OFFSET);
  114. STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
  115. HASH_FIND_UINT64_T(tag_htbl, &id, entry);
  116. if (entry) HASH_DEL(tag_htbl, entry);
  117. STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
  118. if (entry)
  119. {
  120. _starpu_tag_free(entry->tag);
  121. free(entry);
  122. }
  123. }
  124. void _starpu_tag_clear(void)
  125. {
  126. STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
  127. /* XXX: _starpu_tag_free takes the tag spinlocks while we are keeping
  128. * the global rwlock. This contradicts the lock order of
  129. * starpu_tag_wait_array. Should not be a problem in practice since
  130. * _starpu_tag_clear is called at shutdown only. */
  131. struct _starpu_tag_table *entry, *tmp;
  132. HASH_ITER(hh, tag_htbl, entry, tmp)
  133. {
  134. HASH_DEL(tag_htbl, entry);
  135. _starpu_tag_free(entry->tag);
  136. free(entry);
  137. }
  138. STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
  139. }
  140. static struct _starpu_tag *_gettag_struct(starpu_tag_t id)
  141. {
  142. /* search if the tag is already declared or not */
  143. struct _starpu_tag_table *entry;
  144. struct _starpu_tag *tag;
  145. HASH_FIND_UINT64_T(tag_htbl, &id, entry);
  146. if (entry != NULL)
  147. tag = entry->tag;
  148. else
  149. {
  150. /* the tag does not exist yet : create an entry */
  151. tag = _starpu_tag_init(id);
  152. struct _starpu_tag_table *entry2;
  153. entry2 = (struct _starpu_tag_table *) malloc(sizeof(*entry2));
  154. STARPU_ASSERT(entry2 != NULL);
  155. entry2->id = id;
  156. entry2->tag = tag;
  157. HASH_ADD_UINT64_T(tag_htbl, id, entry2);
  158. STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
  159. STARPU_AYU_ADDTASK(id + STARPU_AYUDAME_OFFSET, NULL);
  160. }
  161. return tag;
  162. }
  163. static struct _starpu_tag *gettag_struct(starpu_tag_t id)
  164. {
  165. struct _starpu_tag *tag;
  166. STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
  167. tag = _gettag_struct(id);
  168. STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
  169. return tag;
  170. }
  171. /* lock should be taken */
  172. void _starpu_tag_set_ready(struct _starpu_tag *tag)
  173. {
  174. /* mark this tag as ready to run */
  175. tag->state = STARPU_READY;
  176. /* declare it to the scheduler ! */
  177. struct _starpu_job *j = tag->job;
  178. /* In case the task job is going to be scheduled immediately, and if
  179. * the task is "empty", calling _starpu_push_task would directly try to enforce
  180. * the dependencies of the task, and therefore it would try to grab the
  181. * lock again, resulting in a deadlock. */
  182. _starpu_spin_unlock(&tag->lock);
  183. /* enforce data dependencies */
  184. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  185. _starpu_enforce_deps_starting_from_task(j);
  186. _starpu_spin_lock(&tag->lock);
  187. STARPU_ASSERT(!STARPU_AYU_EVENT || tag->id < STARPU_AYUDAME_OFFSET);
  188. STARPU_AYU_PRERUNTASK(tag->id + STARPU_AYUDAME_OFFSET, -1);
  189. STARPU_AYU_POSTRUNTASK(tag->id + STARPU_AYUDAME_OFFSET);
  190. }
  191. /* the lock must be taken ! */
  192. static void _starpu_tag_add_succ(struct _starpu_tag *tag, struct _starpu_cg *cg)
  193. {
  194. STARPU_ASSERT(tag);
  195. _starpu_add_successor_to_cg_list(&tag->tag_successors, cg);
  196. if (tag->state == STARPU_DONE)
  197. {
  198. /* the tag was already completed sooner */
  199. _starpu_notify_cg(cg);
  200. }
  201. }
  202. void _starpu_notify_tag_dependencies(struct _starpu_tag *tag)
  203. {
  204. _starpu_spin_lock(&tag->lock);
  205. if (tag->state == STARPU_DONE)
  206. {
  207. _starpu_spin_unlock(&tag->lock);
  208. return;
  209. }
  210. tag->state = STARPU_DONE;
  211. _STARPU_TRACE_TAG_DONE(tag);
  212. _starpu_notify_cg_list(&tag->tag_successors);
  213. _starpu_spin_unlock(&tag->lock);
  214. }
  215. void starpu_tag_restart(starpu_tag_t id)
  216. {
  217. struct _starpu_tag *tag = gettag_struct(id);
  218. _starpu_spin_lock(&tag->lock);
  219. STARPU_ASSERT_MSG(tag->state == STARPU_DONE || tag->state == STARPU_INVALID_STATE || tag->state == STARPU_ASSOCIATED || tag->state == STARPU_BLOCKED, "Only completed tags can be restarted (%llu was %d)", (unsigned long long) id, tag->state);
  220. tag->state = STARPU_BLOCKED;
  221. _starpu_spin_unlock(&tag->lock);
  222. }
  223. void starpu_tag_notify_from_apps(starpu_tag_t id)
  224. {
  225. struct _starpu_tag *tag = gettag_struct(id);
  226. _starpu_notify_tag_dependencies(tag);
  227. }
  228. void _starpu_tag_declare(starpu_tag_t id, struct _starpu_job *job)
  229. {
  230. _STARPU_TRACE_TAG(id, job);
  231. job->task->use_tag = 1;
  232. struct _starpu_tag *tag= gettag_struct(id);
  233. _starpu_spin_lock(&tag->lock);
  234. /* Note: a tag can be shared by several tasks, when it is used to
  235. * detect when either of them are finished. We however don't allow
  236. * several tasks to share a tag when it is used to wake them by
  237. * dependency */
  238. if (tag->job != job)
  239. tag->is_assigned++;
  240. tag->job = job;
  241. job->tag = tag;
  242. /* the tag is now associated to a job */
  243. /* When the same tag may be signaled several times by different tasks,
  244. * and it's already done, we should not reset the "done" state.
  245. * When the tag is simply used by the same task several times, we have
  246. * to do so. */
  247. if (job->task->regenerate || job->submitted == 2 ||
  248. tag->state != STARPU_DONE)
  249. tag->state = STARPU_ASSOCIATED;
  250. STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
  251. STARPU_AYU_ADDDEPENDENCY(id+STARPU_AYUDAME_OFFSET, 0, job->job_id);
  252. STARPU_AYU_ADDDEPENDENCY(job->job_id, 0, id+STARPU_AYUDAME_OFFSET);
  253. _starpu_spin_unlock(&tag->lock);
  254. }
  255. void starpu_tag_declare_deps_array(starpu_tag_t id, unsigned ndeps, starpu_tag_t *array)
  256. {
  257. if (!ndeps)
  258. return;
  259. unsigned i;
  260. /* create the associated completion group */
  261. struct _starpu_tag *tag_child = gettag_struct(id);
  262. _starpu_spin_lock(&tag_child->lock);
  263. struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
  264. _starpu_spin_unlock(&tag_child->lock);
  265. for (i = 0; i < ndeps; i++)
  266. {
  267. starpu_tag_t dep_id = array[i];
  268. /* id depends on dep_id
  269. * so cg should be among dep_id's successors*/
  270. _STARPU_TRACE_TAG_DEPS(id, dep_id);
  271. _starpu_bound_tag_dep(id, dep_id);
  272. struct _starpu_tag *tag_dep = gettag_struct(dep_id);
  273. STARPU_ASSERT(tag_dep != tag_child);
  274. _starpu_spin_lock(&tag_dep->lock);
  275. _starpu_spin_lock(&tag_child->lock);
  276. _starpu_tag_add_succ(tag_dep, cg);
  277. STARPU_ASSERT(!STARPU_AYU_EVENT || dep_id < STARPU_AYUDAME_OFFSET);
  278. STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
  279. STARPU_AYU_ADDDEPENDENCY(dep_id+STARPU_AYUDAME_OFFSET, 0, id+STARPU_AYUDAME_OFFSET);
  280. _starpu_spin_unlock(&tag_child->lock);
  281. _starpu_spin_unlock(&tag_dep->lock);
  282. }
  283. }
  284. void starpu_tag_declare_deps(starpu_tag_t id, unsigned ndeps, ...)
  285. {
  286. if (!ndeps)
  287. return;
  288. unsigned i;
  289. /* create the associated completion group */
  290. struct _starpu_tag *tag_child = gettag_struct(id);
  291. _starpu_spin_lock(&tag_child->lock);
  292. struct _starpu_cg *cg = create_cg_tag(ndeps, tag_child);
  293. _starpu_spin_unlock(&tag_child->lock);
  294. va_list pa;
  295. va_start(pa, ndeps);
  296. for (i = 0; i < ndeps; i++)
  297. {
  298. starpu_tag_t dep_id;
  299. dep_id = va_arg(pa, starpu_tag_t);
  300. /* id depends on dep_id
  301. * so cg should be among dep_id's successors*/
  302. _STARPU_TRACE_TAG_DEPS(id, dep_id);
  303. _starpu_bound_tag_dep(id, dep_id);
  304. struct _starpu_tag *tag_dep = gettag_struct(dep_id);
  305. STARPU_ASSERT(tag_dep != tag_child);
  306. _starpu_spin_lock(&tag_dep->lock);
  307. _starpu_spin_lock(&tag_child->lock);
  308. _starpu_tag_add_succ(tag_dep, cg);
  309. STARPU_ASSERT(!STARPU_AYU_EVENT || dep_id < STARPU_AYUDAME_OFFSET);
  310. STARPU_ASSERT(!STARPU_AYU_EVENT || id < STARPU_AYUDAME_OFFSET);
  311. STARPU_AYU_ADDDEPENDENCY(dep_id+STARPU_AYUDAME_OFFSET, 0, id+STARPU_AYUDAME_OFFSET);
  312. _starpu_spin_unlock(&tag_child->lock);
  313. _starpu_spin_unlock(&tag_dep->lock);
  314. }
  315. va_end(pa);
  316. }
  317. /* this function may be called by the application (outside callbacks !) */
  318. int starpu_tag_wait_array(unsigned ntags, starpu_tag_t *id)
  319. {
  320. unsigned i;
  321. unsigned current;
  322. struct _starpu_tag *tag_array[ntags];
  323. _STARPU_LOG_IN();
  324. /* It is forbidden to block within callbacks or codelets */
  325. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_tag_wait must not be called from a task or callback");
  326. STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
  327. /* only wait the tags that are not done yet */
  328. for (i = 0, current = 0; i < ntags; i++)
  329. {
  330. struct _starpu_tag *tag = _gettag_struct(id[i]);
  331. _starpu_spin_lock(&tag->lock);
  332. if (tag->state == STARPU_DONE)
  333. {
  334. /* that tag is done already */
  335. _starpu_spin_unlock(&tag->lock);
  336. }
  337. else
  338. {
  339. tag_array[current] = tag;
  340. current++;
  341. }
  342. }
  343. STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
  344. if (current == 0)
  345. {
  346. /* all deps are already fulfilled */
  347. _STARPU_LOG_OUT_TAG("all deps are already fulfilled");
  348. return 0;
  349. }
  350. /* there is at least one task that is not finished */
  351. struct _starpu_cg *cg = create_cg_apps(current);
  352. for (i = 0; i < current; i++)
  353. {
  354. _starpu_tag_add_succ(tag_array[i], cg);
  355. _starpu_spin_unlock(&tag_array[i]->lock);
  356. }
  357. STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
  358. while (!cg->succ.succ_apps.completed)
  359. STARPU_PTHREAD_COND_WAIT(&cg->succ.succ_apps.cg_cond, &cg->succ.succ_apps.cg_mutex);
  360. STARPU_PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
  361. STARPU_PTHREAD_MUTEX_DESTROY(&cg->succ.succ_apps.cg_mutex);
  362. STARPU_PTHREAD_COND_DESTROY(&cg->succ.succ_apps.cg_cond);
  363. free(cg);
  364. _STARPU_LOG_OUT();
  365. return 0;
  366. }
  367. int starpu_tag_wait(starpu_tag_t id)
  368. {
  369. return starpu_tag_wait_array(1, &id);
  370. }
  371. struct starpu_task *starpu_tag_get_task(starpu_tag_t id)
  372. {
  373. struct _starpu_tag_table *entry;
  374. struct _starpu_tag *tag;
  375. STARPU_PTHREAD_RWLOCK_WRLOCK(&tag_global_rwlock);
  376. HASH_FIND_UINT64_T(tag_htbl, &id, entry);
  377. STARPU_PTHREAD_RWLOCK_UNLOCK(&tag_global_rwlock);
  378. if (!entry)
  379. return NULL;
  380. tag = entry->tag;
  381. if (!tag->job)
  382. return NULL;
  383. return tag->job->task;
  384. }