data_request.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2015 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2014 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <starpu.h>
  18. #include <common/config.h>
  19. #include <common/utils.h>
  20. #include <datawizard/datawizard.h>
  21. #include <core/disk.h>
  22. /* TODO: This should be tuned according to driver capabilities
  23. * Data interfaces should also have to declare how many asynchronous requests
  24. * they have actually started (think of e.g. csr).
  25. */
  26. #define MAX_PENDING_REQUESTS_PER_NODE 20
  27. #define MAX_PENDING_PREFETCH_REQUESTS_PER_NODE 10
  28. /* requests that have not been treated at all */
  29. static struct _starpu_data_request_list *data_requests[STARPU_MAXNODES];
  30. static struct _starpu_data_request_list *prefetch_requests[STARPU_MAXNODES];
  31. static starpu_pthread_mutex_t data_requests_list_mutex[STARPU_MAXNODES];
  32. /* requests that are not terminated (eg. async transfers) */
  33. static struct _starpu_data_request_list *data_requests_pending[STARPU_MAXNODES];
  34. static unsigned data_requests_npending[STARPU_MAXNODES];
  35. static starpu_pthread_mutex_t data_requests_pending_list_mutex[STARPU_MAXNODES];
  36. void _starpu_init_data_request_lists(void)
  37. {
  38. unsigned i;
  39. for (i = 0; i < STARPU_MAXNODES; i++)
  40. {
  41. prefetch_requests[i] = _starpu_data_request_list_new();
  42. data_requests[i] = _starpu_data_request_list_new();
  43. /* Tell helgrind that we are fine with checking for list_empty
  44. * in _starpu_handle_node_data_requests, we will call it
  45. * periodically anyway */
  46. STARPU_HG_DISABLE_CHECKING(data_requests[i]);
  47. STARPU_HG_DISABLE_CHECKING(data_requests[i]->_head);
  48. STARPU_PTHREAD_MUTEX_INIT(&data_requests_list_mutex[i], NULL);
  49. data_requests_pending[i] = _starpu_data_request_list_new();
  50. data_requests_npending[i] = 0;
  51. STARPU_PTHREAD_MUTEX_INIT(&data_requests_pending_list_mutex[i], NULL);
  52. }
  53. STARPU_HG_DISABLE_CHECKING(data_requests_npending);
  54. }
  55. void _starpu_deinit_data_request_lists(void)
  56. {
  57. unsigned i;
  58. for (i = 0; i < STARPU_MAXNODES; i++)
  59. {
  60. STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_pending_list_mutex[i]);
  61. _starpu_data_request_list_delete(data_requests_pending[i]);
  62. STARPU_PTHREAD_MUTEX_DESTROY(&data_requests_list_mutex[i]);
  63. _starpu_data_request_list_delete(data_requests[i]);
  64. _starpu_data_request_list_delete(prefetch_requests[i]);
  65. }
  66. }
  67. /* this should be called with the lock r->handle->header_lock taken */
  68. static void starpu_data_request_destroy(struct _starpu_data_request *r)
  69. {
  70. unsigned node;
  71. /* If this is a write only request, then there is no source and we use
  72. * the destination node to cache the request. Otherwise we store the
  73. * pending requests between src and dst. */
  74. if (r->mode & STARPU_R)
  75. {
  76. node = r->src_replicate->memory_node;
  77. }
  78. else
  79. {
  80. node = r->dst_replicate->memory_node;
  81. }
  82. STARPU_ASSERT(r->dst_replicate->request[node] == r);
  83. r->dst_replicate->request[node] = NULL;
  84. switch (r->async_channel.type) {
  85. case STARPU_DISK_RAM:
  86. starpu_disk_free_request(&r->async_channel);
  87. break;
  88. default:
  89. break;
  90. }
  91. //fprintf(stderr, "DESTROY REQ %p (%d) refcnt %d\n", r, node, r->refcnt);
  92. _starpu_data_request_delete(r);
  93. }
  94. /* handle->lock should already be taken ! */
  95. struct _starpu_data_request *_starpu_create_data_request(starpu_data_handle_t handle,
  96. struct _starpu_data_replicate *src_replicate,
  97. struct _starpu_data_replicate *dst_replicate,
  98. unsigned handling_node,
  99. enum starpu_data_access_mode mode,
  100. unsigned ndeps,
  101. unsigned is_prefetch)
  102. {
  103. struct _starpu_data_request *r = _starpu_data_request_new();
  104. _starpu_spin_checklocked(&handle->header_lock);
  105. _starpu_spin_init(&r->lock);
  106. r->handle = handle;
  107. r->src_replicate = src_replicate;
  108. r->dst_replicate = dst_replicate;
  109. r->mode = mode;
  110. r->async_channel.type = STARPU_UNUSED;
  111. r->handling_node = handling_node;
  112. r->completed = 0;
  113. r->prefetch = is_prefetch;
  114. r->retval = -1;
  115. r->ndeps = ndeps;
  116. r->next_req_count = 0;
  117. r->callbacks = NULL;
  118. _starpu_spin_lock(&r->lock);
  119. /* Take a reference on the target for the request to be able to write it */
  120. dst_replicate->refcnt++;
  121. handle->busy_count++;
  122. if (mode & STARPU_R)
  123. {
  124. unsigned src_node = src_replicate->memory_node;
  125. dst_replicate->request[src_node] = r;
  126. /* Take a reference on the source for the request to be able to read it */
  127. src_replicate->refcnt++;
  128. handle->busy_count++;
  129. }
  130. else
  131. {
  132. unsigned dst_node = dst_replicate->memory_node;
  133. dst_replicate->request[dst_node] = r;
  134. }
  135. r->refcnt = 1;
  136. _starpu_spin_unlock(&r->lock);
  137. return r;
  138. }
