data_request.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <datawizard/datawizard.h>
  19. /* requests that have not been treated at all */
  20. static starpu_data_request_list_t data_requests[STARPU_MAXNODES];
  21. static pthread_cond_t data_requests_list_cond[STARPU_MAXNODES];
  22. static pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
  23. /* requests that are not terminated (eg. async transfers) */
  24. static starpu_data_request_list_t data_requests_pending[STARPU_MAXNODES];
  25. static pthread_cond_t data_requests_pending_list_cond[STARPU_MAXNODES];
  26. static pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
  27. void _starpu_init_data_request_lists(void)
  28. {
  29. unsigned i;
  30. for (i = 0; i < STARPU_MAXNODES; i++)
  31. {
  32. data_requests[i] = starpu_data_request_list_new();
  33. PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
  34. PTHREAD_COND_INIT(&data_requests_list_cond[i], NULL);
  35. data_requests_pending[i] = starpu_data_request_list_new();
  36. PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
  37. PTHREAD_COND_INIT(&data_requests_pending_list_cond[i], NULL);
  38. }
  39. }
  40. void _starpu_deinit_data_request_lists(void)
  41. {
  42. unsigned i;
  43. for (i = 0; i < STARPU_MAXNODES; i++)
  44. {
  45. PTHREAD_COND_DESTROY(&data_requests_pending_list_cond[i]);
  46. PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
  47. starpu_data_request_list_delete(data_requests_pending[i]);
  48. PTHREAD_COND_DESTROY(&data_requests_list_cond[i]);
  49. PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
  50. starpu_data_request_list_delete(data_requests[i]);
  51. }
  52. }
  53. /* this should be called with the lock r->handle->header_lock taken */
  54. static void starpu_data_request_destroy(starpu_data_request_t r)
  55. {
  56. r->handle->per_node[r->dst_node].request = NULL;
  57. starpu_data_request_delete(r);
  58. }
  59. /* handle->lock should already be taken ! */
  60. starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, uint32_t handling_node, starpu_access_mode mode, unsigned is_prefetch)
  61. {
  62. starpu_data_request_t r = starpu_data_request_new();
  63. _starpu_spin_init(&r->lock);
  64. r->handle = handle;
  65. r->src_node = src_node;
  66. r->dst_node = dst_node;
  67. r->mode = mode;
  68. r->handling_node = handling_node;
  69. r->completed = 0;
  70. r->retval = -1;
  71. r->next_req_count = 0;
  72. r->callbacks = NULL;
  73. r->is_a_prefetch_request = is_prefetch;
  74. /* associate that request with the handle so that further similar
  75. * requests will reuse that one */
  76. _starpu_spin_lock(&r->lock);
  77. handle->per_node[dst_node].request = r;
  78. handle->per_node[dst_node].refcnt++;
  79. if (mode & STARPU_R)
  80. handle->per_node[src_node].refcnt++;
  81. r->refcnt = 1;
  82. _starpu_spin_unlock(&r->lock);
  83. return r;
  84. }
  85. /* handle->lock should be taken. r is returned locked */
  86. starpu_data_request_t _starpu_search_existing_data_request(struct starpu_data_replicate_s *replicate, starpu_access_mode mode)
  87. {
  88. starpu_data_request_t r = replicate->request;
  89. if (r)
  90. {
  91. _starpu_spin_lock(&r->lock);
  92. /* perhaps we need to "upgrade" the request */
  93. if (mode & STARPU_R)
  94. {
  95. /* in case the exisiting request did not imply a memory
  96. * transfer yet, we have to increment the refcnt now
  97. * (so that the source remains valid) */
  98. if (!(r->mode & STARPU_R))
  99. replicate->refcnt++;
  100. r->mode |= STARPU_R;
  101. }
  102. if (mode & STARPU_W)
  103. r->mode |= STARPU_W;
  104. }
  105. return r;
  106. }
  107. int _starpu_wait_data_request_completion(starpu_data_request_t r, unsigned may_alloc)
  108. {
  109. int retval;
  110. int do_delete = 0;
  111. uint32_t local_node = _starpu_get_local_memory_node();
  112. do {
  113. _starpu_spin_lock(&r->lock);
  114. if (r->completed)
  115. break;
  116. _starpu_spin_unlock(&r->lock);
  117. #ifndef STARPU_NON_BLOCKING_DRIVERS
  118. _starpu_wake_all_blocked_workers_on_node(r->handling_node);
  119. #endif
  120. _starpu_datawizard_progress(local_node, may_alloc);
  121. } while (1);
  122. retval = r->retval;
  123. if (retval)
  124. _STARPU_DISP("REQUEST %p COMPLETED (retval %d) !\n", r, r->retval);
  125. r->refcnt--;
  126. /* if nobody is waiting on that request, we can get rid of it */
  127. if (r->refcnt == 0)
  128. do_delete = 1;
  129. _starpu_spin_unlock(&r->lock);
  130. if (do_delete)
  131. starpu_data_request_destroy(r);
  132. return retval;
  133. }
  134. /* this is non blocking */
  135. void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
  136. {
  137. // _STARPU_DEBUG("POST REQUEST\n");
  138. if (r->mode & STARPU_R)
  139. {
  140. STARPU_ASSERT(r->handle->per_node[r->src_node].allocated);
  141. STARPU_ASSERT(r->handle->per_node[r->src_node].refcnt);
  142. }
  143. /* insert the request in the proper list */
  144. PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
  145. starpu_data_request_list_push_front(data_requests[handling_node], r);
  146. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
  147. #ifndef STARPU_NON_BLOCKING_DRIVERS
  148. _starpu_wake_all_blocked_workers_on_node(handling_node);
  149. #endif
  150. }
  151. /* We assume that r->lock is taken by the caller */
  152. void _starpu_data_request_append_callback(starpu_data_request_t r, void (*callback_func)(void *), void *callback_arg)
  153. {
  154. STARPU_ASSERT(r);
  155. if (callback_func)
  156. {
  157. struct callback_list *link = malloc(sizeof(struct callback_list));
  158. STARPU_ASSERT(link);
  159. link->callback_func = callback_func;
  160. link->callback_arg = callback_arg;
  161. link->next = r->callbacks;
  162. r->callbacks = link;
  163. }
  164. }
  165. static void starpu_handle_data_request_completion(starpu_data_request_t r)
  166. {
  167. unsigned do_delete = 0;
  168. starpu_data_handle handle = r->handle;
  169. uint32_t src_node = r->src_node;
  170. uint32_t dst_node = r->dst_node;
  171. _starpu_update_data_state(handle, dst_node, r->mode);
  172. #ifdef STARPU_USE_FXT
  173. size_t size = _starpu_data_get_size(handle);
  174. STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, r->com_id);
  175. #endif
  176. unsigned chained_req;
  177. for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
  178. {
  179. _starpu_post_data_request(r->next_req[chained_req], r->next_req[chained_req]->handling_node);
  180. }
  181. r->completed = 1;
  182. handle->per_node[dst_node].refcnt--;
  183. if (r->mode & STARPU_R)
  184. handle->per_node[src_node].refcnt--;
  185. r->refcnt--;
  186. /* if nobody is waiting on that request, we can get rid of it */
  187. if (r->refcnt == 0)
  188. do_delete = 1;
  189. r->retval = 0;
  190. /* In case there are one or multiple callbacks, we execute them now. */
  191. struct callback_list *callbacks = r->callbacks;
  192. _starpu_spin_unlock(&r->lock);
  193. if (do_delete)
  194. starpu_data_request_destroy(r);
  195. _starpu_spin_unlock(&handle->header_lock);
  196. /* We do the callback once the lock is released so that they can do
  197. * blocking operations with the handle (eg. release it) */
  198. while (callbacks)
  199. {
  200. callbacks->callback_func(callbacks->callback_arg);
  201. struct callback_list *next = callbacks->next;
  202. free(callbacks);
  203. callbacks = next;
  204. }
  205. }
  206. /* TODO : accounting to see how much time was spent working for other people ... */
  207. static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_alloc)
  208. {
  209. starpu_data_handle handle = r->handle;
  210. _starpu_spin_lock(&handle->header_lock);
  211. _starpu_spin_lock(&r->lock);
  212. if (r->mode & STARPU_R)
  213. {
  214. STARPU_ASSERT(handle->per_node[r->src_node].allocated);
  215. STARPU_ASSERT(handle->per_node[r->src_node].refcnt);
  216. }
  217. /* perform the transfer */
  218. /* the header of the data must be locked by the worker that submitted the request */
  219. r->retval = _starpu_driver_copy_data_1_to_1(handle, r->src_node, r->dst_node, !(r->mode & STARPU_R), r, may_alloc);
  220. if (r->retval == -ENOMEM)
  221. {
  222. _starpu_spin_unlock(&r->lock);
  223. _starpu_spin_unlock(&handle->header_lock);
  224. return -ENOMEM;
  225. }
  226. if (r->retval == -EAGAIN)
  227. {
  228. _starpu_spin_unlock(&r->lock);
  229. _starpu_spin_unlock(&handle->header_lock);
  230. /* the request is pending and we put it in the corresponding queue */
  231. PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
  232. starpu_data_request_list_push_front(data_requests_pending[r->handling_node], r);
  233. PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
  234. return -EAGAIN;
  235. }
  236. /* the request has been handled */
  237. starpu_handle_data_request_completion(r);
  238. return 0;
  239. }
  240. void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
  241. {
  242. /* for all entries of the list */
  243. starpu_data_request_t r;
  244. /* take all the entries from the request list */
  245. PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
  246. starpu_data_request_list_t local_list = data_requests[src_node];
  247. if (starpu_data_request_list_empty(local_list))
  248. {
  249. /* there is no request */
  250. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
  251. return;
  252. }
  253. data_requests[src_node] = starpu_data_request_list_new();
  254. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
  255. while (!starpu_data_request_list_empty(local_list))
  256. {
  257. int res;
  258. r = starpu_data_request_list_pop_back(local_list);
  259. res = starpu_handle_data_request(r, may_alloc);
  260. if (res == -ENOMEM)
  261. {
  262. PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
  263. starpu_data_request_list_push_front(data_requests[src_node], r);
  264. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
  265. }
  266. /* wake the requesting worker up */
  267. // if we do not progress ..
  268. // pthread_cond_broadcast(&data_requests_list_cond[src_node]);
  269. }
  270. starpu_data_request_list_delete(local_list);
  271. }
  272. static void _handle_pending_node_data_requests(uint32_t src_node, unsigned force)
  273. {
  274. // _STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
  275. PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
  276. /* for all entries of the list */
  277. starpu_data_request_list_t local_list = data_requests_pending[src_node];
  278. data_requests_pending[src_node] = starpu_data_request_list_new();
  279. PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
  280. while (!starpu_data_request_list_empty(local_list))
  281. {
  282. starpu_data_request_t r;
  283. r = starpu_data_request_list_pop_back(local_list);
  284. starpu_data_handle handle = r->handle;
  285. _starpu_spin_lock(&handle->header_lock);
  286. _starpu_spin_lock(&r->lock);
  287. /* wait until the transfer is terminated */
  288. if (force)
  289. {
  290. _starpu_driver_wait_request_completion(&r->async_channel, src_node);
  291. starpu_handle_data_request_completion(r);
  292. }
  293. else {
  294. if (_starpu_driver_test_request_completion(&r->async_channel, src_node))
  295. {
  296. starpu_handle_data_request_completion(r);
  297. }
  298. else {
  299. _starpu_spin_unlock(&r->lock);
  300. _starpu_spin_unlock(&handle->header_lock);
  301. /* wake the requesting worker up */
  302. PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
  303. starpu_data_request_list_push_front(data_requests_pending[src_node], r);
  304. PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
  305. }
  306. }
  307. }
  308. starpu_data_request_list_delete(local_list);
  309. }
  /* Complete the pending requests of node `src_node` whose transfer is
   * already terminated; the other ones stay pending (no waiting). */
  void _starpu_handle_pending_node_data_requests(uint32_t src_node)
  {
      const unsigned force = 0;

      _handle_pending_node_data_requests(src_node, force);
  }
  /* Complete every pending request of node `src_node`, waiting for each
   * transfer to terminate. */
  void _starpu_handle_all_pending_node_data_requests(uint32_t src_node)
  {
      const unsigned force = 1;

      _handle_pending_node_data_requests(src_node, force);
  }
  318. int _starpu_check_that_no_data_request_exists(uint32_t node)
  319. {
  320. /* XXX lock that !!! that's a quick'n'dirty test */
  321. int no_request = starpu_data_request_list_empty(data_requests[node]);
  322. int no_pending = starpu_data_request_list_empty(data_requests_pending[node]);
  323. return (no_request && no_pending);
  324. }