cg.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2012, 2014-2015 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2015 CNRS
  5. * Copyright (C) 2012 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/jobs.h>
  22. #include <core/task.h>
  23. #include <core/dependencies/cg.h>
  24. #include <core/dependencies/tags.h>
  25. void _starpu_cg_list_init(struct _starpu_cg_list *list)
  26. {
  27. _starpu_spin_init(&list->lock);
  28. list->ndeps = 0;
  29. list->ndeps_completed = 0;
  30. list->terminated = 0;
  31. list->nsuccs = 0;
  32. #ifdef STARPU_DYNAMIC_DEPS_SIZE
  33. /* this is a small initial default value ... may be changed */
  34. list->succ_list_size = 0;
  35. list->succ = NULL;
  36. #endif
  37. }
  38. void _starpu_cg_list_deinit(struct _starpu_cg_list *list)
  39. {
  40. unsigned id;
  41. for (id = 0; id < list->nsuccs; id++)
  42. {
  43. struct _starpu_cg *cg = list->succ[id];
  44. /* We remove the reference on the completion group, and free it
  45. * if there is no more reference. */
  46. unsigned ntags = STARPU_ATOMIC_ADD(&cg->ntags, -1);
  47. if (ntags == 0)
  48. free(list->succ[id]);
  49. }
  50. #ifdef STARPU_DYNAMIC_DEPS_SIZE
  51. free(list->succ);
  52. #endif
  53. _starpu_spin_destroy(&list->lock);
  54. }
  55. /* Returns whether the completion was already terminated, and caller should
  56. * thus immediately proceed. */
  57. int _starpu_add_successor_to_cg_list(struct _starpu_cg_list *successors, struct _starpu_cg *cg)
  58. {
  59. int ret;
  60. STARPU_ASSERT(cg);
  61. _starpu_spin_lock(&successors->lock);
  62. ret = successors->terminated;
  63. /* where should that cg should be put in the array ? */
  64. unsigned index = successors->nsuccs++;
  65. #ifdef STARPU_DYNAMIC_DEPS_SIZE
  66. if (index >= successors->succ_list_size)
  67. {
  68. /* the successor list is too small */
  69. if (successors->succ_list_size > 0)
  70. successors->succ_list_size *= 2;
  71. else
  72. successors->succ_list_size = 4;
  73. successors->succ = (struct _starpu_cg **) realloc(successors->succ,
  74. successors->succ_list_size*sizeof(struct _starpu_cg *));
  75. }
  76. #else
  77. STARPU_ASSERT(index < STARPU_NMAXDEPS);
  78. #endif
  79. successors->succ[index] = cg;
  80. _starpu_spin_unlock(&successors->lock);
  81. return ret;
  82. }
  /* Notify the completion group CG that one of the events it waits on has
 * completed.  cg->remaining is decremented atomically.  Only the caller that
 * brings it to 0 acts on the group, after re-arming cg->remaining with
 * cg->ntags (presumably so the group can be triggered again).
 *
 * What "acting" means depends on cg->cg_type:
 *  - STARPU_CG_APPS: set succ_apps.completed and signal succ_apps.cg_cond
 *    (under succ_apps.cg_mutex) to wake the waiting application thread;
 *  - STARPU_CG_TAG:  count one more completed dependency of the successor
 *    tag and, if the tag is STARPU_BLOCKED and all of its dependencies are
 *    now completed, reset the counter and call _starpu_tag_set_ready();
 *  - STARPU_CG_TASK: count one more completed dependency of the successor
 *    job and, if the job is submitted, its task is
 *    STARPU_TASK_BLOCKED_ON_TASK and all dependencies are completed, call
 *    _starpu_enforce_deps_starting_from_task();
 *  - any other type aborts.
 *
 * Note: in case of a tag, it must be already locked */