/* Busy-wait until request R completes, driving datawizard progression on the
 * local node in the meantime (MAY_ALLOC is forwarded to the progression
 * engine).  Returns R's final retval.  Drops the caller's reference on R and
 * destroys R when that was the last one. */
int _starpu_wait_data_request_completion(struct _starpu_data_request *r, unsigned may_alloc)
{
	int retval;
	int do_delete = 0;
	int completed;

	unsigned local_node = _starpu_memory_node_get_local_key();

	do
	{
		/* Racy peek at r->completed (hidden from helgrind); it is
		 * re-checked below under r->lock before we trust it. */
		STARPU_HG_DISABLE_CHECKING(r->completed);
		completed = r->completed;
		STARPU_HG_ENABLE_CHECKING(r->completed);
		if (completed)
		{
			_starpu_spin_lock(&r->lock);
			if (r->completed)
				/* NOTE: we leave the loop with r->lock held */
				break;
			_starpu_spin_unlock(&r->lock);
		}

#ifndef STARPU_NON_BLOCKING_DRIVERS
		/* XXX: shouldn't be needed, and doesn't work with chained requests anyway */
		_starpu_wake_all_blocked_workers_on_node(r->handling_node);
#endif

		_starpu_datawizard_progress(local_node, may_alloc);
	}
	while (1);

	retval = r->retval;
	if (retval)
		_STARPU_DISP("REQUEST %p completed with retval %d!\n", r, r->retval);

	r->refcnt--;

	/* if nobody is waiting on that request, we can get rid of it */
	if (r->refcnt == 0)
		do_delete = 1;

	_starpu_spin_unlock(&r->lock);

	if (do_delete)
		starpu_data_request_destroy(r);

	return retval;
}
  176. /* this is non blocking */
  177. void _starpu_post_data_request(struct _starpu_data_request *r, unsigned handling_node)
  178. {
  179. /* We don't have a worker for disk nodes, these should have been posted to a main RAM node */
  180. STARPU_ASSERT(starpu_node_get_kind(handling_node) != STARPU_DISK_RAM);
  181. STARPU_ASSERT(_starpu_memory_node_get_nworkers(handling_node));
  182. // _STARPU_DEBUG("POST REQUEST\n");
  183. /* If some dependencies are not fulfilled yet, we don't actually post the request */
  184. if (r->ndeps > 0)
  185. return;
  186. if (r->mode & STARPU_R)
  187. {
  188. STARPU_ASSERT(r->src_replicate->allocated);
  189. STARPU_ASSERT(r->src_replicate->refcnt);
  190. }
  191. /* insert the request in the proper list */
  192. STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[handling_node]);
  193. if (r->prefetch)
  194. _starpu_data_request_list_push_back(prefetch_requests[handling_node], r);
  195. else
  196. _starpu_data_request_list_push_back(data_requests[handling_node], r);
  197. STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[handling_node]);
  198. #ifndef STARPU_NON_BLOCKING_DRIVERS
  199. _starpu_wake_all_blocked_workers_on_node(handling_node);
  200. #endif
  201. }
  202. /* We assume that r->lock is taken by the caller */
  203. void _starpu_data_request_append_callback(struct _starpu_data_request *r, void (*callback_func)(void *), void *callback_arg)
  204. {
  205. STARPU_ASSERT(r);
  206. if (callback_func)
  207. {
  208. struct _starpu_callback_list *link = (struct _starpu_callback_list *) malloc(sizeof(struct _starpu_callback_list));
  209. STARPU_ASSERT(link);
  210. link->callback_func = callback_func;
  211. link->callback_arg = callback_arg;
  212. link->next = r->callbacks;
  213. r->callbacks = link;
  214. }
  215. }
