user_interactions.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <common/config.h>
  17. #include <common/utils.h>
  18. #include <datawizard/coherency.h>
  19. #include <datawizard/copy_driver.h>
  20. #include <datawizard/write_back.h>
  21. #include <core/dependencies/data_concurrency.h>
  22. int starpu_request_data_allocation(starpu_data_handle handle, uint32_t node)
  23. {
  24. starpu_data_request_t r;
  25. STARPU_ASSERT(handle);
  26. r = _starpu_create_data_request(handle, 0, node, node, 0, 0, 1);
  27. /* we do not increase the refcnt associated to the request since we are
  28. * not waiting for its termination */
  29. _starpu_post_data_request(r, node);
  30. return 0;
  31. }
  32. struct state_and_node {
  33. starpu_data_handle state;
  34. starpu_access_mode mode;
  35. unsigned node;
  36. pthread_cond_t cond;
  37. pthread_mutex_t lock;
  38. unsigned finished;
  39. unsigned async;
  40. unsigned non_blocking;
  41. void (*callback)(void *);
  42. void *callback_arg;
  43. };
  44. /* put the current value of the data into RAM */
  45. static inline void _starpu_sync_data_with_mem_continuation(void *arg)
  46. {
  47. int ret;
  48. struct state_and_node *statenode = arg;
  49. starpu_data_handle handle = statenode->state;
  50. STARPU_ASSERT(handle);
  51. unsigned r = (statenode->mode != STARPU_W);
  52. unsigned w = (statenode->mode != STARPU_R);
  53. ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
  54. STARPU_ASSERT(!ret);
  55. if (statenode->non_blocking)
  56. {
  57. /* continuation of starpu_sync_data_with_mem_non_blocking: we
  58. * execute the callback if any */
  59. if (statenode->callback)
  60. statenode->callback(statenode->callback_arg);
  61. free(statenode);
  62. }
  63. else {
  64. /* continuation of starpu_sync_data_with_mem */
  65. PTHREAD_MUTEX_LOCK(&statenode->lock);
  66. statenode->finished = 1;
  67. PTHREAD_COND_SIGNAL(&statenode->cond);
  68. PTHREAD_MUTEX_UNLOCK(&statenode->lock);
  69. }
  70. }
  71. /* The data must be released by calling starpu_release_data_from_mem later on */
  72. int starpu_sync_data_with_mem(starpu_data_handle handle, starpu_access_mode mode)
  73. {
  74. STARPU_ASSERT(handle);
  75. /* it is forbidden to call this function from a callback or a codelet */
  76. if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
  77. return -EDEADLK;
  78. struct state_and_node statenode =
  79. {
  80. .state = handle,
  81. .mode = mode,
  82. .node = 0, // unused
  83. .non_blocking = 0,
  84. .cond = PTHREAD_COND_INITIALIZER,
  85. .lock = PTHREAD_MUTEX_INITIALIZER,
  86. .finished = 0
  87. };
  88. /* we try to get the data, if we do not succeed immediately, we set a
  89. * callback function that will be executed automatically when the data is
  90. * available again, otherwise we fetch the data directly */
  91. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
  92. _starpu_sync_data_with_mem_continuation, &statenode))
  93. {
  94. /* no one has locked this data yet, so we proceed immediately */
  95. _starpu_sync_data_with_mem_continuation(&statenode);
  96. }
  97. else {
  98. PTHREAD_MUTEX_LOCK(&statenode.lock);
  99. while (!statenode.finished)
  100. PTHREAD_COND_WAIT(&statenode.cond, &statenode.lock);
  101. PTHREAD_MUTEX_UNLOCK(&statenode.lock);
  102. }
  103. return 0;
  104. }
  105. /* The data must be released by calling starpu_release_data_from_mem later on */
  106. int starpu_sync_data_with_mem_non_blocking(starpu_data_handle handle,
  107. starpu_access_mode mode, void (*callback)(void *), void *arg)
  108. {
  109. STARPU_ASSERT(handle);
  110. struct state_and_node *statenode = malloc(sizeof(struct state_and_node));
  111. STARPU_ASSERT(statenode);
  112. statenode->state = handle;
  113. statenode->mode = mode;
  114. statenode->non_blocking = 1;
  115. statenode->callback = callback;
  116. statenode->callback_arg = arg;
  117. PTHREAD_COND_INIT(&statenode->cond, NULL);
  118. PTHREAD_MUTEX_INIT(&statenode->lock, NULL);
  119. statenode->finished = 0;
  120. /* we try to get the data, if we do not succeed immediately, we set a
  121. * callback function that will be executed automatically when the data is
  122. * available again, otherwise we fetch the data directly */
  123. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
  124. _starpu_sync_data_with_mem_continuation, statenode))
  125. {
  126. /* no one has locked this data yet, so we proceed immediately */
  127. _starpu_sync_data_with_mem_continuation(statenode);
  128. }
  129. return 0;
  130. }
  131. /* This function must be called after starpu_sync_data_with_mem so that the
  132. * application release the data */
  133. void starpu_release_data_from_mem(starpu_data_handle handle)
  134. {
  135. STARPU_ASSERT(handle);
  136. /* The application can now release the rw-lock */
  137. _starpu_release_data_on_node(handle, 0, 0);
  138. }
  139. static void _prefetch_data_on_node(void *arg)
  140. {
  141. struct state_and_node *statenode = arg;
  142. _starpu_fetch_data_on_node(statenode->state, statenode->node, 1, 0, statenode->async);
  143. PTHREAD_MUTEX_LOCK(&statenode->lock);
  144. statenode->finished = 1;
  145. PTHREAD_COND_SIGNAL(&statenode->cond);
  146. PTHREAD_MUTEX_UNLOCK(&statenode->lock);
  147. if (!statenode->async)
  148. {
  149. _starpu_spin_lock(&statenode->state->header_lock);
  150. _starpu_notify_data_dependencies(statenode->state);
  151. _starpu_spin_unlock(&statenode->state->header_lock);
  152. }
  153. }
  154. int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned node, unsigned async, starpu_access_mode mode)
  155. {
  156. STARPU_ASSERT(handle);
  157. /* it is forbidden to call this function from a callback or a codelet */
  158. if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
  159. return -EDEADLK;
  160. struct state_and_node statenode =
  161. {
  162. .state = handle,
  163. .node = node,
  164. .async = async,
  165. .cond = PTHREAD_COND_INITIALIZER,
  166. .lock = PTHREAD_MUTEX_INITIALIZER,
  167. .finished = 0
  168. };
  169. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &statenode))
  170. {
  171. /* we can immediately proceed */
  172. uint8_t read = (mode != STARPU_W);
  173. uint8_t write = (mode != STARPU_R);
  174. _starpu_fetch_data_on_node(handle, node, read, write, async);
  175. /* remove the "lock"/reference */
  176. if (!async)
  177. {
  178. _starpu_spin_lock(&handle->header_lock);
  179. _starpu_notify_data_dependencies(handle);
  180. _starpu_spin_unlock(&handle->header_lock);
  181. }
  182. }
  183. else {
  184. PTHREAD_MUTEX_LOCK(&statenode.lock);
  185. while (!statenode.finished)
  186. PTHREAD_COND_WAIT(&statenode.cond, &statenode.lock);
  187. PTHREAD_MUTEX_UNLOCK(&statenode.lock);
  188. }
  189. return 0;
  190. }
  191. int starpu_prefetch_data_on_node(starpu_data_handle handle, unsigned node, unsigned async)
  192. {
  193. return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R);
  194. }
  195. /*
  196. * It is possible to specify that a piece of data can be discarded without
  197. * impacting the application.
  198. */
  199. void starpu_advise_if_data_is_important(starpu_data_handle handle, unsigned is_important)
  200. {
  201. _starpu_spin_lock(&handle->header_lock);
  202. /* first take all the children lock (in order !) */
  203. unsigned child;
  204. for (child = 0; child < handle->nchildren; child++)
  205. {
  206. /* make sure the intermediate children is advised as well */
  207. struct starpu_data_state_t *child_handle = &handle->children[child];
  208. if (child_handle->nchildren > 0)
  209. starpu_advise_if_data_is_important(child_handle, is_important);
  210. }
  211. handle->is_not_important = !is_important;
  212. /* now the parent may be used again so we release the lock */
  213. _starpu_spin_unlock(&handle->header_lock);
  214. }
  215. void starpu_data_set_sequential_consistency_flag(starpu_data_handle handle, unsigned flag)
  216. {
  217. _starpu_spin_lock(&handle->header_lock);
  218. unsigned child;
  219. for (child = 0; child < handle->nchildren; child++)
  220. {
  221. /* make sure that the flags are applied to the children as well */
  222. struct starpu_data_state_t *child_handle = &handle->children[child];
  223. if (child_handle->nchildren > 0)
  224. starpu_data_set_sequential_consistency_flag(child_handle, flag);
  225. }
  226. PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  227. handle->sequential_consistency = flag;
  228. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  229. _starpu_spin_unlock(&handle->header_lock);
  230. }