data-concurrency.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <core/dependencies/data-concurrency.h>
  17. #include <datawizard/coherency.h>
  18. #include <core/policies/sched_policy.h>
  19. #include <common/starpu-spinlock.h>
  20. static unsigned _submit_job_enforce_data_deps(job_t j, unsigned start_buffer_index);
  21. static unsigned unlock_one_requester(data_requester_t r)
  22. {
  23. job_t j = r->j;
  24. unsigned nbuffers = j->task->cl->nbuffers;
  25. unsigned buffer_index = r->buffer_index;
  26. if (buffer_index + 1 < nbuffers)
  27. {
  28. /* not all buffers are protected yet */
  29. return _submit_job_enforce_data_deps(j, buffer_index + 1);
  30. }
  31. else
  32. return 0;
  33. }
  34. /* the header lock must be taken by the caller */
  35. static unsigned may_unlock_data_req_list_head(starpu_data_handle handle)
  36. {
  37. /* if there is no one to unlock ... */
  38. if (data_requester_list_empty(handle->req_list))
  39. return 0;
  40. /* if there is no reference to the data anymore, we can use it */
  41. if (handle->refcnt == 0)
  42. {
  43. STARPU_ASSERT(!handle->per_node[0].request);
  44. STARPU_ASSERT(!handle->per_node[1].request);
  45. return 1;
  46. }
  47. if (handle->current_mode == STARPU_W)
  48. return 0;
  49. /* data->current_mode == STARPU_R, so we can process more readers */
  50. data_requester_t r = data_requester_list_front(handle->req_list);
  51. return (r->mode == STARPU_R);
  52. }
  53. unsigned attempt_to_submit_data_request_from_apps(starpu_data_handle handle, starpu_access_mode mode,
  54. void (*callback)(void *), void *argcb)
  55. {
  56. unsigned ret;
  57. starpu_spin_lock(&handle->header_lock);
  58. if (handle->refcnt == 0)
  59. {
  60. /* there is nobody currently about to manipulate the data */
  61. handle->refcnt++;
  62. handle->current_mode = mode;
  63. /* success */
  64. ret = 0;
  65. }
  66. else
  67. {
  68. /* there is already someone that may access the data */
  69. if ( (mode == STARPU_R) && (handle->current_mode == STARPU_R))
  70. {
  71. handle->refcnt++;
  72. /* success : there is a new reader */
  73. ret = 0;
  74. }
  75. else
  76. {
  77. /* there cannot be multiple writers or a new writer
  78. * while the data is in read mode */
  79. /* enqueue the request */
  80. data_requester_t r = data_requester_new();
  81. r->mode = mode;
  82. r->is_requested_by_codelet = 0;
  83. r->ready_data_callback = callback;
  84. r->argcb = argcb;
  85. data_requester_list_push_back(handle->req_list, r);
  86. /* failed */
  87. ret = 1;
  88. }
  89. }
  90. starpu_spin_unlock(&handle->header_lock);
  91. return ret;
  92. }
  93. static unsigned attempt_to_submit_data_request_from_job(job_t j, unsigned buffer_index)
  94. {
  95. unsigned ret;
  96. starpu_data_handle handle = j->task->buffers[buffer_index].handle;
  97. starpu_access_mode mode = j->task->buffers[buffer_index].mode;
  98. while (starpu_spin_trylock(&handle->header_lock))
  99. datawizard_progress(get_local_memory_node(), 0);
  100. if (handle->refcnt == 0)
  101. {
  102. /* there is nobody currently about to manipulate the data */
  103. handle->refcnt++;
  104. handle->current_mode = (mode==STARPU_R)?STARPU_R:STARPU_W;
  105. /* success */
  106. ret = 0;
  107. }
  108. else
  109. {
  110. /* there is already someone that may access the data */
  111. if ( (mode == STARPU_R) && (handle->current_mode == STARPU_R))
  112. {
  113. handle->refcnt++;
  114. /* success : there is a new reader */
  115. ret = 0;
  116. }
  117. else
  118. {
  119. /* there cannot be multiple writers or a new writer
  120. * while the data is in read mode */
  121. /* enqueue the request */
  122. data_requester_t r = data_requester_new();
  123. r->mode = mode;
  124. r->is_requested_by_codelet = 1;
  125. r->j = j;
  126. r->buffer_index = buffer_index;
  127. data_requester_list_push_back(handle->req_list, r);
  128. /* failed */
  129. ret = 1;
  130. }
  131. }
  132. starpu_spin_unlock(&handle->header_lock);
  133. return ret;
  134. }
  135. static unsigned _submit_job_enforce_data_deps(job_t j, unsigned start_buffer_index)
  136. {
  137. unsigned buf;
  138. /* TODO compute an ordered list of the data */
  139. unsigned nbuffers = j->task->cl->nbuffers;
  140. for (buf = start_buffer_index; buf < nbuffers; buf++)
  141. {
  142. if (attempt_to_submit_data_request_from_job(j, buf))
  143. return 1;
  144. }
  145. return 0;
  146. }
  147. /* When a new task is submitted, we make sure that there cannot be codelets
  148. with concurrent data-access at the same time in the scheduling engine (eg.
  149. there can be 2 tasks reading a piece of data, but there cannot be one
  150. reading and another writing) */
  151. unsigned submit_job_enforce_data_deps(job_t j)
  152. {
  153. if ((j->task->cl == NULL) || (j->task->cl->nbuffers == 0))
  154. return 0;
  155. return _submit_job_enforce_data_deps(j, 0);
  156. }
  157. /* The header lock must already be taken by the caller */
  158. void notify_data_dependencies(starpu_data_handle handle)
  159. {
  160. handle->refcnt--;
  161. while (may_unlock_data_req_list_head(handle))
  162. {
  163. /* unlock the head of the requester list */
  164. data_requester_t r = data_requester_list_pop_front(handle->req_list);
  165. handle->refcnt++;
  166. /* the data is now attributed to that request */
  167. handle->current_mode = (r->mode==STARPU_R)?STARPU_R:STARPU_W;
  168. starpu_spin_unlock(&handle->header_lock);
  169. if (r->is_requested_by_codelet)
  170. {
  171. if (!unlock_one_requester(r))
  172. push_task(r->j);
  173. }
  174. else
  175. {
  176. STARPU_ASSERT(r->ready_data_callback);
  177. /* execute the callback associated with the data requester */
  178. r->ready_data_callback(r->argcb);
  179. }
  180. data_requester_delete(r);
  181. starpu_spin_lock(&handle->header_lock);
  182. }
  183. }