/* This method is called with handle's header_lock taken, and unlocks it */
/* Finalize request R after its transfer terminated: update coherency state,
 * fire chained requests, drop the references R held on the replicates and the
 * handle, then run the user callbacks outside of any lock.  R->lock must be
 * held on entry; it is released here, and R is destroyed if this was its last
 * reference. */
static void starpu_handle_data_request_completion(struct _starpu_data_request *r)
{
	unsigned do_delete = 0;
	starpu_data_handle_t handle = r->handle;
	enum starpu_data_access_mode mode = r->mode;

	struct _starpu_data_replicate *src_replicate = r->src_replicate;
	struct _starpu_data_replicate *dst_replicate = r->dst_replicate;

#ifdef STARPU_MEMORY_STATS
	enum _starpu_cache_state old_src_replicate_state = src_replicate->state;
#endif

	_starpu_spin_checklocked(&handle->header_lock);
	_starpu_update_data_state(handle, r->dst_replicate, mode);

#ifdef STARPU_MEMORY_STATS
	if (src_replicate->state == STARPU_INVALID)
	{
		if (old_src_replicate_state == STARPU_OWNER)
			_starpu_memory_handle_stats_invalidated(handle, src_replicate->memory_node);
		else
		{
			/* XXX Currently only ex-OWNER are tagged as invalidated */
			/* XXX Have to check all old state of every node in case a SHARED data become OWNED by the dst_replicate */
		}
	}
	if (dst_replicate->state == STARPU_SHARED)
		_starpu_memory_handle_stats_loaded_shared(handle, dst_replicate->memory_node);
	else if (dst_replicate->state == STARPU_OWNER)
	{
		_starpu_memory_handle_stats_loaded_owner(handle, dst_replicate->memory_node);
	}
#endif

#ifdef STARPU_USE_FXT
	unsigned src_node = src_replicate->memory_node;
	unsigned dst_node = dst_replicate->memory_node;
	size_t size = _starpu_data_get_size(handle);
	_STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, r->com_id);
