data_concurrency.c 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <core/dependencies/data_concurrency.h>
  17. #include <datawizard/coherency.h>
  18. #include <core/sched_policy.h>
  19. #include <common/starpu_spinlock.h>
  20. #include <datawizard/sort_data_handles.h>
  21. /*
  22. * The different types of data accesses are STARPU_R, STARPU_RW, STARPU_W,
  23. * STARPU_SCRATCH and STARPU_REDUX. STARPU_RW is managed as a STARPU_W access.
  24. * - A single STARPU_W access is allowed at a time.
  25. * - Concurrent STARPU_R accesses are allowed.
  26. * - Concurrent STARPU_SCRATCH accesses are allowed.
  27. * - Concurrent STARPU_REDUX accesses are allowed.
  28. */
  29. /* the header lock must be taken by the caller */
  30. static unsigned may_unlock_data_req_list_head(starpu_data_handle handle)
  31. {
  32. /* if there is no one to unlock ... */
  33. if (starpu_data_requester_list_empty(handle->req_list))
  34. return 0;
  35. /* if there is no reference to the data anymore, we can use it */
  36. if (handle->refcnt == 0)
  37. return 1;
  38. if (handle->current_mode == STARPU_W)
  39. return 0;
  40. /* data->current_mode == STARPU_R, so we can process more readers */
  41. starpu_data_requester_t r = starpu_data_requester_list_front(handle->req_list);
  42. starpu_access_mode r_mode = r->mode;
  43. if (r_mode == STARPU_RW)
  44. r_mode = STARPU_W;
  45. /* If this is a STARPU_R, STARPU_SCRATCH or STARPU_REDUX type of
  46. * access, we only proceed if the cuurrent mode is the same as the
  47. * requested mode. */
  48. return (r_mode == handle->current_mode);
  49. }
  50. /* Try to submit a data request, in case the request can be processed
  51. * immediatly, return 0, if there is still a dependency that is not compatible
  52. * with the current mode, the request is put in the per-handle list of
  53. * "requesters", and this function returns 1. */
  54. static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_codelet,
  55. starpu_data_handle handle, starpu_access_mode mode,
  56. void (*callback)(void *), void *argcb,
  57. starpu_job_t j, unsigned buffer_index)
  58. {
  59. unsigned ret;
  60. if (mode == STARPU_RW)
  61. mode = STARPU_W;
  62. /* Take the lock protecting the header. We try to do some progression
  63. * in case this is called from a worker, otherwise we just wait for the
  64. * lock to be available. */
  65. if (request_from_codelet)
  66. {
  67. while (_starpu_spin_trylock(&handle->header_lock))
  68. _starpu_datawizard_progress(_starpu_get_local_memory_node(), 0);
  69. }
  70. else {
  71. _starpu_spin_lock(&handle->header_lock);
  72. }
  73. /* If there is currently nobody accessing the piece of data, or it's
  74. * not another writter and if this is the same type of access as the
  75. * current one, we can proceed. */
  76. if ((handle->refcnt == 0) || (!(mode == STARPU_W) && (handle->current_mode == mode)))
  77. {
  78. handle->refcnt++;
  79. handle->current_mode = mode;
  80. /* success */
  81. ret = 0;
  82. }
  83. else
  84. {
  85. /* there cannot be multiple writers or a new writer
  86. * while the data is in read mode */
  87. /* enqueue the request */
  88. starpu_data_requester_t r = starpu_data_requester_new();
  89. r->mode = mode;
  90. r->is_requested_by_codelet = request_from_codelet;
  91. r->j = j;
  92. r->buffer_index = buffer_index;
  93. r->ready_data_callback = callback;
  94. r->argcb = argcb;
  95. starpu_data_requester_list_push_back(handle->req_list, r);
  96. /* failed */
  97. ret = 1;
  98. }
  99. _starpu_spin_unlock(&handle->header_lock);
  100. return ret;
  101. }
  102. unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle handle, starpu_access_mode mode,
  103. void (*callback)(void *), void *argcb)
  104. {
  105. return _starpu_attempt_to_submit_data_request(0, handle, mode, callback, argcb, NULL, 0);
  106. }
  107. static unsigned attempt_to_submit_data_request_from_job(starpu_job_t j, unsigned buffer_index)
  108. {
  109. /* Note that we do not access j->task->buffers, but j->ordered_buffers
  110. * which is a sorted copy of it. */
  111. starpu_data_handle handle = j->ordered_buffers[buffer_index].handle;
  112. starpu_access_mode mode = j->ordered_buffers[buffer_index].mode;
  113. return _starpu_attempt_to_submit_data_request(1, handle, mode, NULL, NULL, j, buffer_index);
  114. }
  115. static unsigned _submit_job_enforce_data_deps(starpu_job_t j, unsigned start_buffer_index)
  116. {
  117. unsigned buf;
  118. unsigned nbuffers = j->task->cl->nbuffers;
  119. for (buf = start_buffer_index; buf < nbuffers; buf++)
  120. {
  121. if (attempt_to_submit_data_request_from_job(j, buf)) {
  122. j->task->status = STARPU_TASK_BLOCKED_ON_JOB;
  123. return 1;
  124. }
  125. }
  126. return 0;
  127. }
  128. /* When a new task is submitted, we make sure that there cannot be codelets
  129. with concurrent data-access at the same time in the scheduling engine (eg.
  130. there can be 2 tasks reading a piece of data, but there cannot be one
  131. reading and another writing) */
  132. unsigned _starpu_submit_job_enforce_data_deps(starpu_job_t j)
  133. {
  134. struct starpu_codelet_t *cl = j->task->cl;
  135. if ((cl == NULL) || (cl->nbuffers == 0))
  136. return 0;
  137. /* Compute an ordered list of the different pieces of data so that we
  138. * grab then according to a total order, thus avoiding a deadlock
  139. * condition */
  140. memcpy(j->ordered_buffers, j->task->buffers, cl->nbuffers*sizeof(starpu_buffer_descr));
  141. _starpu_sort_task_handles(j->ordered_buffers, cl->nbuffers);
  142. return _submit_job_enforce_data_deps(j, 0);
  143. }
  144. static unsigned unlock_one_requester(starpu_data_requester_t r)
  145. {
  146. starpu_job_t j = r->j;
  147. unsigned nbuffers = j->task->cl->nbuffers;
  148. unsigned buffer_index = r->buffer_index;
  149. if (buffer_index + 1 < nbuffers)
  150. {
  151. /* not all buffers are protected yet */
  152. return _submit_job_enforce_data_deps(j, buffer_index + 1);
  153. }
  154. else
  155. return 0;
  156. }
  157. /* The header lock must already be taken by the caller */
  158. void _starpu_notify_data_dependencies(starpu_data_handle handle)
  159. {
  160. /* A data access has finished so we remove a reference. */
  161. STARPU_ASSERT(handle->refcnt > 0);
  162. handle->refcnt--;
  163. while (may_unlock_data_req_list_head(handle))
  164. {
  165. /* Grab the head of the requester list and unlock it. */
  166. starpu_data_requester_t r = starpu_data_requester_list_pop_front(handle->req_list);
  167. /* The data is now attributed to that request so we put a
  168. * reference on it. */
  169. handle->refcnt++;
  170. /* STARPU_RW accesses are treated as STARPU_W */
  171. starpu_access_mode r_mode = r->mode;
  172. if (r_mode == STARPU_RW)
  173. r_mode = STARPU_W;
  174. handle->current_mode = r_mode;
  175. _starpu_spin_unlock(&handle->header_lock);
  176. if (r->is_requested_by_codelet)
  177. {
  178. if (!unlock_one_requester(r))
  179. _starpu_push_task(r->j, 0);
  180. }
  181. else
  182. {
  183. STARPU_ASSERT(r->ready_data_callback);
  184. /* execute the callback associated with the data requester */
  185. r->ready_data_callback(r->argcb);
  186. }
  187. starpu_data_requester_delete(r);
  188. _starpu_spin_lock(&handle->header_lock);
  189. }
  190. }