user_interactions.c 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <common/config.h>
  17. #include <datawizard/coherency.h>
  18. #include <datawizard/copy-driver.h>
  19. #include <datawizard/write_back.h>
  20. #include <core/dependencies/data-concurrency.h>
  21. int starpu_request_data_allocation(starpu_data_handle handle, uint32_t node)
  22. {
  23. starpu_data_request_t r;
  24. STARPU_ASSERT(handle);
  25. r = _starpu_create_data_request(handle, 0, node, node, 0, 0, 1);
  26. /* we do not increase the refcnt associated to the request since we are
  27. * not waiting for its termination */
  28. _starpu_post_data_request(r, node);
  29. return 0;
  30. }
  31. struct state_and_node {
  32. starpu_data_handle state;
  33. starpu_access_mode mode;
  34. unsigned node;
  35. pthread_cond_t cond;
  36. pthread_mutex_t lock;
  37. unsigned finished;
  38. unsigned async;
  39. unsigned non_blocking;
  40. void (*callback)(void *);
  41. void *callback_arg;
  42. };
  43. /* put the current value of the data into STARPU_RAM */
  44. static inline void _starpu_sync_data_with_mem_continuation(void *arg)
  45. {
  46. int ret;
  47. struct state_and_node *statenode = arg;
  48. starpu_data_handle handle = statenode->state;
  49. STARPU_ASSERT(handle);
  50. unsigned r = (statenode->mode != STARPU_W);
  51. unsigned w = (statenode->mode != STARPU_R);
  52. ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
  53. STARPU_ASSERT(!ret);
  54. if (statenode->non_blocking)
  55. {
  56. /* continuation of starpu_sync_data_with_mem_non_blocking: we
  57. * execute the callback if any */
  58. if (statenode->callback)
  59. statenode->callback(statenode->callback_arg);
  60. free(statenode);
  61. }
  62. else {
  63. /* continuation of starpu_sync_data_with_mem */
  64. pthread_mutex_lock(&statenode->lock);
  65. statenode->finished = 1;
  66. pthread_cond_signal(&statenode->cond);
  67. pthread_mutex_unlock(&statenode->lock);
  68. }
  69. }
  70. /* The data must be released by calling starpu_release_data_from_mem later on */
  71. int starpu_sync_data_with_mem(starpu_data_handle handle, starpu_access_mode mode)
  72. {
  73. STARPU_ASSERT(handle);
  74. /* it is forbidden to call this function from a callback or a codelet */
  75. if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
  76. return -EDEADLK;
  77. struct state_and_node statenode =
  78. {
  79. .state = handle,
  80. .mode = mode,
  81. .node = 0, // unused
  82. .non_blocking = 0,
  83. .cond = PTHREAD_COND_INITIALIZER,
  84. .lock = PTHREAD_MUTEX_INITIALIZER,
  85. .finished = 0
  86. };
  87. /* we try to get the data, if we do not succeed immediately, we set a
  88. * callback function that will be executed automatically when the data is
  89. * available again, otherwise we fetch the data directly */
  90. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
  91. _starpu_sync_data_with_mem_continuation, &statenode))
  92. {
  93. /* no one has locked this data yet, so we proceed immediately */
  94. _starpu_sync_data_with_mem_continuation(&statenode);
  95. }
  96. else {
  97. pthread_mutex_lock(&statenode.lock);
  98. if (!statenode.finished)
  99. pthread_cond_wait(&statenode.cond, &statenode.lock);
  100. pthread_mutex_unlock(&statenode.lock);
  101. }
  102. return 0;
  103. }
  104. /* The data must be released by calling starpu_release_data_from_mem later on */
  105. int starpu_sync_data_with_mem_non_blocking(starpu_data_handle handle,
  106. starpu_access_mode mode, void (*callback)(void *), void *arg)
  107. {
  108. STARPU_ASSERT(handle);
  109. struct state_and_node *statenode = malloc(sizeof(struct state_and_node));
  110. STARPU_ASSERT(statenode);
  111. statenode->state = handle;
  112. statenode->mode = mode;
  113. statenode->non_blocking = 1;
  114. statenode->callback = callback;
  115. statenode->callback_arg = arg;
  116. pthread_cond_init(&statenode->cond, NULL);
  117. pthread_mutex_init(&statenode->lock, NULL);
  118. statenode->finished = 0;
  119. /* we try to get the data, if we do not succeed immediately, we set a
  120. * callback function that will be executed automatically when the data is
  121. * available again, otherwise we fetch the data directly */
  122. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
  123. _starpu_sync_data_with_mem_continuation, statenode))
  124. {
  125. /* no one has locked this data yet, so we proceed immediately */
  126. _starpu_sync_data_with_mem_continuation(statenode);
  127. }
  128. return 0;
  129. }
  130. /* This function must be called after starpu_sync_data_with_mem so that the
  131. * application release the data */
  132. void starpu_release_data_from_mem(starpu_data_handle handle)
  133. {
  134. STARPU_ASSERT(handle);
  135. /* The application can now release the rw-lock */
  136. _starpu_release_data_on_node(handle, 0, 0);
  137. }
  138. static void _prefetch_data_on_node(void *arg)
  139. {
  140. struct state_and_node *statenode = arg;
  141. _starpu_fetch_data_on_node(statenode->state, statenode->node, 1, 0, statenode->async);
  142. pthread_mutex_lock(&statenode->lock);
  143. statenode->finished = 1;
  144. pthread_cond_signal(&statenode->cond);
  145. pthread_mutex_unlock(&statenode->lock);
  146. if (!statenode->async)
  147. {
  148. _starpu_spin_lock(&statenode->state->header_lock);
  149. _starpu_notify_data_dependencies(statenode->state);
  150. _starpu_spin_unlock(&statenode->state->header_lock);
  151. }
  152. }
  153. int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned node, unsigned async, starpu_access_mode mode)
  154. {
  155. STARPU_ASSERT(handle);
  156. /* it is forbidden to call this function from a callback or a codelet */
  157. if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
  158. return -EDEADLK;
  159. struct state_and_node statenode =
  160. {
  161. .state = handle,
  162. .node = node,
  163. .async = async,
  164. .cond = PTHREAD_COND_INITIALIZER,
  165. .lock = PTHREAD_MUTEX_INITIALIZER,
  166. .finished = 0
  167. };
  168. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &statenode))
  169. {
  170. /* we can immediately proceed */
  171. uint8_t read = (mode != STARPU_W);
  172. uint8_t write = (mode != STARPU_R);
  173. _starpu_fetch_data_on_node(handle, node, read, write, async);
  174. /* remove the "lock"/reference */
  175. if (!async)
  176. {
  177. _starpu_spin_lock(&handle->header_lock);
  178. _starpu_notify_data_dependencies(handle);
  179. _starpu_spin_unlock(&handle->header_lock);
  180. }
  181. }
  182. else {
  183. pthread_mutex_lock(&statenode.lock);
  184. if (!statenode.finished)
  185. pthread_cond_wait(&statenode.cond, &statenode.lock);
  186. pthread_mutex_unlock(&statenode.lock);
  187. }
  188. return 0;
  189. }
  190. int starpu_prefetch_data_on_node(starpu_data_handle handle, unsigned node, unsigned async)
  191. {
  192. return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R);
  193. }