#endif

	/* Once the request has been fulfilled, we may submit the requests that
	 * were chained to that request. */
	unsigned chained_req;
	for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
	{
		struct _starpu_data_request *next_req = r->next_req[chained_req];
		STARPU_ASSERT(next_req->ndeps > 0);
		next_req->ndeps--;
		_starpu_post_data_request(next_req, next_req->handling_node);
	}

	r->completed = 1;

	/* Remove a reference on the destination replicate for the request */
	STARPU_ASSERT(dst_replicate->refcnt > 0);
	dst_replicate->refcnt--;
	STARPU_ASSERT(handle->busy_count > 0);
	handle->busy_count--;

	/* In case the source was "locked" by the request too */
	if (mode & STARPU_R)
	{
		STARPU_ASSERT(src_replicate->refcnt > 0);
		src_replicate->refcnt--;
		STARPU_ASSERT(handle->busy_count > 0);
		handle->busy_count--;
	}

	/* May free the handle; if so, header_lock must NOT be unlocked below. */
	unsigned destroyed = _starpu_data_check_not_busy(handle);

	r->refcnt--;

	/* if nobody is waiting on that request, we can get rid of it */
	if (r->refcnt == 0)
		do_delete = 1;

	r->retval = 0;

	/* In case there are one or multiple callbacks, we execute them now. */
	struct _starpu_callback_list *callbacks = r->callbacks;

	_starpu_spin_unlock(&r->lock);

	if (do_delete)
		starpu_data_request_destroy(r);

	if (!destroyed)
		_starpu_spin_unlock(&handle->header_lock);

	/* We do the callback once the lock is released so that they can do
	 * blocking operations with the handle (eg. release it) */
	while (callbacks)
	{
		callbacks->callback_func(callbacks->callback_arg);

		struct _starpu_callback_list *next = callbacks->next;
		free(callbacks);
		callbacks = next;
	}
}
/* TODO : accounting to see how much time was spent working for other people ... */
/* Try to perform the transfer described by R.  Returns 0 when the request
 * completed synchronously, -EAGAIN when the transfer was started
 * asynchronously (R is then moved to the pending list), -EBUSY when the
 * needed locks could not be acquired, and -ENOMEM when the destination
 * could not be allocated (the caller should retry later). */
static int starpu_handle_data_request(struct _starpu_data_request *r, unsigned may_alloc, int prefetch STARPU_ATTRIBUTE_UNUSED)
{
	starpu_data_handle_t handle = r->handle;

#ifndef STARPU_SIMGRID
	/* Non-blocking acquisition: bail out with -EBUSY rather than spin. */
	if (_starpu_spin_trylock(&handle->header_lock))
		return -EBUSY;
	if (_starpu_spin_trylock(&r->lock))
	{
		_starpu_spin_unlock(&handle->header_lock);
		return -EBUSY;
	}
#else
	/* Have to wait for the handle, whatever it takes, in simgrid,
	 * since we can not afford going to sleep, since nobody would wake us
	 * up. */
	_starpu_spin_lock(&handle->header_lock);
	_starpu_spin_lock(&r->lock);
#endif

	struct _starpu_data_replicate *src_replicate = r->src_replicate;
	struct _starpu_data_replicate *dst_replicate = r->dst_replicate;

	enum starpu_data_access_mode r_mode = r->mode;

	/* A reading request must have a valid, pinned source replicate. */
	STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate);
	STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate->allocated);
	STARPU_ASSERT(!(r_mode & STARPU_R) || src_replicate->refcnt);

	_starpu_spin_unlock(&r->lock);

	/* FIXME: the request may get upgraded from here to freeing it... */

	/* perform the transfer */
	/* the header of the data must be locked by the worker that submitted the request */
	r->retval = _starpu_driver_copy_data_1_to_1(handle, src_replicate,
						    dst_replicate, !(r_mode & STARPU_R), r, may_alloc);

	if (r->retval == -ENOMEM)
	{
		/* If there was not enough memory, we will try to redo the
		 * request later. */
		_starpu_spin_unlock(&handle->header_lock);
		return -ENOMEM;
	}

	if (r->retval == -EAGAIN)
	{
		/* The request was successful, but could not be terminated
		 * immediately. We will handle the completion of the request
		 * asynchronously. The request is put in the list of "pending"
		 * requests in the meantime. */
		_starpu_spin_unlock(&handle->header_lock);

		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[r->handling_node]);
		_starpu_data_request_list_push_back(data_requests_pending[r->handling_node], r);
		data_requests_npending[r->handling_node]++;
		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[r->handling_node]);

		return -EAGAIN;
	}

	/* the request has been handled */
	_starpu_spin_lock(&r->lock);
	/* Note: this releases both r->lock and handle->header_lock. */
	starpu_handle_data_request_completion(r);

	return 0;
}
/* Treat the not-yet-started requests queued on SRC_NODE, starting at most
 * enough transfers to stay under MAX_PENDING_REQUESTS_PER_NODE.  *PUSHED is
 * set to the number of requests actually started.  Returns 0 on success,
 * -EBUSY when the queue or a handle was busy (or the node is saturated), or
 * another error from starpu_handle_data_request; untreated requests are put
 * back at the front of the queue. */
