data-concurrency.c 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <core/dependencies/data-concurrency.h>
  17. #include <datawizard/coherency.h>
  18. #include <core/policies/sched_policy.h>
  19. #include <common/starpu-spinlock.h>
  20. #include <datawizard/sort_data_handles.h>
  21. static unsigned _submit_job_enforce_data_deps(starpu_job_t j, unsigned start_buffer_index);
  22. static unsigned unlock_one_requester(starpu_data_requester_t r)
  23. {
  24. starpu_job_t j = r->j;
  25. unsigned nbuffers = j->task->cl->nbuffers;
  26. unsigned buffer_index = r->buffer_index;
  27. if (buffer_index + 1 < nbuffers)
  28. {
  29. /* not all buffers are protected yet */
  30. return _submit_job_enforce_data_deps(j, buffer_index + 1);
  31. }
  32. else
  33. return 0;
  34. }
  35. /* the header lock must be taken by the caller */
  36. static unsigned may_unlock_data_req_list_head(starpu_data_handle handle)
  37. {
  38. /* if there is no one to unlock ... */
  39. if (starpu_data_requester_list_empty(handle->req_list))
  40. return 0;
  41. /* if there is no reference to the data anymore, we can use it */
  42. if (handle->refcnt == 0)
  43. {
  44. STARPU_ASSERT(!handle->per_node[0].request);
  45. STARPU_ASSERT(!handle->per_node[1].request);
  46. return 1;
  47. }
  48. if (handle->current_mode == STARPU_W)
  49. return 0;
  50. /* data->current_mode == STARPU_R, so we can process more readers */
  51. starpu_data_requester_t r = starpu_data_requester_list_front(handle->req_list);
  52. return (r->mode == STARPU_R);
  53. }
  54. unsigned _starpu_attempt_to_submit_data_request_from_apps(starpu_data_handle handle, starpu_access_mode mode,
  55. void (*callback)(void *), void *argcb)
  56. {
  57. unsigned ret;
  58. _starpu_spin_lock(&handle->header_lock);
  59. if (handle->refcnt == 0)
  60. {
  61. /* there is nobody currently about to manipulate the data */
  62. handle->refcnt++;
  63. handle->current_mode = mode;
  64. /* success */
  65. ret = 0;
  66. }
  67. else
  68. {
  69. /* there is already someone that may access the data */
  70. if ( (mode == STARPU_R) && (handle->current_mode == STARPU_R))
  71. {
  72. handle->refcnt++;
  73. /* success : there is a new reader */
  74. ret = 0;
  75. }
  76. else
  77. {
  78. /* there cannot be multiple writers or a new writer
  79. * while the data is in read mode */
  80. /* enqueue the request */
  81. starpu_data_requester_t r = starpu_data_requester_new();
  82. r->mode = mode;
  83. r->is_requested_by_codelet = 0;
  84. r->ready_data_callback = callback;
  85. r->argcb = argcb;
  86. starpu_data_requester_list_push_back(handle->req_list, r);
  87. /* failed */
  88. ret = 1;
  89. }
  90. }
  91. _starpu_spin_unlock(&handle->header_lock);
  92. return ret;
  93. }
  94. static unsigned attempt_to_submit_data_request_from_job(starpu_job_t j, unsigned buffer_index)
  95. {
  96. unsigned ret;
  97. /* Note that we do not access j->task->buffers, but j->ordered_buffers
  98. * which is a sorted copy of it. */
  99. starpu_data_handle handle = j->ordered_buffers[buffer_index].handle;
  100. starpu_access_mode mode = j->ordered_buffers[buffer_index].mode;
  101. while (_starpu_spin_trylock(&handle->header_lock))
  102. _starpu_datawizard_progress(_starpu_get_local_memory_node(), 0);
  103. if (handle->refcnt == 0)
  104. {
  105. /* there is nobody currently about to manipulate the data */
  106. handle->refcnt++;
  107. handle->current_mode = (mode==STARPU_R)?STARPU_R:STARPU_W;
  108. /* success */
  109. ret = 0;
  110. }
  111. else
  112. {
  113. /* there is already someone that may access the data */
  114. if ( (mode == STARPU_R) && (handle->current_mode == STARPU_R))
  115. {
  116. handle->refcnt++;
  117. /* success : there is a new reader */
  118. ret = 0;
  119. }
  120. else
  121. {
  122. /* there cannot be multiple writers or a new writer
  123. * while the data is in read mode */
  124. /* enqueue the request */
  125. starpu_data_requester_t r = starpu_data_requester_new();
  126. r->mode = mode;
  127. r->is_requested_by_codelet = 1;
  128. r->j = j;
  129. r->buffer_index = buffer_index;
  130. starpu_data_requester_list_push_back(handle->req_list, r);
  131. /* failed */
  132. ret = 1;
  133. }
  134. }
  135. _starpu_spin_unlock(&handle->header_lock);
  136. return ret;
  137. }
  138. static unsigned _submit_job_enforce_data_deps(starpu_job_t j, unsigned start_buffer_index)
  139. {
  140. unsigned buf;
  141. unsigned nbuffers = j->task->cl->nbuffers;
  142. for (buf = start_buffer_index; buf < nbuffers; buf++)
  143. {
  144. if (attempt_to_submit_data_request_from_job(j, buf))
  145. return 1;
  146. }
  147. return 0;
  148. }
  149. /* When a new task is submitted, we make sure that there cannot be codelets
  150. with concurrent data-access at the same time in the scheduling engine (eg.
  151. there can be 2 tasks reading a piece of data, but there cannot be one
  152. reading and another writing) */
  153. unsigned _starpu_submit_job_enforce_data_deps(starpu_job_t j)
  154. {
  155. struct starpu_codelet_t *cl = j->task->cl;
  156. if ((cl == NULL) || (cl->nbuffers == 0))
  157. return 0;
  158. /* Compute an ordered list of the different pieces of data so that we
  159. * grab then according to a total order, thus avoiding a deadlock
  160. * condition */
  161. memcpy(j->ordered_buffers, j->task->buffers, cl->nbuffers*sizeof(starpu_buffer_descr));
  162. _starpu_sort_task_handles(j->ordered_buffers, cl->nbuffers);
  163. return _submit_job_enforce_data_deps(j, 0);
  164. }
  165. /* The header lock must already be taken by the caller */
  166. void _starpu_notify_data_dependencies(starpu_data_handle handle)
  167. {
  168. handle->refcnt--;
  169. while (may_unlock_data_req_list_head(handle))
  170. {
  171. /* unlock the head of the requester list */
  172. starpu_data_requester_t r = starpu_data_requester_list_pop_front(handle->req_list);
  173. handle->refcnt++;
  174. /* the data is now attributed to that request */
  175. handle->current_mode = (r->mode==STARPU_R)?STARPU_R:STARPU_W;
  176. _starpu_spin_unlock(&handle->header_lock);
  177. if (r->is_requested_by_codelet)
  178. {
  179. if (!unlock_one_requester(r))
  180. _starpu_push_task(r->j);
  181. }
  182. else
  183. {
  184. STARPU_ASSERT(r->ready_data_callback);
  185. /* execute the callback associated with the data requester */
  186. r->ready_data_callback(r->argcb);
  187. }
  188. starpu_data_requester_delete(r);
  189. _starpu_spin_lock(&handle->header_lock);
  190. }
  191. }