user_interactions.c 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <common/config.h>
  17. #include <common/utils.h>
  18. #include <datawizard/coherency.h>
  19. #include <datawizard/copy_driver.h>
  20. #include <datawizard/write_back.h>
  21. #include <core/dependencies/data_concurrency.h>
  22. int starpu_data_request_allocation(starpu_data_handle handle, uint32_t node)
  23. {
  24. starpu_data_request_t r;
  25. STARPU_ASSERT(handle);
  26. r = _starpu_create_data_request(handle, 0, node, node, 0, 0, 1);
  27. /* we do not increase the refcnt associated to the request since we are
  28. * not waiting for its termination */
  29. _starpu_post_data_request(r, node);
  30. return 0;
  31. }
  32. struct state_and_node {
  33. starpu_data_handle state;
  34. starpu_access_mode mode;
  35. unsigned node;
  36. pthread_cond_t cond;
  37. pthread_mutex_t lock;
  38. unsigned finished;
  39. unsigned async;
  40. void (*callback)(void *);
  41. void *callback_arg;
  42. };
  43. /* put the current value of the data into RAM */
  44. static inline void _starpu_sync_data_with_mem_continuation_non_blocking(void *arg)
  45. {
  46. int ret;
  47. struct state_and_node *statenode = arg;
  48. starpu_data_handle handle = statenode->state;
  49. STARPU_ASSERT(handle);
  50. unsigned r = (statenode->mode & STARPU_R);
  51. unsigned w = (statenode->mode & STARPU_W);
  52. ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
  53. STARPU_ASSERT(!ret);
  54. /* continuation of starpu_data_sync_with_mem_non_blocking: we
  55. * execute the callback if any */
  56. if (statenode->callback)
  57. statenode->callback(statenode->callback_arg);
  58. free(statenode);
  59. }
  60. /* The data must be released by calling starpu_data_release_from_mem later on */
  61. int starpu_data_sync_with_mem(starpu_data_handle handle, starpu_access_mode mode)
  62. {
  63. STARPU_ASSERT(handle);
  64. /* it is forbidden to call this function from a callback or a codelet */
  65. if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
  66. return -EDEADLK;
  67. struct state_and_node statenode =
  68. {
  69. .state = handle,
  70. .mode = mode,
  71. .node = 0, // unused
  72. .cond = PTHREAD_COND_INITIALIZER,
  73. .lock = PTHREAD_MUTEX_INITIALIZER,
  74. .finished = 0
  75. };
  76. /* we try to get the data, if we do not succeed immediately, we set a
  77. * callback function that will be executed automatically when the data is
  78. * available again, otherwise we fetch the data directly */
  79. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
  80. _starpu_sync_data_with_mem_continuation, &statenode))
  81. {
  82. /* no one has locked this data yet, so we proceed immediately */
  83. _starpu_sync_data_with_mem_continuation(&statenode);
  84. }
  85. else {
  86. PTHREAD_MUTEX_LOCK(&statenode.lock);
  87. while (!statenode.finished)
  88. PTHREAD_COND_WAIT(&statenode.cond, &statenode.lock);
  89. PTHREAD_MUTEX_UNLOCK(&statenode.lock);
  90. }
  91. return 0;
  92. }
  93. static inline void _starpu_sync_data_with_mem_continuation(void *arg)
  94. {
  95. int ret;
  96. struct state_and_node *statenode = arg;
  97. starpu_data_handle handle = statenode->state;
  98. STARPU_ASSERT(handle);
  99. unsigned r = (statenode->mode & STARPU_R);
  100. unsigned w = (statenode->mode & STARPU_W);
  101. ret = _starpu_fetch_data_on_node(handle, 0, r, w, 0);
  102. STARPU_ASSERT(!ret);
  103. /* continuation of starpu_data_sync_with_mem */
  104. PTHREAD_MUTEX_LOCK(&statenode->lock);
  105. statenode->finished = 1;
  106. PTHREAD_COND_SIGNAL(&statenode->cond);
  107. PTHREAD_MUTEX_UNLOCK(&statenode->lock);
  108. }
  109. /* The data must be released by calling starpu_data_release_from_mem later on */
  110. int starpu_data_sync_with_mem_non_blocking(starpu_data_handle handle,
  111. starpu_access_mode mode, void (*callback)(void *), void *arg)
  112. {
  113. STARPU_ASSERT(handle);
  114. struct state_and_node *statenode = malloc(sizeof(struct state_and_node));
  115. STARPU_ASSERT(statenode);
  116. statenode->state = handle;
  117. statenode->mode = mode;
  118. statenode->callback = callback;
  119. statenode->callback_arg = arg;
  120. PTHREAD_COND_INIT(&statenode->cond, NULL);
  121. PTHREAD_MUTEX_INIT(&statenode->lock, NULL);
  122. statenode->finished = 0;
  123. /* we try to get the data, if we do not succeed immediately, we set a
  124. * callback function that will be executed automatically when the data is
  125. * available again, otherwise we fetch the data directly */
  126. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
  127. _starpu_sync_data_with_mem_continuation_non_blocking, statenode))
  128. {
  129. /* no one has locked this data yet, so we proceed immediately */
  130. _starpu_sync_data_with_mem_continuation_non_blocking(statenode);
  131. }
  132. return 0;
  133. }
  134. /* This function must be called after starpu_data_sync_with_mem so that the
  135. * application release the data */
  136. void starpu_data_release_from_mem(starpu_data_handle handle)
  137. {
  138. STARPU_ASSERT(handle);
  139. /* The application can now release the rw-lock */
  140. _starpu_release_data_on_node(handle, 0, 0);
  141. }
  142. static void _prefetch_data_on_node(void *arg)
  143. {
  144. struct state_and_node *statenode = arg;
  145. _starpu_fetch_data_on_node(statenode->state, statenode->node, 1, 0, statenode->async);
  146. PTHREAD_MUTEX_LOCK(&statenode->lock);
  147. statenode->finished = 1;
  148. PTHREAD_COND_SIGNAL(&statenode->cond);
  149. PTHREAD_MUTEX_UNLOCK(&statenode->lock);
  150. if (!statenode->async)
  151. {
  152. _starpu_spin_lock(&statenode->state->header_lock);
  153. _starpu_notify_data_dependencies(statenode->state);
  154. _starpu_spin_unlock(&statenode->state->header_lock);
  155. }
  156. }
  157. int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle handle, unsigned node, unsigned async, starpu_access_mode mode)
  158. {
  159. STARPU_ASSERT(handle);
  160. /* it is forbidden to call this function from a callback or a codelet */
  161. if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
  162. return -EDEADLK;
  163. struct state_and_node statenode =
  164. {
  165. .state = handle,
  166. .node = node,
  167. .async = async,
  168. .cond = PTHREAD_COND_INITIALIZER,
  169. .lock = PTHREAD_MUTEX_INITIALIZER,
  170. .finished = 0
  171. };
  172. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, &statenode))
  173. {
  174. /* we can immediately proceed */
  175. uint8_t read = (mode & STARPU_R);
  176. uint8_t write = (mode & STARPU_W);
  177. _starpu_fetch_data_on_node(handle, node, read, write, async);
  178. /* remove the "lock"/reference */
  179. if (!async)
  180. {
  181. _starpu_spin_lock(&handle->header_lock);
  182. _starpu_notify_data_dependencies(handle);
  183. _starpu_spin_unlock(&handle->header_lock);
  184. }
  185. }
  186. else {
  187. PTHREAD_MUTEX_LOCK(&statenode.lock);
  188. while (!statenode.finished)
  189. PTHREAD_COND_WAIT(&statenode.cond, &statenode.lock);
  190. PTHREAD_MUTEX_UNLOCK(&statenode.lock);
  191. }
  192. return 0;
  193. }
  194. int starpu_data_prefetch_on_node(starpu_data_handle handle, unsigned node, unsigned async)
  195. {
  196. return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R);
  197. }
  198. /*
  199. * It is possible to specify that a piece of data can be discarded without
  200. * impacting the application.
  201. */
  202. void starpu_data_advise_as_important(starpu_data_handle handle, unsigned is_important)
  203. {
  204. _starpu_spin_lock(&handle->header_lock);
  205. /* first take all the children lock (in order !) */
  206. unsigned child;
  207. for (child = 0; child < handle->nchildren; child++)
  208. {
  209. /* make sure the intermediate children is advised as well */
  210. struct starpu_data_state_t *child_handle = &handle->children[child];
  211. if (child_handle->nchildren > 0)
  212. starpu_data_advise_as_important(child_handle, is_important);
  213. }
  214. handle->is_not_important = !is_important;
  215. /* now the parent may be used again so we release the lock */
  216. _starpu_spin_unlock(&handle->header_lock);
  217. }
  218. void starpu_data_set_sequential_consistency_flag(starpu_data_handle handle, unsigned flag)
  219. {
  220. _starpu_spin_lock(&handle->header_lock);
  221. unsigned child;
  222. for (child = 0; child < handle->nchildren; child++)
  223. {
  224. /* make sure that the flags are applied to the children as well */
  225. struct starpu_data_state_t *child_handle = &handle->children[child];
  226. if (child_handle->nchildren > 0)
  227. starpu_data_set_sequential_consistency_flag(child_handle, flag);
  228. }
  229. PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  230. handle->sequential_consistency = flag;
  231. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  232. _starpu_spin_unlock(&handle->header_lock);
  233. }