int _starpu_handle_node_data_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
{
	struct _starpu_data_request *r;
	struct _starpu_data_request_list *new_data_requests;
	struct _starpu_data_request_list *empty_list;
	int ret = 0;

	*pushed = 0;

#ifdef STARPU_NON_BLOCKING_DRIVERS
	/* This is racy, but not posing problems actually, since we know we
	 * will come back here to probe again regularly anyway.
	 * Thus, do not expose this optimization to helgrind */
	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(data_requests[src_node]))
		return 0;
#endif

	/* Allocated up front so we do not allocate while holding the mutex. */
	empty_list = _starpu_data_request_list_new();
#ifdef STARPU_NON_BLOCKING_DRIVERS
	/* take all the entries from the request list */
	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[src_node]))
	/* List is busy, do not bother with it */
	{
		_starpu_data_request_list_delete(empty_list);
		return -EBUSY;
	}
#else
	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
#endif

	struct _starpu_data_request_list *local_list = data_requests[src_node];

	if (_starpu_data_request_list_empty(local_list))
	{
		/* there is no request */
		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
		_starpu_data_request_list_delete(empty_list);
		return 0;
	}

	/* There is an entry: we create a new empty list to replace the list of
	 * requests, and we handle the request(s) one by one in the former
	 * list, without concurrency issues.*/
	data_requests[src_node] = empty_list;
	STARPU_HG_DISABLE_CHECKING(data_requests[src_node]->_head);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);

	new_data_requests = _starpu_data_request_list_new();

	/* for all entries of the list */
	while (!_starpu_data_request_list_empty(local_list))
	{
		int res;

		if (data_requests_npending[src_node] >= MAX_PENDING_REQUESTS_PER_NODE)
		{
			/* Too many requests at the same time, skip pushing
			 * more for now */
			ret = -EBUSY;
			break;
		}

		r = _starpu_data_request_list_pop_front(local_list);

		res = starpu_handle_data_request(r, may_alloc, 0);
		if (res != 0 && res != -EAGAIN)
		{
			/* handle is busy, or not enough memory, postpone for now */
			ret = res;
			_starpu_data_request_list_push_back(new_data_requests, r);
			break;
		}

		(*pushed)++;
	}

	/* Drain whatever is left after an early break into the requeue list. */
	while (!_starpu_data_request_list_empty(local_list))
	{
		r = _starpu_data_request_list_pop_front(local_list);
		_starpu_data_request_list_push_back(new_data_requests, r);
	}

	if (!_starpu_data_request_list_empty(new_data_requests))
	{
		/* Put the untreated requests back at the FRONT so they keep
		 * their original ordering relative to newly posted ones. */
		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
		_starpu_data_request_list_push_list_front(new_data_requests, data_requests[src_node]);
		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);

