coherency.c 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures. *
  2. * Copyright (C) 2009-2016 Université de Bordeaux
  3. * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016 CNRS
  4. * Copyright (C) 2014 INRIA
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <common/config.h>
  18. #include <datawizard/coherency.h>
  19. #include <datawizard/copy_driver.h>
  20. #include <datawizard/write_back.h>
  21. #include <datawizard/memory_nodes.h>
  22. #include <core/dependencies/data_concurrency.h>
  23. #include <core/disk.h>
  24. #include <profiling/profiling.h>
  25. #include <math.h>
  26. #include <core/task.h>
  27. #include <starpu_scheduler.h>
  28. #include <core/workers.h>
  29. #include <limits.h>
  30. #ifdef STARPU_SIMGRID
  31. #include <core/simgrid.h>
  32. #endif
  33. static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);
  34. int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
  35. {
  36. int src_node = -1;
  37. unsigned i;
  38. unsigned nnodes = starpu_memory_nodes_get_count();
  39. /* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
  40. unsigned node;
  41. size_t size = _starpu_data_get_size(handle);
  42. double cost = INFINITY;
  43. unsigned src_node_mask = 0;
  44. for (node = 0; node < nnodes; node++)
  45. {
  46. if (handle->per_node[node].state != STARPU_INVALID)
  47. {
  48. /* we found a copy ! */
  49. src_node_mask |= (1<<node);
  50. }
  51. }
  52. if (src_node_mask == 0 && handle->init_cl)
  53. {
  54. /* No copy yet, but applicationg told us how to build it. */
  55. return -1;
  56. }
  57. /* we should have found at least one copy ! */
  58. STARPU_ASSERT_MSG(src_node_mask != 0, "The data for the handle %p is requested, but the handle does not have a valid value. Perhaps some initialization task is missing?", handle);
  59. /* Without knowing the size, we won't know the cost */
  60. if (!size)
  61. cost = 0;
  62. /* Check whether we have transfer cost for all nodes, if so, take the minimum */
  63. if (cost)
  64. for (i = 0; i < nnodes; i++)
  65. {
  66. if (src_node_mask & (1<<i))
  67. {
  68. double time = starpu_transfer_predict(i, destination, size);
  69. unsigned handling_node;
  70. /* Avoid indirect transfers */
  71. if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
  72. continue;
  73. if (_STARPU_IS_ZERO(time))
  74. {
  75. /* No estimation, will have to revert to dumb strategy */
  76. cost = 0.0;
  77. break;
  78. }
  79. else if (time < cost)
  80. {
  81. cost = time;
  82. src_node = i;
  83. }
  84. }
  85. }
  86. if (cost && src_node != -1)
  87. /* Could estimate through cost, return that */
  88. return src_node;
  89. int i_ram = -1;
  90. int i_gpu = -1;
  91. int i_disk = -1;
  92. /* Revert to dumb strategy: take RAM unless only a GPU has it */
  93. for (i = 0; i < nnodes; i++)
  94. {
  95. if (src_node_mask & (1<<i))
  96. {
  97. int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
  98. /* Avoid transfers which the interface does not want */
  99. if (can_copy)
  100. {
  101. void *src_interface = handle->per_node[i].data_interface;
  102. void *dst_interface = handle->per_node[destination].data_interface;
  103. unsigned handling_node;
  104. if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
  105. {
  106. /* Avoid through RAM if the interface does not want it */
  107. void *ram_interface = handle->per_node[STARPU_MAIN_RAM].data_interface;
  108. if ((!can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, i)
  109. && !can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, STARPU_MAIN_RAM))
  110. || (!can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, STARPU_MAIN_RAM)
  111. && !can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, destination)))
  112. continue;
  113. }
  114. }
  115. /* however GPU are expensive sources, really !
  116. * Unless peer transfer is supported (and it would then have been selected above).
  117. * Other should be ok */
  118. if (starpu_node_get_kind(i) == STARPU_CUDA_RAM ||
  119. starpu_node_get_kind(i) == STARPU_OPENCL_RAM ||
  120. starpu_node_get_kind(i) == STARPU_MIC_RAM)
  121. i_gpu = i;
  122. if (starpu_node_get_kind(i) == STARPU_CPU_RAM ||
  123. starpu_node_get_kind(i) == STARPU_SCC_RAM ||
  124. starpu_node_get_kind(i) == STARPU_SCC_SHM)
  125. i_ram = i;
  126. if (starpu_node_get_kind(i) == STARPU_DISK_RAM)
  127. i_disk = i;
  128. }
  129. }
  130. /* we have to use cpu_ram in first */
  131. if (i_ram != -1)
  132. src_node = i_ram;
  133. /* no luck we have to use the disk memory */
  134. else if (i_gpu != -1)
  135. src_node = i_gpu;
  136. else
  137. src_node = i_disk;
  138. STARPU_ASSERT(src_node != -1);
  139. return src_node;
  140. }
  141. /* this may be called once the data is fetched with header and STARPU_RW-lock hold */
  142. void _starpu_update_data_state(starpu_data_handle_t handle,
  143. struct _starpu_data_replicate *requesting_replicate,
  144. enum starpu_data_access_mode mode)
  145. {
  146. /* There is nothing to do for relaxed coherency modes (scratch or
  147. * reductions) */
  148. if (!(mode & STARPU_RW))
  149. return;
  150. unsigned nnodes = starpu_memory_nodes_get_count();
  151. /* the data is present now */
  152. unsigned requesting_node = requesting_replicate->memory_node;
  153. requesting_replicate->requested &= ~(1UL << requesting_node);
  154. if (mode & STARPU_W)
  155. {
  156. /* the requesting node now has the only valid copy */
  157. unsigned node;
  158. for (node = 0; node < nnodes; node++)
  159. handle->per_node[node].state = STARPU_INVALID;
  160. requesting_replicate->state = STARPU_OWNER;
  161. if (handle->home_node != -1 && handle->per_node[handle->home_node].state == STARPU_INVALID)
  162. /* Notify that this MC is now dirty */
  163. _starpu_memchunk_dirty(requesting_replicate->mc, requesting_replicate->memory_node);
  164. }
  165. else
  166. { /* read only */
  167. if (requesting_replicate->state != STARPU_OWNER)
  168. {
  169. /* there was at least another copy of the data */
  170. unsigned node;
  171. for (node = 0; node < nnodes; node++)
  172. {
  173. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  174. if (replicate->state != STARPU_INVALID)
  175. replicate->state = STARPU_SHARED;
  176. }
  177. requesting_replicate->state = STARPU_SHARED;
  178. }
  179. }
  180. }
  181. static int worker_supports_direct_access(unsigned node, unsigned handling_node)
  182. {
  183. /* only support disk <-> ram and disk <-> disk */
  184. if (starpu_node_get_kind(node) == STARPU_DISK_RAM || starpu_node_get_kind(handling_node) == STARPU_DISK_RAM)
  185. return 0;
  186. if (node == handling_node)
  187. return 1;
  188. if (!_starpu_memory_node_get_nworkers(handling_node))
  189. /* No worker to process the request from that node */
  190. return 0;
  191. int type = starpu_node_get_kind(node);
  192. switch (type)
  193. {
  194. case STARPU_CUDA_RAM:
  195. {
  196. /* GPUs not always allow direct remote access: if CUDA4
  197. * is enabled, we allow two CUDA devices to communicate. */
  198. #ifdef STARPU_SIMGRID
  199. if (starpu_node_get_kind(handling_node) == STARPU_CUDA_RAM)
  200. {
  201. char name[16];
  202. msg_host_t host;
  203. const char* cuda_memcpy_peer;
  204. snprintf(name, sizeof(name), "CUDA%d", _starpu_memory_node_get_devid(handling_node));
  205. host = _starpu_simgrid_get_host_by_name(name);
  206. cuda_memcpy_peer = MSG_host_get_property_value(host, "memcpy_peer");
  207. return cuda_memcpy_peer && atoll(cuda_memcpy_peer);
  208. }
  209. else
  210. return 0;
  211. #elif defined(HAVE_CUDA_MEMCPY_PEER)
  212. /* simgrid */
  213. enum starpu_node_kind kind = starpu_node_get_kind(handling_node);
  214. return kind == STARPU_CUDA_RAM;
  215. #else /* HAVE_CUDA_MEMCPY_PEER */
  216. /* Direct GPU-GPU transfers are not allowed in general */
  217. return 0;
  218. #endif /* HAVE_CUDA_MEMCPY_PEER */
  219. }
  220. case STARPU_OPENCL_RAM:
  221. return 0;
  222. case STARPU_MIC_RAM:
  223. /* TODO: We don't handle direct MIC-MIC transfers yet */
  224. return 0;
  225. case STARPU_SCC_RAM:
  226. return 1;
  227. default:
  228. return 1;
  229. }
  230. }
  231. static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node)
  232. {
  233. int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
  234. void *src_interface = handle->per_node[src_node].data_interface;
  235. void *dst_interface = handle->per_node[dst_node].data_interface;
  236. /* XXX That's a hack until we fix cudaMemcpy3DPeerAsync in the block interface
  237. * Perhaps not all data interface provide a direct GPU-GPU transfer
  238. * method ! */
  239. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  240. if (src_node != dst_node && starpu_node_get_kind(src_node) == STARPU_CUDA_RAM && starpu_node_get_kind(dst_node) == STARPU_CUDA_RAM)
  241. {
  242. const struct starpu_data_copy_methods *copy_methods = handle->ops->copy_methods;
  243. if (!copy_methods->cuda_to_cuda_async && !copy_methods->any_to_any)
  244. return 0;
  245. }
  246. #endif
  247. /* Note: with CUDA, performance seems a bit better when issuing the transfer from the destination (tested without GPUDirect, but GPUDirect probably behave the same) */
  248. if (worker_supports_direct_access(src_node, dst_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node)))
  249. {
  250. *handling_node = dst_node;
  251. return 1;
  252. }
  253. if (worker_supports_direct_access(dst_node, src_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node)))
  254. {
  255. *handling_node = src_node;
  256. return 1;
  257. }
  258. /* Link between disk and ram */
  259. if ((starpu_node_get_kind(src_node) == STARPU_DISK_RAM && starpu_node_get_kind(dst_node) == STARPU_CPU_RAM) ||
  260. (starpu_node_get_kind(src_node) == STARPU_CPU_RAM && starpu_node_get_kind(dst_node) == STARPU_DISK_RAM))
  261. {
  262. /* FIXME: not necessarily a worker :/ */
  263. *handling_node = STARPU_MAIN_RAM;
  264. return 1;
  265. }
  266. /* link between disk and disk, and they have the same kind */
  267. if (_starpu_is_same_kind_disk(src_node, dst_node))
  268. return 1;
  269. return 0;
  270. }
  271. /* Determines the path of a request : each hop is defined by (src,dst) and the
  272. * node that handles the hop. The returned value indicates the number of hops,
  273. * and the max_len is the maximum number of hops (ie. the size of the
  274. * src_nodes, dst_nodes and handling_nodes arrays. */
  275. static int determine_request_path(starpu_data_handle_t handle,
  276. int src_node, int dst_node,
  277. enum starpu_data_access_mode mode, int max_len,
  278. unsigned *src_nodes, unsigned *dst_nodes,
  279. unsigned *handling_nodes, unsigned write_invalidation)
  280. {
  281. if (src_node == dst_node || !(mode & STARPU_R))
  282. {
  283. if (write_invalidation)
  284. /* The invalidation request will be enough */
  285. return 0;
  286. /* The destination node should only allocate the data, no transfer is required */
  287. STARPU_ASSERT(max_len >= 1);
  288. src_nodes[0] = STARPU_MAIN_RAM; // ignored
  289. dst_nodes[0] = dst_node;
  290. handling_nodes[0] = dst_node;
  291. return 1;
  292. }
  293. unsigned handling_node;
  294. int link_is_valid = link_supports_direct_transfers(handle, src_node, dst_node, &handling_node);
  295. if (!link_is_valid)
  296. {
  297. int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
  298. void *src_interface = handle->per_node[src_node].data_interface;
  299. void *dst_interface = handle->per_node[dst_node].data_interface;
  300. /* We need an intermediate hop to implement data staging
  301. * through main memory. */
  302. STARPU_ASSERT(max_len >= 2);
  303. STARPU_ASSERT(src_node >= 0);
  304. /* GPU -> RAM */
  305. src_nodes[0] = src_node;
  306. dst_nodes[0] = STARPU_MAIN_RAM;
  307. if (starpu_node_get_kind(src_node) == STARPU_DISK_RAM)
  308. /* Disks don't have their own driver thread */
  309. handling_nodes[0] = dst_node;
  310. else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node))
  311. {
  312. handling_nodes[0] = src_node;
  313. }
  314. else
  315. {
  316. STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, dst_node), "interface %d refuses all kinds of transfers from node %u to node %u\n", handle->ops->interfaceid, src_node, dst_node);
  317. handling_nodes[0] = dst_node;
  318. }
  319. /* RAM -> GPU */
  320. src_nodes[1] = STARPU_MAIN_RAM;
  321. dst_nodes[1] = dst_node;
  322. if (starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
  323. /* Disks don't have their own driver thread */
  324. handling_nodes[1] = src_node;
  325. else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node))
  326. {
  327. handling_nodes[1] = dst_node;
  328. }
  329. else
  330. {
  331. STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, src_node), "interface %d refuses all kinds of transfers from node %u to node %u\n", handle->ops->interfaceid, src_node, dst_node);
  332. handling_nodes[1] = src_node;
  333. }
  334. return 2;
  335. }
  336. else
  337. {
  338. STARPU_ASSERT(max_len >= 1);
  339. src_nodes[0] = src_node;
  340. dst_nodes[0] = dst_node;
  341. handling_nodes[0] = handling_node;
  342. #if !defined(HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
  343. STARPU_ASSERT(!(mode & STARPU_R) || starpu_node_get_kind(src_node) != STARPU_CUDA_RAM || starpu_node_get_kind(dst_node) != STARPU_CUDA_RAM);
  344. #endif
  345. return 1;
  346. }
  347. }
  348. /* handle->lock should be taken. r is returned locked. The node parameter
  349. * indicate either the source of the request, or the destination for a
  350. * write-only request. */
  351. static struct _starpu_data_request *_starpu_search_existing_data_request(struct _starpu_data_replicate *replicate, unsigned node, enum starpu_data_access_mode mode, unsigned is_prefetch)
  352. {
  353. struct _starpu_data_request *r;
  354. r = replicate->request[node];
  355. if (r)
  356. {
  357. _starpu_spin_checklocked(&r->handle->header_lock);
  358. _starpu_spin_lock(&r->lock);
  359. /* perhaps we need to "upgrade" the request */
  360. if (is_prefetch < r->prefetch)
  361. _starpu_update_prefetch_status(r, is_prefetch);
  362. if (mode & STARPU_R)
  363. {
  364. /* in case the exisiting request did not imply a memory
  365. * transfer yet, we have to take a second refcnt now
  366. * for the source, in addition to the refcnt for the
  367. * destination
  368. * (so that the source remains valid) */
  369. if (!(r->mode & STARPU_R))
  370. {
  371. replicate->refcnt++;
  372. replicate->handle->busy_count++;
  373. }
  374. r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_R);
  375. }
  376. if (mode & STARPU_W)
  377. r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_W);
  378. }
  379. return r;
  380. }
  381. /*
  382. * This function is called when the data is needed on the local node, this
  383. * returns a pointer to the local copy
  384. *
  385. * R STARPU_W STARPU_RW
  386. * Owner OK OK OK
  387. * Shared OK 1 1
  388. * Invalid 2 3 4
  389. *
  390. * case 1 : shared + (read)write :
  391. * no data copy but shared->Invalid/Owner
  392. * case 2 : invalid + read :
  393. * data copy + invalid->shared + owner->shared (STARPU_ASSERT(there is a valid))
  394. * case 3 : invalid + write :
  395. * no data copy + invalid->owner + (owner,shared)->invalid
  396. * case 4 : invalid + R/STARPU_W :
  397. * data copy + if (STARPU_W) (invalid->owner + owner->invalid)
  398. * else (invalid,owner->shared)
  399. */
  400. struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
  401. struct _starpu_data_replicate *dst_replicate,
  402. enum starpu_data_access_mode mode, unsigned is_prefetch,
  403. unsigned async,
  404. void (*callback_func)(void *), void *callback_arg, int prio)
  405. {
  406. /* We don't care about commuting for data requests, that was handled before. */
  407. mode &= ~STARPU_COMMUTE;
  408. /* This function is called with handle's header lock taken */
  409. _starpu_spin_checklocked(&handle->header_lock);
  410. unsigned requesting_node = dst_replicate->memory_node;
  411. unsigned nwait = 0;
  412. if (mode & STARPU_W)
  413. {
  414. /* We will write to the buffer. We will have to wait for all
  415. * existing requests before the last request which will
  416. * invalidate all their results (which were possibly spurious,
  417. * e.g. too aggressive eviction).
  418. */
  419. unsigned i, j;
  420. unsigned nnodes = starpu_memory_nodes_get_count();
  421. for (i = 0; i < nnodes; i++)
  422. for (j = 0; j < nnodes; j++)
  423. if (handle->per_node[i].request[j])
  424. nwait++;
  425. /* If the request is not detached (i.e. the caller really wants
  426. * proper ownership), no new requests will appear because a
  427. * reference will be kept on the dst replicate, which will
  428. * notably prevent data reclaiming.
  429. */
  430. }
  431. if (dst_replicate->state != STARPU_INVALID && (!nwait || is_prefetch))
  432. {
  433. #ifdef STARPU_MEMORY_STATS
  434. enum _starpu_cache_state old_state = dst_replicate->state;
  435. #endif
  436. /* the data is already available and we don't have to wait for
  437. * any request, so we can stop */
  438. _starpu_update_data_state(handle, dst_replicate, mode);
  439. _starpu_msi_cache_hit(requesting_node);
  440. #ifdef STARPU_MEMORY_STATS
  441. _starpu_memory_handle_stats_cache_hit(handle, requesting_node);
  442. /* XXX Broken ? */
  443. if (old_state == STARPU_SHARED
  444. && dst_replicate->state == STARPU_OWNER)
  445. _starpu_memory_handle_stats_shared_to_owner(handle, requesting_node);
  446. #endif
  447. if (dst_replicate->mc)
  448. _starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
  449. _starpu_spin_unlock(&handle->header_lock);
  450. if (callback_func)
  451. callback_func(callback_arg);
  452. _STARPU_LOG_OUT_TAG("data available");
  453. return NULL;
  454. }
  455. _starpu_msi_cache_miss(requesting_node);
  456. /* the only remaining situation is that the local copy was invalid */
  457. STARPU_ASSERT(dst_replicate->state == STARPU_INVALID || nwait);
  458. /* find someone who already has the data */
  459. int src_node = -1;
  460. if (mode & STARPU_R)
  461. {
  462. if (dst_replicate->state == STARPU_INVALID)
  463. src_node = _starpu_select_src_node(handle, requesting_node);
  464. else
  465. src_node = requesting_node;
  466. if (src_node < 0)
  467. {
  468. /* We will create it, no need to read an existing value */
  469. mode &= ~STARPU_R;
  470. }
  471. }
  472. else
  473. {
  474. /* if the data is in write only mode (and not SCRATCH or REDUX), there is no need for a source, data will be initialized by the task itself */
  475. if (mode & STARPU_W)
  476. dst_replicate->initialized = 1;
  477. if (requesting_node == STARPU_MAIN_RAM && !nwait)
  478. {
  479. /* And this is the main RAM, really no need for a
  480. * request, just allocate */
  481. if (_starpu_allocate_memory_on_node(handle, dst_replicate, is_prefetch) == 0)
  482. {
  483. _starpu_update_data_state(handle, dst_replicate, mode);
  484. _starpu_spin_unlock(&handle->header_lock);
  485. if (callback_func)
  486. callback_func(callback_arg);
  487. _STARPU_LOG_OUT_TAG("data immediately allocated");
  488. return NULL;
  489. }
  490. }
  491. }
  492. #define MAX_REQUESTS 4
  493. /* We can safely assume that there won't be more than 2 hops in the
  494. * current implementation */
  495. unsigned src_nodes[MAX_REQUESTS], dst_nodes[MAX_REQUESTS], handling_nodes[MAX_REQUESTS];
  496. /* keep one slot for the last W request, if any */
  497. int write_invalidation = (mode & STARPU_W) && nwait && !is_prefetch;
  498. int nhops = determine_request_path(handle, src_node, requesting_node, mode, MAX_REQUESTS,
  499. src_nodes, dst_nodes, handling_nodes, write_invalidation);
  500. STARPU_ASSERT(nhops >= 0 && nhops <= MAX_REQUESTS-1);
  501. struct _starpu_data_request *requests[nhops + write_invalidation];
  502. /* Did we reuse a request for that hop ? */
  503. int reused_requests[nhops + write_invalidation];
  504. /* Construct an array with a list of requests, possibly reusing existing requests */
  505. int hop;
  506. for (hop = 0; hop < nhops; hop++)
  507. {
  508. struct _starpu_data_request *r;
  509. unsigned hop_src_node = src_nodes[hop];
  510. unsigned hop_dst_node = dst_nodes[hop];
  511. unsigned hop_handling_node = handling_nodes[hop];
  512. struct _starpu_data_replicate *hop_src_replicate;
  513. struct _starpu_data_replicate *hop_dst_replicate;
  514. /* Only the first request is independant */
  515. unsigned ndeps = (hop == 0)?0:1;
  516. hop_src_replicate = &handle->per_node[hop_src_node];
  517. hop_dst_replicate = (hop != nhops - 1)?&handle->per_node[hop_dst_node]:dst_replicate;
  518. /* Try to reuse a request if possible */
  519. r = _starpu_search_existing_data_request(hop_dst_replicate,
  520. (mode & STARPU_R)?hop_src_node:hop_dst_node,
  521. mode, is_prefetch);
  522. reused_requests[hop] = !!r;
  523. if (!r)
  524. {
  525. /* Create a new request if there was no request to reuse */
  526. r = _starpu_create_data_request(handle, hop_src_replicate,
  527. hop_dst_replicate, hop_handling_node,
  528. mode, ndeps, is_prefetch, prio, 0);
  529. nwait++;
  530. }
  531. requests[hop] = r;
  532. }
  533. /* Chain these requests */
  534. for (hop = 0; hop < nhops; hop++)
  535. {
  536. struct _starpu_data_request *r;
  537. r = requests[hop];
  538. if (hop != nhops - 1)
  539. {
  540. if (!reused_requests[hop + 1])
  541. {
  542. r->next_req[r->next_req_count++] = requests[hop + 1];
  543. STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
  544. }
  545. }
  546. else if (!write_invalidation)
  547. /* The last request will perform the callback after termination */
  548. _starpu_data_request_append_callback(r, callback_func, callback_arg);
  549. if (reused_requests[hop])
  550. _starpu_spin_unlock(&r->lock);
  551. }
  552. if (write_invalidation)
  553. {
  554. /* Some requests were still pending, we have to add yet another
  555. * request, depending on them, which will invalidate their
  556. * result.
  557. */
  558. struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
  559. dst_replicate, requesting_node,
  560. STARPU_W, nwait, is_prefetch, prio, 1);
  561. /* and perform the callback after termination */
  562. _starpu_data_request_append_callback(r, callback_func, callback_arg);
  563. /* We will write to the buffer. We will have to wait for all
  564. * existing requests before the last request which will
  565. * invalidate all their results (which were possibly spurious,
  566. * e.g. too aggressive eviction).
  567. */
  568. unsigned i, j;
  569. unsigned nnodes = starpu_memory_nodes_get_count();
  570. for (i = 0; i < nnodes; i++)
  571. for (j = 0; j < nnodes; j++)
  572. {
  573. struct _starpu_data_request *r2 = handle->per_node[i].request[j];
  574. if (r2)
  575. {
  576. _starpu_spin_lock(&r2->lock);
  577. r2->next_req[r2->next_req_count++] = r;
  578. STARPU_ASSERT(r2->next_req_count <= STARPU_MAXNODES + 1);
  579. _starpu_spin_unlock(&r2->lock);
  580. nwait--;
  581. }
  582. }
  583. STARPU_ASSERT(nwait == 0);
  584. nhops++;
  585. requests[nhops - 1] = r;
  586. /* existing requests will post this one */
  587. reused_requests[nhops - 1] = 1;
  588. }
  589. STARPU_ASSERT(nhops);
  590. if (!async)
  591. requests[nhops - 1]->refcnt++;
  592. /* we only submit the first request, the remaining will be
  593. * automatically submitted afterward */
  594. if (!reused_requests[0])
  595. _starpu_post_data_request(requests[0], handling_nodes[0]);
  596. return requests[nhops - 1];
  597. }
  598. int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *dst_replicate,
  599. enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
  600. void (*callback_func)(void *), void *callback_arg, int prio)
  601. {
  602. unsigned local_node = _starpu_memory_node_get_local_key();
  603. _STARPU_LOG_IN();
  604. int cpt = 0;
  605. while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
  606. {
  607. cpt++;
  608. _starpu_datawizard_progress(local_node, 1);
  609. }
  610. if (cpt == STARPU_SPIN_MAXTRY)
  611. _starpu_spin_lock(&handle->header_lock);
  612. if (!detached)
  613. {
  614. /* Take a reference which will be released by _starpu_release_data_on_node */
  615. dst_replicate->refcnt++;
  616. dst_replicate->handle->busy_count++;
  617. }
  618. struct _starpu_data_request *r;
  619. r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
  620. is_prefetch, async, callback_func, callback_arg, prio);
  621. /* If no request was created, the handle was already up-to-date on the
  622. * node. In this case, _starpu_create_request_to_fetch_data has already
  623. * unlocked the header. */
  624. if (!r)
  625. return 0;
  626. _starpu_spin_unlock(&handle->header_lock);
  627. int ret = async?0:_starpu_wait_data_request_completion(r, 1);
  628. _STARPU_LOG_OUT();
  629. return ret;
  630. }
  631. static int idle_prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  632. {
  633. return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 2, 1, NULL, NULL, prio);
  634. }
  635. static int prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  636. {
  637. return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 1, 1, NULL, NULL, prio);
  638. }
  639. static int fetch_data(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  640. {
  641. return _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, prio);
  642. }
  643. uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
  644. {
  645. return handle->per_node[node].refcnt;
  646. }
  647. size_t _starpu_data_get_size(starpu_data_handle_t handle)
  648. {
  649. return handle->ops->get_size(handle);
  650. }
  651. uint32_t _starpu_data_get_footprint(starpu_data_handle_t handle)
  652. {
  653. return handle->footprint;
  654. }
  655. /* in case the data was accessed on a write mode, do not forget to
  656. * make it accessible again once it is possible ! */
