user_interactions.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  #include <common/config.h>
  #include <errno.h>
  #include <stdlib.h>
  #include <datawizard/coherency.h>
  #include <datawizard/copy-driver.h>
  #include <datawizard/write_back.h>
  #include <core/dependencies/data-concurrency.h>
  21. int starpu_request_data_allocation(starpu_data_handle handle, uint32_t node)
  22. {
  23. data_request_t r;
  24. r = create_data_request(handle, 0, node, node, 0, 0, 1);
  25. /* we do not increase the refcnt associated to the request since we are
  26. * not waiting for its termination */
  27. post_data_request(r, node);
  28. return 0;
  29. }
  30. struct state_and_node {
  31. starpu_data_handle state;
  32. starpu_access_mode mode;
  33. unsigned node;
  34. pthread_cond_t cond;
  35. pthread_mutex_t lock;
  36. unsigned finished;
  37. unsigned async;
  38. unsigned non_blocking;
  39. void (*callback)(void *);
  40. void *callback_arg;
  41. };
  42. /* put the current value of the data into RAM */
  43. static inline void _starpu_sync_data_with_mem_continuation(void *arg)
  44. {
  45. int ret;
  46. struct state_and_node *statenode = arg;
  47. starpu_data_handle handle = statenode->state;
  48. unsigned r = (statenode->mode != STARPU_W);
  49. unsigned w = (statenode->mode != STARPU_R);
  50. ret = fetch_data_on_node(handle, 0, r, w, 0);
  51. STARPU_ASSERT(!ret);
  52. if (statenode->non_blocking)
  53. {
  54. /* continuation of starpu_sync_data_with_mem_non_blocking: we
  55. * execute the callback if any */
  56. if (statenode->callback)
  57. statenode->callback(statenode->callback_arg);
  58. }
  59. else {
  60. /* continuation of starpu_sync_data_with_mem */
  61. pthread_mutex_lock(&statenode->lock);
  62. statenode->finished = 1;
  63. pthread_cond_signal(&statenode->cond);
  64. pthread_mutex_unlock(&statenode->lock);
  65. }
  66. }
  67. /* The data must be released by calling starpu_release_data_from_mem later on */
  68. int starpu_sync_data_with_mem(starpu_data_handle handle, starpu_access_mode mode)
  69. {
  70. /* it is forbidden to call this function from a callback or a codelet */
  71. if (STARPU_UNLIKELY(!worker_may_perform_blocking_calls()))
  72. return -EDEADLK;
  73. struct state_and_node statenode =
  74. {
  75. .state = handle,
  76. .mode = mode,
  77. .node = 0, // unused
  78. .non_blocking = 0,
  79. .cond = PTHREAD_COND_INITIALIZER,
  80. .lock = PTHREAD_MUTEX_INITIALIZER,
  81. .finished = 0
  82. };
  83. /* we try to get the data, if we do not succeed immediately, we set a
  84. * callback function that will be executed automatically when the data is
  85. * available again, otherwise we fetch the data directly */
  86. if (!attempt_to_submit_data_request_from_apps(handle, mode,
  87. _starpu_sync_data_with_mem_continuation, &statenode))
  88. {
  89. /* no one has locked this data yet, so we proceed immediately */
  90. _starpu_sync_data_with_mem_continuation(&statenode);
  91. }
  92. else {
  93. pthread_mutex_lock(&statenode.lock);
  94. if (!statenode.finished)
  95. pthread_cond_wait(&statenode.cond, &statenode.lock);
  96. pthread_mutex_unlock(&statenode.lock);
  97. }
  98. return 0;
  99. }
  100. /* The data must be released by calling starpu_release_data_from_mem later on */
  101. int starpu_sync_data_with_mem_non_blocking(starpu_data_handle handle,
  102. starpu_access_mode mode, void (*callback)(void *), void *arg)
  103. {
  104. struct state_and_node statenode =
  105. {
  106. .state = handle,
  107. .mode = mode,
  108. .node = 0, // unused
  109. .non_blocking = 1,
  110. .callback = callback,
  111. .callback_arg = arg,
  112. .cond = PTHREAD_COND_INITIALIZER,
  113. .lock = PTHREAD_MUTEX_INITIALIZER,
  114. .finished = 0
  115. };
  116. /* we try to get the data, if we do not succeed immediately, we set a
  117. * callback function that will be executed automatically when the data is
  118. * available again, otherwise we fetch the data directly */
  119. if (!attempt_to_submit_data_request_from_apps(handle, mode,
  120. _starpu_sync_data_with_mem_continuation, &statenode))
  121. {
  122. /* no one has locked this data yet, so we proceed immediately */
  123. _starpu_sync_data_with_mem_continuation(&statenode);
  124. }
  125. return 0;
  126. }
  127. /* This function must be called after starpu_sync_data_with_mem so that the
  128. * application release the data */
  129. void starpu_release_data_from_mem(starpu_data_handle handle)
  130. {
  131. /* The application can now release the rw-lock */
  132. release_data_on_node(handle, 0, 0);
  133. }
  134. static void _prefetch_data_on_node(void *arg)
  135. {
  136. struct state_and_node *statenode = arg;
  137. fetch_data_on_node(statenode->state, statenode->node, 1, 0, statenode->async);
  138. pthread_mutex_lock(&statenode->lock);
  139. statenode->finished = 1;
  140. pthread_cond_signal(&statenode->cond);
  141. pthread_mutex_unlock(&statenode->lock);
  142. if (!statenode->async)
  143. {
  144. starpu_spin_lock(&statenode->state->header_lock);
  145. notify_data_dependencies(statenode->state);
  146. starpu_spin_unlock(&statenode->state->header_lock);
  147. }
  148. }
  149. int starpu_prefetch_data_on_node(starpu_data_handle handle, unsigned node, unsigned async)
  150. {
  151. /* it is forbidden to call this function from a callback or a codelet */
  152. if (STARPU_UNLIKELY(!worker_may_perform_blocking_calls()))
  153. return -EDEADLK;
  154. struct state_and_node statenode =
  155. {
  156. .state = handle,
  157. .node = node,
  158. .async = async,
  159. .cond = PTHREAD_COND_INITIALIZER,
  160. .lock = PTHREAD_MUTEX_INITIALIZER,
  161. .finished = 0
  162. };
  163. if (!attempt_to_submit_data_request_from_apps(handle, STARPU_R, _prefetch_data_on_node, &statenode))
  164. {
  165. /* we can immediately proceed */
  166. fetch_data_on_node(handle, node, 1, 0, async);
  167. /* remove the "lock"/reference */
  168. if (!async)
  169. {
  170. starpu_spin_lock(&handle->header_lock);
  171. notify_data_dependencies(handle);
  172. starpu_spin_unlock(&handle->header_lock);
  173. }
  174. }
  175. else {
  176. pthread_mutex_lock(&statenode.lock);
  177. if (!statenode.finished)
  178. pthread_cond_wait(&statenode.cond, &statenode.lock);
  179. pthread_mutex_unlock(&statenode.lock);
  180. }
  181. return 0;
  182. }