#ifndef STARPU_NON_BLOCKING_DRIVERS
		_starpu_wake_all_blocked_workers_on_node(src_node);
#endif
	}

	_starpu_data_request_list_delete(new_data_requests);
	_starpu_data_request_list_delete(local_list);

	return ret;
}
/* Same as _starpu_handle_node_data_requests but for the prefetch queue of
 * SRC_NODE, bounded by MAX_PENDING_PREFETCH_REQUESTS_PER_NODE.  *PUSHED is
 * set to the number of requests started.  A prefetch request may be promoted
 * to a regular request (by _starpu_update_prefetch_status) while sitting in
 * our temporary list; such requests are requeued on the regular queue. */
void _starpu_handle_node_prefetch_requests(unsigned src_node, unsigned may_alloc, unsigned *pushed)
{
	struct _starpu_data_request *r;
	struct _starpu_data_request_list *new_data_requests;
	struct _starpu_data_request_list *new_prefetch_requests;
	struct _starpu_data_request_list *empty_list;

	*pushed = 0;

#ifdef STARPU_NON_BLOCKING_DRIVERS
	/* This is racy, but not posing problems actually, since we know we
	 * will come back here to probe again regularly anyway.
	 * Thus, do not expose this optimization to valgrind */
	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(prefetch_requests[src_node]))
		return;
#endif

	/* Allocated up front so we do not allocate while holding the mutex. */
	empty_list = _starpu_data_request_list_new();
#ifdef STARPU_NON_BLOCKING_DRIVERS
	/* take all the entries from the request list */
	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_list_mutex[src_node]))
	{
		/* List is busy, do not bother with it */
		_starpu_data_request_list_delete(empty_list);
		return;
	}
#else
	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
#endif

	struct _starpu_data_request_list *local_list = prefetch_requests[src_node];

	if (_starpu_data_request_list_empty(local_list))
	{
		/* there is no request */
		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);
		_starpu_data_request_list_delete(empty_list);
		return;
	}

	/* There is an entry: we create a new empty list to replace the list of
	 * requests, and we handle the request(s) one by one in the former
	 * list, without concurrency issues.*/
	prefetch_requests[src_node] = empty_list;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);

	new_data_requests = _starpu_data_request_list_new();
	new_prefetch_requests = _starpu_data_request_list_new();

	/* for all entries of the list */
	while (!_starpu_data_request_list_empty(local_list))
	{
		int res;

		if (data_requests_npending[src_node] >= MAX_PENDING_PREFETCH_REQUESTS_PER_NODE)
		{
			/* Too many requests at the same time, skip pushing
			 * more for now */
			break;
		}

		r = _starpu_data_request_list_pop_front(local_list);

		res = starpu_handle_data_request(r, may_alloc, 1);
		if (res != 0 && res != -EAGAIN)
		{
			/* Could not start it: requeue on the list matching
			 * its (possibly updated) priority class. */
			if (r->prefetch)
				_starpu_data_request_list_push_back(new_prefetch_requests, r);
			else
			{
				/* Prefetch request promoted while in tmp list*/
				_starpu_data_request_list_push_back(new_data_requests, r);
			}
			break;
		}

		(*pushed)++;
	}

	/* Drain whatever is left after an early break, sorting each request
	 * into the proper requeue list. */
	while(!_starpu_data_request_list_empty(local_list))
	{
		r = _starpu_data_request_list_pop_front(local_list);
		if (r->prefetch)
			_starpu_data_request_list_push_back(new_prefetch_requests, r);
		else
			/* Prefetch request promoted while in tmp list*/
			_starpu_data_request_list_push_back(new_data_requests, r);
	}

	if (!(_starpu_data_request_list_empty(new_data_requests) && _starpu_data_request_list_empty(new_prefetch_requests)))
	{
		/* Put untreated requests back at the FRONT of their queues to
		 * preserve ordering. */
		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[src_node]);
		if (!(_starpu_data_request_list_empty(new_data_requests)))
			_starpu_data_request_list_push_list_front(new_data_requests, data_requests[src_node]);
		if (!(_starpu_data_request_list_empty(new_prefetch_requests)))
			_starpu_data_request_list_push_list_front(new_prefetch_requests, prefetch_requests[src_node]);
		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[src_node]);