/* Release the reference on REPLICATE taken by a previous fetch, applying any
 * write-through policy first, then decrementing the replicate refcnt and the
 * handle busy count, and finally waking up pending data dependencies. */
void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_wt_mask, struct _starpu_data_replicate *replicate)
{
uint32_t wt_mask;
wt_mask = default_wt_mask | handle->wt_mask;
/* Restrict the mask to memory nodes that actually exist */
wt_mask &= (1<<starpu_memory_nodes_get_count())-1;
/* Note that it is possible that there is no valid copy of the data (if
* starpu_data_invalidate was called for instance). In that case, we do
* not enforce any write-through mechanism. */
unsigned memory_node = replicate->memory_node;
/* Only write through when the data was actually written to, and to
 * nodes other than the one we already hold it on */
if (replicate->state != STARPU_INVALID && handle->current_mode & STARPU_W)
if ((wt_mask & ~(1<<memory_node)))
_starpu_write_through_data(handle, memory_node, wt_mask);
unsigned local_node = _starpu_memory_node_get_local_key();
/* Try to take the header lock a bounded number of times, pumping the
 * datawizard progress engine between attempts so pending transfers can
 * complete (the lock holder may be waiting on them); fall back to a
 * blocking lock after STARPU_SPIN_MAXTRY attempts */
int cpt = 0;
while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
{
cpt++;
_starpu_datawizard_progress(local_node, 1);
}
if (cpt == STARPU_SPIN_MAXTRY)
_starpu_spin_lock(&handle->header_lock);
/* Release refcnt taken by fetch_data_on_node */
replicate->refcnt--;
STARPU_ASSERT_MSG(replicate->refcnt >= 0, "handle %p released too many times", handle);
STARPU_ASSERT_MSG(handle->busy_count > 0, "handle %p released too many times", handle);
handle->busy_count--;
/* NOTE(review): when _starpu_notify_data_dependencies returns non-zero
 * it has apparently taken ownership of the header lock (released it
 * itself), so we must only unlock on a zero return — confirm against
 * its definition */
if (!_starpu_notify_data_dependencies(handle))
_starpu_spin_unlock(&handle->header_lock);
}
/* After a prefetch was queued, record in the replicate's "requested" bitmask
 * that its memory node expects the data, but only while the replicate is
 * still invalid (i.e. the transfer has not already landed). */
