data_request.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <common/config.h>
  17. #include <datawizard/data_request.h>
  18. #include <pthread.h>
  19. #include <common/utils.h>
  20. /* Requests that have not been treated at all yet: one list per memory
   * node, filled by _starpu_post_data_request() (and by requests that are
   * re-queued after an ENOMEM) and drained by
   * _starpu_handle_node_data_requests(). Each list is guarded by the
   * data_requests_list_mutex entry with the same index. The condition
   * variables are initialised and destroyed in this file, but nothing here
   * waits on or signals them. */
  21. static starpu_data_request_list_t data_requests[STARPU_MAXNODES];
  22. static pthread_cond_t data_requests_list_cond[STARPU_MAXNODES];
  23. static pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
  24. /* Requests that are not terminated (eg. async transfers): a request lands
   * here when the copy reports EAGAIN and stays until its completion is
   * observed by _handle_pending_node_data_requests(). Each list is guarded by
   * the data_requests_pending_list_mutex entry with the same index. */
  25. static starpu_data_request_list_t data_requests_pending[STARPU_MAXNODES];
  26. static pthread_cond_t data_requests_pending_list_cond[STARPU_MAXNODES];
  27. static pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
  28. void _starpu_init_data_request_lists(void)
  29. {
  30. unsigned i;
  31. for (i = 0; i < STARPU_MAXNODES; i++)
  32. {
  33. data_requests[i] = starpu_data_request_list_new();
  34. PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
  35. PTHREAD_COND_INIT(&data_requests_list_cond[i], NULL);
  36. data_requests_pending[i] = starpu_data_request_list_new();
  37. PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
  38. PTHREAD_COND_INIT(&data_requests_pending_list_cond[i], NULL);
  39. }
  40. }
  41. void _starpu_deinit_data_request_lists(void)
  42. {
  43. unsigned i;
  44. for (i = 0; i < STARPU_MAXNODES; i++)
  45. {
  46. PTHREAD_COND_DESTROY(&data_requests_pending_list_cond[i]);
  47. PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
  48. starpu_data_request_list_delete(data_requests_pending[i]);
  49. PTHREAD_COND_DESTROY(&data_requests_list_cond[i]);
  50. PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
  51. starpu_data_request_list_delete(data_requests[i]);
  52. }
  53. }
  54. /* this should be called with the lock r->handle->header_lock taken */
  55. static void starpu_data_request_destroy(starpu_data_request_t r)
  56. {
  57. r->handle->per_node[r->dst_node].request = NULL;
  58. starpu_data_request_delete(r);
  59. }
  60. /* handle->lock should already be taken ! */
  61. starpu_data_request_t _starpu_create_data_request(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, uint32_t handling_node, uint8_t read, uint8_t write, unsigned is_prefetch)
  62. {
  63. starpu_data_request_t r = starpu_data_request_new();
  64. _starpu_spin_init(&r->lock);
  65. r->handle = handle;
  66. r->src_node = src_node;
  67. r->dst_node = dst_node;
  68. r->read = read;
  69. r->write = write;
  70. r->handling_node = handling_node;
  71. r->completed = 0;
  72. r->retval = -1;
  73. r->next_req_count = 0;
  74. r->strictness = 1;
  75. r->is_a_prefetch_request = is_prefetch;
  76. /* associate that request with the handle so that further similar
  77. * requests will reuse that one */
  78. _starpu_spin_lock(&r->lock);
  79. handle->per_node[dst_node].request = r;
  80. handle->per_node[dst_node].refcnt++;
  81. if (read)
  82. handle->per_node[src_node].refcnt++;
  83. r->refcnt = 1;
  84. _starpu_spin_unlock(&r->lock);
  85. return r;
  86. }
  87. /* handle->lock should be taken */
  88. starpu_data_request_t _starpu_search_existing_data_request(starpu_data_handle handle, uint32_t dst_node, uint8_t read, uint8_t write)
  89. {
  90. starpu_data_request_t r = handle->per_node[dst_node].request;
  91. if (r)
  92. {
  93. /* perhaps we need to "upgrade" the request */
  94. if (read)
  95. {
  96. /* in case the exisiting request did not imply a memory
  97. * transfer yet, we have to increment the refcnt now
  98. * (so that the source remains valid) */
  99. if (!r->read)
  100. handle->per_node[dst_node].refcnt++;
  101. r->read = 1;
  102. }
  103. if (write)
  104. r->write = 1;
  105. _starpu_spin_lock(&r->lock);
  106. }
  107. return r;
  108. }
  109. int _starpu_wait_data_request_completion(starpu_data_request_t r, unsigned may_alloc)
  110. {
  111. int retval;
  112. int do_delete = 0;
  113. uint32_t local_node = _starpu_get_local_memory_node();
  114. do {
  115. _starpu_spin_lock(&r->lock);
  116. if (r->completed)
  117. break;
  118. _starpu_spin_unlock(&r->lock);
  119. _starpu_wake_all_blocked_workers_on_node(r->handling_node);
  120. _starpu_datawizard_progress(local_node, may_alloc);
  121. } while (1);
  122. retval = r->retval;
  123. if (retval)
  124. fprintf(stderr, "REQUEST %p COMPLETED (retval %d) !\n", r, r->retval);
  125. r->refcnt--;
  126. /* if nobody is waiting on that request, we can get rid of it */
  127. if (r->refcnt == 0)
  128. do_delete = 1;
  129. _starpu_spin_unlock(&r->lock);
  130. if (do_delete)
  131. starpu_data_request_destroy(r);
  132. return retval;
  133. }
  134. /* this is non blocking */
  135. void _starpu_post_data_request(starpu_data_request_t r, uint32_t handling_node)
  136. {
  137. // fprintf(stderr, "POST REQUEST\n");
  138. if (r->read)
  139. {
  140. STARPU_ASSERT(r->handle->per_node[r->src_node].allocated);
  141. STARPU_ASSERT(r->handle->per_node[r->src_node].refcnt);
  142. }
  143. /* insert the request in the proper list */
  144. PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
  145. starpu_data_request_list_push_front(data_requests[handling_node], r);
  146. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
  147. _starpu_wake_all_blocked_workers_on_node(handling_node);
  148. }
  149. static void starpu_handle_data_request_completion(starpu_data_request_t r)
  150. {
  151. unsigned do_delete = 0;
  152. starpu_data_handle handle = r->handle;
  153. uint32_t src_node = r->src_node;
  154. uint32_t dst_node = r->dst_node;
  155. _starpu_update_data_state(handle, dst_node, r->write);
  156. #ifdef STARPU_USE_FXT
  157. size_t size = handle->ops->get_size(handle);
  158. STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, r->com_id);
  159. #endif
  160. unsigned chained_req;
  161. for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
  162. {
  163. _starpu_post_data_request(r->next_req[chained_req], r->next_req[chained_req]->handling_node);
  164. }
  165. r->completed = 1;
  166. handle->per_node[dst_node].refcnt--;
  167. if (r->read)
  168. handle->per_node[src_node].refcnt--;
  169. r->refcnt--;
  170. /* if nobody is waiting on that request, we can get rid of it */
  171. if (r->refcnt == 0)
  172. do_delete = 1;
  173. r->retval = 0;
  174. _starpu_spin_unlock(&r->lock);
  175. if (do_delete)
  176. starpu_data_request_destroy(r);
  177. _starpu_spin_unlock(&handle->header_lock);
  178. }
  179. /* TODO : accounting to see how much time was spent working for other people ... */
  180. static int starpu_handle_data_request(starpu_data_request_t r, unsigned may_alloc)
  181. {
  182. starpu_data_handle handle = r->handle;
  183. _starpu_spin_lock(&handle->header_lock);
  184. _starpu_spin_lock(&r->lock);
  185. if (r->read)
  186. {
  187. STARPU_ASSERT(handle->per_node[r->src_node].allocated);
  188. STARPU_ASSERT(handle->per_node[r->src_node].refcnt);
  189. }
  190. /* perform the transfer */
  191. /* the header of the data must be locked by the worker that submitted the request */
  192. r->retval = _starpu_driver_copy_data_1_to_1(handle, r->src_node, r->dst_node, !r->read, r, may_alloc);
  193. if (r->retval == ENOMEM)
  194. {
  195. _starpu_spin_unlock(&r->lock);
  196. _starpu_spin_unlock(&handle->header_lock);
  197. return ENOMEM;
  198. }
  199. if (r->retval == EAGAIN)
  200. {
  201. _starpu_spin_unlock(&r->lock);
  202. _starpu_spin_unlock(&handle->header_lock);
  203. /* the request is pending and we put it in the corresponding queue */
  204. PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
  205. starpu_data_request_list_push_front(data_requests_pending[r->handling_node], r);
  206. PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);
  207. return EAGAIN;
  208. }
  209. /* the request has been handled */
  210. starpu_handle_data_request_completion(r);
  211. return 0;
  212. }
  213. void _starpu_handle_node_data_requests(uint32_t src_node, unsigned may_alloc)
  214. {
  215. /* for all entries of the list */
  216. starpu_data_request_t r;
  217. /* take all the entries from the request list */
  218. PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
  219. starpu_data_request_list_t local_list = data_requests[src_node];
  220. if (starpu_data_request_list_empty(local_list))
  221. {
  222. /* there is no request */
  223. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
  224. return;
  225. }
  226. data_requests[src_node] = starpu_data_request_list_new();
  227. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
  228. while (!starpu_data_request_list_empty(local_list))
  229. {
  230. int res;
  231. r = starpu_data_request_list_pop_back(local_list);
  232. res = starpu_handle_data_request(r, may_alloc);
  233. if (res == ENOMEM)
  234. {
  235. PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
  236. starpu_data_request_list_push_front(data_requests[src_node], r);
  237. PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
  238. }
  239. /* wake the requesting worker up */
  240. // if we do not progress ..
  241. // pthread_cond_broadcast(&data_requests_list_cond[src_node]);
  242. }
  243. starpu_data_request_list_delete(local_list);
  244. }
  245. static void _handle_pending_node_data_requests(uint32_t src_node, unsigned force)
  246. {
  247. // fprintf(stderr, "_starpu_handle_pending_node_data_requests ...\n");
  248. PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
  249. /* for all entries of the list */
  250. starpu_data_request_list_t local_list = data_requests_pending[src_node];
  251. data_requests_pending[src_node] = starpu_data_request_list_new();
  252. PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
  253. while (!starpu_data_request_list_empty(local_list))
  254. {
  255. starpu_data_request_t r;
  256. r = starpu_data_request_list_pop_back(local_list);
  257. starpu_data_handle handle = r->handle;
  258. _starpu_spin_lock(&handle->header_lock);
  259. _starpu_spin_lock(&r->lock);
  260. /* wait until the transfer is terminated */
  261. if (force)
  262. {
  263. _starpu_driver_wait_request_completion(&r->async_channel, src_node);
  264. starpu_handle_data_request_completion(r);
  265. }
  266. else {
  267. if (_starpu_driver_test_request_completion(&r->async_channel, src_node))
  268. {
  269. starpu_handle_data_request_completion(r);
  270. }
  271. else {
  272. _starpu_spin_unlock(&r->lock);
  273. _starpu_spin_unlock(&handle->header_lock);
  274. /* wake the requesting worker up */
  275. PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
  276. starpu_data_request_list_push_front(data_requests_pending[src_node], r);
  277. PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
  278. }
  279. }
  280. }
  281. starpu_data_request_list_delete(local_list);
  282. }
  /* Poll the pending requests of node src_node without waiting: requests
   * whose transfer is over are completed, the others stay pending. */
  void _starpu_handle_pending_node_data_requests(uint32_t src_node)
  {
      const unsigned force = 0;

      _handle_pending_node_data_requests(src_node, force);
  }
  /* Complete all the pending requests of node src_node, waiting for each
   * transfer to terminate. */
  void _starpu_handle_all_pending_node_data_requests(uint32_t src_node)
  {
      const unsigned force = 1;

      _handle_pending_node_data_requests(src_node, force);
  }
  291. int _starpu_check_that_no_data_request_exists(uint32_t node)
  292. {
  293. /* XXX lock that !!! that's a quick'n'dirty test */
  294. int no_request = starpu_data_request_list_empty(data_requests[node]);
  295. int no_pending = starpu_data_request_list_empty(data_requests_pending[node]);
  296. return (no_request && no_pending);
  297. }