#ifndef STARPU_NON_BLOCKING_DRIVERS
		_starpu_wake_all_blocked_workers_on_node(src_node);
#endif
	}

	_starpu_data_request_list_delete(new_data_requests);
	_starpu_data_request_list_delete(new_prefetch_requests);
	_starpu_data_request_list_delete(local_list);
}
/* Poll (or, when FORCE is set, wait for) the asynchronous transfers already
 * started on SRC_NODE.  Completed transfers are finalized through
 * starpu_handle_data_request_completion; unfinished ones go back on the
 * pending list (never happens with FORCE).  Returns the number of requests
 * completed by this call. */
static int _handle_pending_node_data_requests(unsigned src_node, unsigned force)
{
//	_STARPU_DEBUG("_starpu_handle_pending_node_data_requests ...\n");
//
	struct _starpu_data_request_list *new_data_requests_pending;
	struct _starpu_data_request_list *empty_list;
	unsigned taken, kept;

#ifdef STARPU_NON_BLOCKING_DRIVERS
	/* Here helgrind would complain that this is an unprotected access.
	 * We however don't care about missing an entry, we will get called
	 * again sooner or later. */
	if (!RUNNING_ON_VALGRIND && _starpu_data_request_list_empty(data_requests_pending[src_node]))
		return 0;
#endif

	/* Allocated up front so we do not allocate while holding the mutex. */
	empty_list = _starpu_data_request_list_new();
#ifdef STARPU_NON_BLOCKING_DRIVERS
	if (!force)
	{
		if (STARPU_PTHREAD_MUTEX_TRYLOCK(&data_requests_pending_list_mutex[src_node]))
		{
			/* List is busy, do not bother with it */
			_starpu_data_request_list_delete(empty_list);
			return 0;
		}
	}
	else
#endif
		/* We really want to handle requests */
		STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);

	/* for all entries of the list */
	struct _starpu_data_request_list *local_list = data_requests_pending[src_node];
	if (_starpu_data_request_list_empty(local_list))
	{
		/* there is no request */
		STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);
		_starpu_data_request_list_delete(empty_list);
		return 0;
	}
	/* Steal the whole pending list, replacing it with a fresh empty one,
	 * so we can walk it without holding the mutex. */
	data_requests_pending[src_node] = empty_list;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);

	new_data_requests_pending = _starpu_data_request_list_new();
	taken = 0;
	kept = 0;

	while (!_starpu_data_request_list_empty(local_list))
	{
		struct _starpu_data_request *r;
		r = _starpu_data_request_list_pop_front(local_list);
		taken++;

		starpu_data_handle_t handle = r->handle;

		/* NOTE: the #ifdef below makes the _starpu_spin_lock call the
		 * body of "if (force)" on non-simgrid builds, and an
		 * unconditional statement on simgrid builds. */
#ifndef STARPU_SIMGRID
		if (force)
			/* Have to wait for the handle, whatever it takes */
#endif
			/* Or when running in simgrid, in which case we can not
			 * afford going to sleep, since nobody would wake us
			 * up. */
			_starpu_spin_lock(&handle->header_lock);
#ifndef STARPU_SIMGRID
		else
			if (_starpu_spin_trylock(&handle->header_lock))
			{
				/* Handle is busy, retry this later */
				_starpu_data_request_list_push_back(new_data_requests_pending, r);
				kept++;
				continue;
			}
#endif

		/* This shouldn't be too hard to acquire */
		_starpu_spin_lock(&r->lock);

		/* wait until the transfer is terminated */
		if (force)
		{
			_starpu_driver_wait_request_completion(&r->async_channel);
			starpu_handle_data_request_completion(r);
		}
		else
		{
			if (_starpu_driver_test_request_completion(&r->async_channel))
			{
				/* The request was completed */
				starpu_handle_data_request_completion(r);
			}
			else
			{
				/* The request was not completed, so we put it
				 * back again on the list of pending requests
				 * so that it can be handled later on. */
				_starpu_spin_unlock(&r->lock);
				_starpu_spin_unlock(&handle->header_lock);

				_starpu_data_request_list_push_back(new_data_requests_pending, r);
				kept++;
			}
		}
	}
	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[src_node]);
	/* The pending count drops by the number of requests we finalized. */
	data_requests_npending[src_node] -= taken - kept;
	if (kept)
		_starpu_data_request_list_push_list_back(data_requests_pending[src_node], new_data_requests_pending);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[src_node]);

	_starpu_data_request_list_delete(local_list);
	_starpu_data_request_list_delete(new_data_requests_pending);

	return taken - kept;
}
/* Non-blocking poll of the already-started transfers of SRC_NODE; returns
 * the number of requests completed by this call. */