static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate)
{
unsigned local_node = _starpu_memory_node_get_local_key();
/* Bounded trylock loop: keep the datawizard progressing while we wait
 * for the header lock, then block for it as a last resort */
int cpt = 0;
while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
{
cpt++;
_starpu_datawizard_progress(local_node, 1);
}
if (cpt == STARPU_SPIN_MAXTRY)
_starpu_spin_lock(&handle->header_lock);
if (replicate->state == STARPU_INVALID)
{
/* Mark the replicate's own node as having requested the data */
unsigned dst_node = replicate->memory_node;
replicate->requested |= 1UL << dst_node;
}
_starpu_spin_unlock(&handle->header_lock);
}
  704. int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
  705. {
  706. STARPU_ASSERT(!task->prefetched);
  707. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  708. unsigned index;
  709. for (index = 0; index < nbuffers; index++)
  710. {
  711. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  712. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  713. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  714. continue;
  715. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  716. prefetch_data_on_node(handle, replicate, mode, prio);
  717. _starpu_set_data_requested_flag_if_needed(handle, replicate);
  718. }
  719. return 0;
  720. }
  721. int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
  722. {
  723. int prio = task->priority;
  724. if (task->workerorder)
  725. prio = INT_MAX - task->workerorder;
  726. return starpu_prefetch_task_input_on_node_prio(task, node, prio);
  727. }
  728. int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
  729. {
  730. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  731. unsigned index;
  732. for (index = 0; index < nbuffers; index++)
  733. {
  734. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  735. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  736. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  737. continue;
  738. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  739. idle_prefetch_data_on_node(handle, replicate, mode, prio);
  740. }
  741. return 0;
  742. }
  743. int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
  744. {
  745. int prio = task->priority;
  746. if (task->workerorder)
  747. prio = INT_MAX - task->workerorder;
  748. return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
  749. }