void _starpu_notify_cg(struct _starpu_cg *cg)
{
	STARPU_ASSERT(cg);

	ANNOTATE_HAPPENS_BEFORE(&cg->remaining);
	/* The result is compared against 0, i.e. treated as the value after the
	 * decrement: only the last notifier proceeds into the block below. */
	unsigned remaining = STARPU_ATOMIC_ADD(&cg->remaining, -1);

	if (remaining == 0)
	{
		ANNOTATE_HAPPENS_AFTER(&cg->remaining);
		/* Re-arm the countdown for the next round of notifications. */
		cg->remaining = cg->ntags;

		struct _starpu_tag *tag;
		struct _starpu_cg_list *tag_successors, *job_successors;
		struct _starpu_job *j;

		/* the group is now completed */
		switch (cg->cg_type)
		{
			case STARPU_CG_APPS:
			{
				/* this is a cg for an application waiting on a set of
				 * tags, wake the thread */
				STARPU_PTHREAD_MUTEX_LOCK(&cg->succ.succ_apps.cg_mutex);
				cg->succ.succ_apps.completed = 1;
				STARPU_PTHREAD_COND_SIGNAL(&cg->succ.succ_apps.cg_cond);
				STARPU_PTHREAD_MUTEX_UNLOCK(&cg->succ.succ_apps.cg_mutex);
				break;
			}

			case STARPU_CG_TAG:
			{
				tag = cg->succ.tag;
				tag_successors = &tag->tag_successors;

				/* Plain (non-atomic) increment: relies on the tag lock,
				 * which the caller already holds. */
				tag_successors->ndeps_completed++;

				/* Note: the tag is already locked by the
				 * caller. */
				if ((tag->state == STARPU_BLOCKED) &&
					(tag_successors->ndeps == tag_successors->ndeps_completed))
				{
					/* reset the counter so that we can reuse the completion group */
					tag_successors->ndeps_completed = 0;
					_starpu_tag_set_ready(tag);
				}
				break;
			}

			case STARPU_CG_TASK:
			{
				j = cg->succ.job;

				/* j->sync_mutex is taken here but explicitly released only
				 * in the else branch below. */
				STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
				job_successors = &j->job_successors;
				unsigned ndeps_completed =
					STARPU_ATOMIC_ADD(&job_successors->ndeps_completed, 1);
				STARPU_ASSERT(job_successors->ndeps >= ndeps_completed);

				/* Need to atomically test submitted and check
				 * dependencies, since this is concurrent with
				 * _starpu_submit_job */
				if (j->submitted && job_successors->ndeps == ndeps_completed &&
					j->task->status == STARPU_TASK_BLOCKED_ON_TASK)
				{
					/* That task has already passed tag checks,
					 * do not do them again since the tag has been cleared! */
					/* NOTE(review): called with j->sync_mutex still held;
					 * presumably this function releases it — confirm. */
					_starpu_enforce_deps_starting_from_task(j);
				}
				else
					STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
				break;
			}

			default:
				/* Unknown completion-group type. */
				STARPU_ABORT();
		}
	}
}
  /* Notify every completion group registered in SUCCESSORS through
 * _starpu_notify_cg(), then set successors->terminated so that later
 * _starpu_add_successor_to_cg_list() calls report the list as terminated.
 *
 * Caller just has to promise that the list will not disappear.
 * _starpu_notify_cg_list protects the list itself.
 * No job lock should be held, since we might want to immediately call the callback of an empty task.
 *
 * Locking: successors->lock is dropped around each _starpu_notify_cg() call
 * and re-taken afterwards, so other threads may append successors meanwhile.
 * The loop bound (successors->nsuccs) is re-evaluated under the lock on
 * every iteration, so such late additions are notified as well.  For a
 * STARPU_CG_TAG group, the successor tag's lock is held during the
 * notification, as _starpu_notify_cg() requires.
 */
void _starpu_notify_cg_list(struct _starpu_cg_list *successors)
{
	unsigned succ;

	_starpu_spin_lock(&successors->lock);

	/* Note: some thread might be concurrently adding other items */
	for (succ = 0; succ < successors->nsuccs; succ++)
	{
		struct _starpu_cg *cg = successors->succ[succ];
		STARPU_ASSERT(cg);
		unsigned cg_type = cg->cg_type;

		if (cg_type == STARPU_CG_APPS)
		{
			/* Remove the temporary ref to the cg */
			/* Shift the following entries down over this slot.  The cg is
			 * still notified below in this iteration.  succ-- is undone by
			 * the loop's succ++, so the entry moved into this slot is
			 * visited next.  When succ is 0 the unsigned decrement wraps
			 * and the increment brings it back to 0 (well-defined for
			 * unsigned arithmetic). */
			memmove(&successors->succ[succ], &successors->succ[succ+1], (successors->nsuccs-(succ+1)) * sizeof(successors->succ[succ]));
			succ--;
			successors->nsuccs--;
		}

		/* Do not hold the list lock while notifying. */
		_starpu_spin_unlock(&successors->lock);

		struct _starpu_tag *cgtag = NULL;
		if (cg_type == STARPU_CG_TAG)
		{
			cgtag = cg->succ.tag;
			STARPU_ASSERT(cgtag);
			/* _starpu_notify_cg() expects the tag to be locked. */
			_starpu_spin_lock(&cgtag->lock);
		}

		_starpu_notify_cg(cg);

		if (cg_type == STARPU_CG_TAG)
			_starpu_spin_unlock(&cgtag->lock);

		/* Re-take the list lock before re-reading nsuccs / succ[]. */
		_starpu_spin_lock(&successors->lock);
	}

	successors->terminated = 1;
	_starpu_spin_unlock(&successors->lock);
}