int _starpu_handle_pending_node_data_requests(unsigned src_node)
{
	return _handle_pending_node_data_requests(src_node, 0);
}
/* Wait for ALL already-started transfers of SRC_NODE to terminate (blocking
 * per request); returns the number of requests completed. */
int _starpu_handle_all_pending_node_data_requests(unsigned src_node)
{
	return _handle_pending_node_data_requests(src_node, 1);
}
  639. int _starpu_check_that_no_data_request_exists(unsigned node)
  640. {
  641. /* XXX lock that !!! that's a quick'n'dirty test */
  642. int no_request;
  643. int no_pending;
  644. STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[node]);
  645. no_request = _starpu_data_request_list_empty(data_requests[node])
  646. && _starpu_data_request_list_empty(prefetch_requests[node]);
  647. STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[node]);
  648. STARPU_PTHREAD_MUTEX_LOCK(&data_requests_pending_list_mutex[node]);
  649. no_pending = !data_requests_npending[node];
  650. STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_pending_list_mutex[node]);
  651. return (no_request && no_pending);
  652. }
/* Promote prefetch request R to a regular (task-required) request: clear its
 * prefetch flag, recursively promote its chained requests, and — if R is
 * still sitting in the prefetch queue of its handling node — move it to the
 * FRONT of the regular queue so it is treated promptly. */
void _starpu_update_prefetch_status(struct _starpu_data_request *r)
{
	STARPU_ASSERT(r->prefetch > 0);
	r->prefetch=0;

	/* We have to promote chained_request too! */
	unsigned chained_req;
	for (chained_req = 0; chained_req < r->next_req_count; chained_req++)
	{
		struct _starpu_data_request *next_req = r->next_req[chained_req];
		if (next_req->prefetch)
			_starpu_update_prefetch_status(next_req);
	}

	STARPU_PTHREAD_MUTEX_LOCK(&data_requests_list_mutex[r->handling_node]);

	/* The request can be in a different list (handling request or the temp list)
	 * we have to check that it is really in the prefetch list. */
	struct _starpu_data_request *r_iter;
	for (r_iter = _starpu_data_request_list_begin(prefetch_requests[r->handling_node]);
	     r_iter != _starpu_data_request_list_end(prefetch_requests[r->handling_node]);
	     r_iter = _starpu_data_request_list_next(r_iter))
	{
		if (r==r_iter)
		{
			_starpu_data_request_list_erase(prefetch_requests[r->handling_node],r);
			_starpu_data_request_list_push_front(data_requests[r->handling_node],r);
			break;
		}
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&data_requests_list_mutex[r->handling_node]);

#ifndef STARPU_NON_BLOCKING_DRIVERS
	_starpu_wake_all_blocked_workers_on_node(r->handling_node);
#endif
}