/* Pick the replicate a task buffer maps to: the per-worker replicate for
 * SCRATCH/REDUX accesses (lazily allocating the per-worker array), or the
 * per-node replicate for ordinary R/W accesses. */
static struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
{
if (mode & (STARPU_SCRATCH|STARPU_REDUX))
{
/* Check-lock-recheck: the unlocked read avoids taking the header
 * lock on the common already-initialized path; the second check
 * under the lock guards against a concurrent initializer.
 * NOTE(review): the unlocked read presumably relies on pointer
 * stores being atomic on supported platforms — confirm. */
if (!handle->per_worker)
{
_starpu_spin_lock(&handle->header_lock);
if (!handle->per_worker)
_starpu_data_initialize_per_worker(handle);
_starpu_spin_unlock(&handle->header_lock);
}
return &handle->per_worker[workerid];
}
else
/* That's a "normal" buffer (R/W) */
return &handle->per_node[node];
}
  767. /* Synchronously fetch data for a given task (if it's not there already) */
  768. int _starpu_fetch_task_input(struct _starpu_job *j)
  769. {
  770. _STARPU_TRACE_START_FETCH_INPUT(NULL);
  771. int profiling = starpu_profiling_status_get();
  772. struct starpu_task *task = j->task;
  773. if (profiling && task->profiling_info)
  774. _starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
  775. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  776. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  777. unsigned local_memory_node = _starpu_memory_node_get_local_key();
  778. int workerid = starpu_worker_get_id_check();
  779. #ifdef STARPU_USE_FXT
  780. unsigned long total_size = 0;
  781. #endif
  782. unsigned index;
  783. for (index = 0; index < nbuffers; index++)
  784. {
  785. int ret;
  786. starpu_data_handle_t handle = descrs[index].handle;
  787. enum starpu_data_access_mode mode = descrs[index].mode;
  788. int node = descrs[index].node;
  789. if (node == -1)
  790. node = local_memory_node;
  791. if (mode == STARPU_NONE ||
  792. (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
  793. (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
  794. STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
  795. struct _starpu_data_replicate *local_replicate;
  796. if (index && descrs[index-1].handle == descrs[index].handle)
  797. /* We have already took this data, skip it. This
  798. * depends on ordering putting writes before reads, see
  799. * _starpu_compar_handles */
  800. continue;
  801. local_replicate = get_replicate(handle, mode, workerid, node);
  802. ret = fetch_data(handle, local_replicate, mode, 0);
  803. if (STARPU_UNLIKELY(ret))
  804. goto enomem;
  805. #ifdef STARPU_USE_FXT
  806. total_size += _starpu_data_get_size(handle);
  807. #endif
  808. }
  809. _STARPU_TRACE_DATA_LOAD(workerid,total_size);
  810. /* Now that we have taken the data locks in locking order, fill the codelet interfaces in function order. */
  811. for (index = 0; index < nbuffers; index++)
  812. {
  813. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  814. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  815. int node = descrs[index].node;
  816. if (node == -1)
  817. node = local_memory_node;
  818. struct _starpu_data_replicate *local_replicate;
  819. local_replicate = get_replicate(handle, mode, workerid, node);
  820. _STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, index);
  821. /* If the replicate was not initialized yet, we have to do it now */
  822. if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
  823. _starpu_redux_init_data_replicate(handle, local_replicate, workerid);
  824. }
  825. if (profiling && task->profiling_info)
  826. _starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);
  827. _STARPU_TRACE_END_FETCH_INPUT(NULL);
  828. return 0;
  829. enomem:
  830. _STARPU_TRACE_END_FETCH_INPUT(NULL);
  831. _STARPU_DISP("something went wrong with buffer %u\n", index);
  832. /* try to unreference all the input that were successfully taken */
  833. unsigned index2;
  834. for (index2 = 0; index2 < index; index2++)
  835. {
  836. starpu_data_handle_t handle = descrs[index2].handle;
  837. enum starpu_data_access_mode mode = descrs[index2].mode;
  838. int node = descrs[index].node;
  839. if (node == -1)
  840. node = local_memory_node;
  841. struct _starpu_data_replicate *local_replicate;
  842. if (index2 && descrs[index2-1].handle == descrs[index2].handle)
  843. /* We have already released this data, skip it. This
  844. * depends on ordering putting writes before reads, see
  845. * _starpu_compar_handles */
  846. continue;
  847. local_replicate = get_replicate(handle, mode, workerid, node);
  848. _starpu_release_data_on_node(handle, 0, local_replicate);
  849. }
  850. return -1;
  851. }
  852. /* Release task data dependencies */
/* Release task data dependencies */
void __starpu_push_task_output(struct _starpu_job *j)
{
#ifdef STARPU_OPENMP
STARPU_ASSERT(!j->continuation);
#endif
int profiling = starpu_profiling_status_get();
struct starpu_task *task = j->task;
if (profiling && task->profiling_info)
_starpu_clock_gettime(&task->profiling_info->release_data_start_time);
struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
/* May be -1 when called from a non-worker context (e.g. the NOWHERE
 * completion path); only asserted >= 0 when actually needed below */
int workerid = starpu_worker_get_id();
unsigned local_memory_node = _starpu_memory_node_get_local_key();
unsigned index;
for (index = 0; index < nbuffers; index++)
{
starpu_data_handle_t handle = descrs[index].handle;
enum starpu_data_access_mode mode = descrs[index].mode;
int node = descrs[index].node;
/* For NOWHERE tasks, keep node == -1: there is no replicate to release */
if (node == -1 && task->cl->where != STARPU_NOWHERE)
node = local_memory_node;
struct _starpu_data_replicate *local_replicate;
if (index && descrs[index-1].handle == descrs[index].handle)
/* We have already released this data, skip it. This
* depends on ordering putting writes before reads, see
* _starpu_compar_handles */
continue;
if (node != -1)
{
STARPU_ASSERT(workerid >= 0);
local_replicate = get_replicate(handle, mode, workerid, node);
}
/* Keep a reference for future
* _starpu_release_task_enforce_sequential_consistency call */
_starpu_spin_lock(&handle->header_lock);
handle->busy_count++;
if (node == -1)
{
/* NOWHERE case, just notify dependencies */
/* NOTE(review): a non-zero return from _starpu_notify_data_dependencies
 * presumably means it already released the header lock — confirm */
if (!_starpu_notify_data_dependencies(handle))
_starpu_spin_unlock(&handle->header_lock);
}
else
{
_starpu_spin_unlock(&handle->header_lock);
/* Drops the fetch reference and triggers write-through if needed */
_starpu_release_data_on_node(handle, 0, local_replicate);
}
}
if (profiling && task->profiling_info)
_starpu_clock_gettime(&task->profiling_info->release_data_end_time);
}
  904. /* Version for a driver running on a worker: we show the driver state in the trace */
  905. void _starpu_push_task_output(struct _starpu_job *j)
  906. {
  907. _STARPU_TRACE_START_PUSH_OUTPUT(NULL);
  908. __starpu_push_task_output(j);
  909. _STARPU_TRACE_END_PUSH_OUTPUT(NULL);
  910. }
/* Tracks completion of the asynchronous fetches issued for a NOWHERE task */
struct fetch_nowhere_wrapper
{
struct _starpu_job *j; /* the job whose buffers are being fetched */
unsigned pending; /* number of transfers still in flight, decremented atomically */
};
  916. static void _starpu_fetch_nowhere_task_input_cb(void *arg);
  917. /* Asynchronously fetch data for a task which will have no content */
  918. void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
  919. {
  920. int profiling = starpu_profiling_status_get();
  921. struct starpu_task *task = j->task;
  922. if (profiling && task->profiling_info)
  923. _starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
  924. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  925. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  926. unsigned nfetchbuffers = 0;
  927. struct fetch_nowhere_wrapper *wrapper;
  928. unsigned index;
  929. for (index = 0; index < nbuffers; index++)
  930. {
  931. int node = descrs[index].node;
  932. if (node != -1)
  933. nfetchbuffers++;
  934. }
  935. if (!nfetchbuffers)
  936. {
  937. /* Nothing to fetch actually, already finished! */
  938. __starpu_push_task_output(j);
  939. _starpu_handle_job_termination(j);
  940. _STARPU_LOG_OUT_TAG("handle_job_termination");
  941. return;
  942. }
  943. wrapper = malloc(sizeof(*wrapper));
  944. wrapper->j = j;
  945. wrapper->pending = nfetchbuffers;
  946. for (index = 0; index < nbuffers; index++)
  947. {
  948. starpu_data_handle_t handle = descrs[index].handle;
  949. enum starpu_data_access_mode mode = descrs[index].mode;
  950. int node = descrs[index].node;
  951. if (node == -1)
  952. continue;
  953. if (mode == STARPU_NONE ||
  954. (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
  955. (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
  956. STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
  957. STARPU_ASSERT(mode != STARPU_SCRATCH && mode != STARPU_REDUX);
  958. struct _starpu_data_replicate *local_replicate;
  959. local_replicate = get_replicate(handle, mode, -1, node);
  960. _starpu_fetch_data_on_node(handle, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0);
  961. }
  962. if (profiling && task->profiling_info)
  963. _starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);
  964. }
/* Completion callback for each async fetch queued by
 * _starpu_fetch_nowhere_task_input: atomically count down the pending
 * transfers and, on the last one, finish the job. */
static void _starpu_fetch_nowhere_task_input_cb(void *arg)
{
/* One more transfer finished */
struct fetch_nowhere_wrapper *wrapper = arg;
/* Atomic decrement: several callbacks may run concurrently */
unsigned pending = STARPU_ATOMIC_ADD(&wrapper->pending, -1);
ANNOTATE_HAPPENS_BEFORE(&wrapper->pending);
if (pending == 0)
{
ANNOTATE_HAPPENS_AFTER(&wrapper->pending);
/* Finished transferring, task is over */
/* Save j before freeing the wrapper */
struct _starpu_job *j = wrapper->j;
free(wrapper);
__starpu_push_task_output(j);
_starpu_handle_job_termination(j);
_STARPU_LOG_OUT_TAG("handle_job_termination");
}
}
  982. /* NB : this value can only be an indication of the status of a data
  983. at some point, but there is no strong garantee ! */
  984. unsigned _starpu_is_data_present_or_requested(starpu_data_handle_t handle, unsigned node)
  985. {
  986. unsigned ret = 0;
  987. // XXX : this is just a hint, so we don't take the lock ...
  988. // STARPU_PTHREAD_SPIN_LOCK(&handle->header_lock);
  989. if (handle->per_node[node].state != STARPU_INVALID)
  990. {
  991. ret = 1;
  992. }
  993. else
  994. {
  995. unsigned i;
  996. unsigned nnodes = starpu_memory_nodes_get_count();
  997. for (i = 0; i < nnodes; i++)
  998. {
  999. if ((handle->per_node[node].requested & (1UL << i)) || handle->per_node[node].request[i])
  1000. ret = 1;
  1001. }
  1002. }
  1003. // STARPU_PTHREAD_SPIN_UNLOCK(&handle->header_lock);
  1004. return ret;
  1005. }
/* Install FUNC as the hook invoked when HANDLE is unregistered. */
void _starpu_data_set_unregister_hook(starpu_data_handle_t handle, _starpu_data_handle_unregister_hook func)
{
handle->unregister_hook = func;
}