/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009-2016  Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016  CNRS
 * Copyright (C) 2014  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <common/config.h>
#include <datawizard/coherency.h>
#include <datawizard/copy_driver.h>
#include <datawizard/write_back.h>
#include <datawizard/memory_nodes.h>
#include <core/dependencies/data_concurrency.h>
#include <core/disk.h>
#include <profiling/profiling.h>
#include <math.h>
#include <core/task.h>
#include <starpu_scheduler.h>
#include <core/workers.h>
#include <limits.h>

#ifdef STARPU_SIMGRID
#include <core/simgrid.h>
#endif
static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);

int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
{
	int src_node = -1;
	unsigned i;

	unsigned nnodes = starpu_memory_nodes_get_count();

	/* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
	unsigned node;

	size_t size = _starpu_data_get_size(handle);
	double cost = INFINITY;
	unsigned src_node_mask = 0;

	for (node = 0; node < nnodes; node++)
	{
		if (handle->per_node[node].state != STARPU_INVALID)
		{
			/* we found a copy ! */
			src_node_mask |= (1<<node);
		}
	}
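	/* Illustration (hypothetical): if only nodes 0 and 2 hold a valid
	 * replicate, src_node_mask is 0b101, i.e. bit i is set iff node i
	 * holds a non-STARPU_INVALID copy. */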
	if (src_node_mask == 0 && handle->init_cl)
	{
		/* No copy yet, but the application told us how to build it. */
		return -1;
	}

	/* we should have found at least one copy ! */
	STARPU_ASSERT_MSG(src_node_mask != 0, "The data for the handle %p is requested, but the handle does not have a valid value. Perhaps some initialization task is missing?", handle);

	/* Without knowing the size, we won't know the cost */
	if (!size)
		cost = 0;

	/* Check whether we have a transfer cost for all nodes; if so, take the minimum */
	if (cost)
		for (i = 0; i < nnodes; i++)
		{
			if (src_node_mask & (1<<i))
			{
				double time = starpu_transfer_predict(i, destination, size);
				unsigned handling_node;

				/* Avoid indirect transfers */
				if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
					continue;

				if (_STARPU_IS_ZERO(time))
				{
					/* No estimation, will have to revert to dumb strategy */
					cost = 0.0;
					break;
				}
				else if (time < cost)
				{
					cost = time;
					src_node = i;
				}
			}
		}

	if (cost && src_node != -1)
	{
		/* Could estimate through cost, return that */
		STARPU_ASSERT(handle->per_node[src_node].allocated);
		STARPU_ASSERT(handle->per_node[src_node].initialized);
		return src_node;
	}
	int i_ram = -1;
	int i_gpu = -1;
	int i_disk = -1;

	/* Revert to dumb strategy: take RAM unless only a GPU has it */
	for (i = 0; i < nnodes; i++)
	{
		if (src_node_mask & (1<<i))
		{
			int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
			/* Avoid transfers which the interface does not want */
			if (can_copy)
			{
				void *src_interface = handle->per_node[i].data_interface;
				void *dst_interface = handle->per_node[destination].data_interface;
				unsigned handling_node;

				if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
				{
					/* Avoid going through RAM if the interface does not want it */
					void *ram_interface = handle->per_node[STARPU_MAIN_RAM].data_interface;
					if ((!can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, i)
					  && !can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, STARPU_MAIN_RAM))
					 || (!can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, STARPU_MAIN_RAM)
					  && !can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, destination)))
						continue;
				}
			}

			/* However, GPUs are expensive sources, really!
			 * Unless peer transfer is supported (and it would then
			 * have been selected above). Others should be OK. */
			if (starpu_node_get_kind(i) == STARPU_CUDA_RAM ||
			    starpu_node_get_kind(i) == STARPU_OPENCL_RAM ||
			    starpu_node_get_kind(i) == STARPU_MIC_RAM)
				i_gpu = i;

			if (starpu_node_get_kind(i) == STARPU_CPU_RAM ||
			    starpu_node_get_kind(i) == STARPU_SCC_RAM ||
			    starpu_node_get_kind(i) == STARPU_SCC_SHM)
				i_ram = i;

			if (starpu_node_get_kind(i) == STARPU_DISK_RAM)
				i_disk = i;
		}
	}

	/* we have to use CPU RAM first */
	if (i_ram != -1)
		src_node = i_ram;
	/* otherwise a GPU */
	else if (i_gpu != -1)
		src_node = i_gpu;
	/* no luck, we have to use the disk memory */
	else
		src_node = i_disk;

	STARPU_ASSERT(src_node != -1);
	STARPU_ASSERT(handle->per_node[src_node].allocated);
	STARPU_ASSERT(handle->per_node[src_node].initialized);
	return src_node;
}
/* This may be called once the data is fetched, with the header and STARPU_RW lock held. */
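/* Illustration of the resulting transitions (hypothetical scenario): if node
 * 0 is STARPU_OWNER and node 1 fetches in STARPU_R mode, both replicates end
 * up STARPU_SHARED; if node 1 instead fetches in STARPU_W (or STARPU_RW)
 * mode, node 1 becomes STARPU_OWNER and every other replicate is
 * invalidated. */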
void _starpu_update_data_state(starpu_data_handle_t handle,
			       struct _starpu_data_replicate *requesting_replicate,
			       enum starpu_data_access_mode mode)
{
	/* There is nothing to do for relaxed coherency modes (scratch or
	 * reductions) */
	if (!(mode & STARPU_RW))
		return;

	unsigned nnodes = starpu_memory_nodes_get_count();

	/* the data is present now */
	unsigned requesting_node = requesting_replicate->memory_node;
	requesting_replicate->requested &= ~(1UL << requesting_node);

	if (mode & STARPU_W)
	{
		/* the requesting node now has the only valid copy */
		unsigned node;
		for (node = 0; node < nnodes; node++)
		{
			_STARPU_TRACE_DATA_INVALIDATE(handle, node);
			handle->per_node[node].state = STARPU_INVALID;
		}

		requesting_replicate->state = STARPU_OWNER;
		if (handle->home_node != -1 && handle->per_node[handle->home_node].state == STARPU_INVALID)
			/* Notify that this MC is now dirty */
			_starpu_memchunk_dirty(requesting_replicate->mc, requesting_replicate->memory_node);
	}
	else
	{
		/* read only */
		if (requesting_replicate->state != STARPU_OWNER)
		{
			/* there was at least another copy of the data */
			unsigned node;
			for (node = 0; node < nnodes; node++)
			{
				struct _starpu_data_replicate *replicate = &handle->per_node[node];
				if (replicate->state != STARPU_INVALID)
					replicate->state = STARPU_SHARED;
			}
			requesting_replicate->state = STARPU_SHARED;
		}
	}
}
static int worker_supports_direct_access(unsigned node, unsigned handling_node)
{
	/* disk nodes only support the dedicated disk <-> ram and disk <-> disk paths */
	if (starpu_node_get_kind(node) == STARPU_DISK_RAM || starpu_node_get_kind(handling_node) == STARPU_DISK_RAM)
		return 0;

	if (node == handling_node)
		return 1;

	if (!_starpu_memory_node_get_nworkers(handling_node))
		/* No worker to process the request from that node */
		return 0;

	int type = starpu_node_get_kind(node);
	switch (type)
	{
		case STARPU_CUDA_RAM:
		{
			/* GPUs do not always allow direct remote access: when
			 * CUDA 4 peer copies are available, we allow two CUDA
			 * devices to communicate directly. */
#ifdef STARPU_SIMGRID
			if (starpu_node_get_kind(handling_node) == STARPU_CUDA_RAM)
			{
				char name[16];
				msg_host_t host;
				const char* cuda_memcpy_peer;
				snprintf(name, sizeof(name), "CUDA%d", _starpu_memory_node_get_devid(handling_node));
				host = _starpu_simgrid_get_host_by_name(name);
				cuda_memcpy_peer = MSG_host_get_property_value(host, "memcpy_peer");
				return cuda_memcpy_peer && atoll(cuda_memcpy_peer);
			}
			else
				return 0;
#elif defined(HAVE_CUDA_MEMCPY_PEER)
			enum starpu_node_kind kind = starpu_node_get_kind(handling_node);
			return kind == STARPU_CUDA_RAM;
#else /* HAVE_CUDA_MEMCPY_PEER */
			/* Direct GPU-GPU transfers are not allowed in general */
			return 0;
#endif /* HAVE_CUDA_MEMCPY_PEER */
		}
		case STARPU_OPENCL_RAM:
			return 0;
		case STARPU_MIC_RAM:
			/* TODO: we don't handle direct MIC-MIC transfers yet */
			return 0;
		case STARPU_SCC_RAM:
			return 1;
		default:
			return 1;
	}
}
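/* Note: the can_copy method, when provided by the data interface, lets the
 * interface veto a given (src_node, dst_node, handling_node) combination;
 * link_supports_direct_transfers below tries the destination as handling
 * node first, then the source, before callers fall back to staging through
 * main RAM. */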
static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node)
{
	int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
	void *src_interface = handle->per_node[src_node].data_interface;
	void *dst_interface = handle->per_node[dst_node].data_interface;

	/* XXX That's a hack until we fix cudaMemcpy3DPeerAsync in the block interface.
	 * Perhaps not all data interfaces provide a direct GPU-GPU transfer
	 * method! */
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	if (src_node != dst_node && starpu_node_get_kind(src_node) == STARPU_CUDA_RAM && starpu_node_get_kind(dst_node) == STARPU_CUDA_RAM)
	{
		const struct starpu_data_copy_methods *copy_methods = handle->ops->copy_methods;
		if (!copy_methods->cuda_to_cuda_async && !copy_methods->any_to_any)
			return 0;
	}
#endif

	/* Note: with CUDA, performance seems a bit better when issuing the
	 * transfer from the destination (tested without GPUDirect, but
	 * GPUDirect probably behaves the same). */
	if (worker_supports_direct_access(src_node, dst_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node)))
	{
		*handling_node = dst_node;
		return 1;
	}

	if (worker_supports_direct_access(dst_node, src_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node)))
	{
		*handling_node = src_node;
		return 1;
	}

	/* Link between disk and ram */
	if ((starpu_node_get_kind(src_node) == STARPU_DISK_RAM && starpu_node_get_kind(dst_node) == STARPU_CPU_RAM) ||
	    (starpu_node_get_kind(src_node) == STARPU_CPU_RAM && starpu_node_get_kind(dst_node) == STARPU_DISK_RAM))
	{
		/* FIXME: not necessarily a worker :/ */
		*handling_node = STARPU_MAIN_RAM;
		return 1;
	}

	/* link between two disks of the same kind */
	if (_starpu_is_same_kind_disk(src_node, dst_node))
		return 1;

	return 0;
}
/* Determines the path of a request: each hop is defined by (src,dst) and the
 * node that handles the hop. The returned value indicates the number of hops,
 * and max_len is the maximum number of hops (i.e. the size of the
 * src_nodes, dst_nodes and handling_nodes arrays). */
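/* Illustration (hypothetical): if the data lives on a CUDA node without
 * peer-copy support and is requested on another CUDA node, two hops are
 * produced: (CUDA0 -> RAM) then (RAM -> CUDA1). With peer copies available,
 * a single (CUDA0 -> CUDA1) hop suffices. */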
static int determine_request_path(starpu_data_handle_t handle,
				  int src_node, int dst_node,
				  enum starpu_data_access_mode mode, int max_len,
				  unsigned *src_nodes, unsigned *dst_nodes,
				  unsigned *handling_nodes, unsigned write_invalidation)
{
	if (src_node == dst_node || !(mode & STARPU_R))
	{
		if (write_invalidation)
			/* The invalidation request will be enough */
			return 0;

		/* The destination node should only allocate the data, no transfer is required */
		STARPU_ASSERT(max_len >= 1);

		src_nodes[0] = STARPU_MAIN_RAM; // ignored
		dst_nodes[0] = dst_node;
		handling_nodes[0] = dst_node;
		return 1;
	}

	unsigned handling_node;
	int link_is_valid = link_supports_direct_transfers(handle, src_node, dst_node, &handling_node);

	if (!link_is_valid)
	{
		int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
		void *src_interface = handle->per_node[src_node].data_interface;
		void *dst_interface = handle->per_node[dst_node].data_interface;

		/* We need an intermediate hop to implement data staging
		 * through main memory. */
		STARPU_ASSERT(max_len >= 2);
		STARPU_ASSERT(src_node >= 0);

		/* GPU -> RAM */
		src_nodes[0] = src_node;
		dst_nodes[0] = STARPU_MAIN_RAM;

		if (starpu_node_get_kind(src_node) == STARPU_DISK_RAM)
			/* Disks don't have their own driver thread */
			handling_nodes[0] = dst_node;
		else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node))
		{
			handling_nodes[0] = src_node;
		}
		else
		{
			STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, dst_node), "interface %d refuses all kinds of transfers from node %u to node %u\n", handle->ops->interfaceid, src_node, dst_node);
			handling_nodes[0] = dst_node;
		}

		/* RAM -> GPU */
		src_nodes[1] = STARPU_MAIN_RAM;
		dst_nodes[1] = dst_node;

		if (starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
			/* Disks don't have their own driver thread */
			handling_nodes[1] = src_node;
		else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node))
		{
			handling_nodes[1] = dst_node;
		}
		else
		{
			STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, src_node), "interface %d refuses all kinds of transfers from node %u to node %u\n", handle->ops->interfaceid, src_node, dst_node);
			handling_nodes[1] = src_node;
		}

		return 2;
	}
	else
	{
		STARPU_ASSERT(max_len >= 1);

		src_nodes[0] = src_node;
		dst_nodes[0] = dst_node;
		handling_nodes[0] = handling_node;
#if !defined(HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
		STARPU_ASSERT(!(mode & STARPU_R) || starpu_node_get_kind(src_node) != STARPU_CUDA_RAM || starpu_node_get_kind(dst_node) != STARPU_CUDA_RAM);
#endif
		return 1;
	}
}
/* handle->lock should be taken. r is returned locked. The node parameter
 * indicates either the source of the request, or the destination for a
 * write-only request. */
static struct _starpu_data_request *_starpu_search_existing_data_request(struct _starpu_data_replicate *replicate, unsigned node, enum starpu_data_access_mode mode, unsigned is_prefetch)
{
	struct _starpu_data_request *r;

	r = replicate->request[node];

	if (r)
	{
		_starpu_spin_checklocked(&r->handle->header_lock);

		_starpu_spin_lock(&r->lock);

		/* perhaps we need to "upgrade" the request */
		if (is_prefetch < r->prefetch)
			_starpu_update_prefetch_status(r, is_prefetch);

		if (mode & STARPU_R)
		{
			/* in case the existing request did not imply a memory
			 * transfer yet, we have to take a second refcnt now
			 * for the source, in addition to the refcnt for the
			 * destination (so that the source remains valid) */
			if (!(r->mode & STARPU_R))
			{
				replicate->refcnt++;
				replicate->handle->busy_count++;
			}

			r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_R);
		}

		if (mode & STARPU_W)
			r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_W);
	}

	return r;
}
/*
 * This function is called when the data is needed on the local node; it
 * returns a pointer to the local copy.
 *
 *			R	STARPU_W	STARPU_RW
 *	Owner		OK	OK		OK
 *	Shared		OK	1		1
 *	Invalid		2	3		4
 *
 * case 1: shared + (read)write:
 *	no data copy, but shared->Invalid/Owner
 * case 2: invalid + read:
 *	data copy + invalid->shared + owner->shared (STARPU_ASSERT(there is a valid))
 * case 3: invalid + write:
 *	no data copy + invalid->owner + (owner,shared)->invalid
 * case 4: invalid + R/STARPU_W:
 *	data copy + if (STARPU_W) (invalid->owner + owner->invalid)
 *		    else (invalid,owner->shared)
 */
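/* Hypothetical walk-through of case 2: a task reads a handle on a CUDA node
 * whose replicate is STARPU_INVALID while main RAM is STARPU_OWNER. A copy
 * request RAM -> CUDA is created below (possibly reusing a pending request),
 * and once it completes both replicates are left STARPU_SHARED. */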
struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
								  struct _starpu_data_replicate *dst_replicate,
								  enum starpu_data_access_mode mode, unsigned is_prefetch,
								  unsigned async,
								  void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
{
	/* We don't care about commuting for data requests, that was handled before. */
	mode &= ~STARPU_COMMUTE;

	/* This function is called with handle's header lock taken */
	_starpu_spin_checklocked(&handle->header_lock);

	int requesting_node = dst_replicate ? dst_replicate->memory_node : -1;
	unsigned nwait = 0;

	if (mode & STARPU_W)
	{
		/* We will write to the buffer. We will have to wait for all
		 * existing requests before the last request, which will
		 * invalidate all their results (which were possibly spurious,
		 * e.g. too aggressive eviction).
		 */
		unsigned i, j;
		unsigned nnodes = starpu_memory_nodes_get_count();
		for (i = 0; i < nnodes; i++)
			for (j = 0; j < nnodes; j++)
				if (handle->per_node[i].request[j])
					nwait++;
		/* If the request is not detached (i.e. the caller really wants
		 * proper ownership), no new requests will appear because a
		 * reference will be kept on the dst replicate, which will
		 * notably prevent data reclaiming.
		 */
	}

	if ((!dst_replicate || dst_replicate->state != STARPU_INVALID) && (!nwait || is_prefetch))
	{
		if (dst_replicate)
		{
#ifdef STARPU_MEMORY_STATS
			enum _starpu_cache_state old_state = dst_replicate->state;
#endif
			/* the data is already available and we don't have to
			 * wait for any request, so we can stop */
			_starpu_update_data_state(handle, dst_replicate, mode);
			_starpu_msi_cache_hit(requesting_node);
#ifdef STARPU_MEMORY_STATS
			_starpu_memory_handle_stats_cache_hit(handle, requesting_node);
			/* XXX Broken ? */
			if (old_state == STARPU_SHARED
			    && dst_replicate->state == STARPU_OWNER)
				_starpu_memory_handle_stats_shared_to_owner(handle, requesting_node);
#endif
			if (dst_replicate->mc)
				_starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
		}

		_starpu_spin_unlock(&handle->header_lock);

		if (callback_func)
			callback_func(callback_arg);

		_STARPU_LOG_OUT_TAG("data available");
		return NULL;
	}

	if (dst_replicate)
		_starpu_msi_cache_miss(requesting_node);

	/* the only remaining situation is that the local copy was invalid */
	STARPU_ASSERT((dst_replicate && dst_replicate->state == STARPU_INVALID) || nwait);

	/* find someone who already has the data */
	int src_node = -1;

	if (dst_replicate && mode & STARPU_R)
	{
		if (dst_replicate->state == STARPU_INVALID)
			src_node = _starpu_select_src_node(handle, requesting_node);
		else
			src_node = requesting_node;
		if (src_node < 0)
		{
			/* We will create it, no need to read an existing value */
			mode &= ~STARPU_R;
		}
	}
	else if (dst_replicate)
	{
		/* if the data is in write-only mode (and not SCRATCH or REDUX),
		 * there is no need for a source, the data will be initialized
		 * by the task itself */
		if (mode & STARPU_W)
			dst_replicate->initialized = 1;
		if (requesting_node == STARPU_MAIN_RAM && !nwait)
		{
			/* And this is the main RAM, really no need for a
			 * request, just allocate */
			if (_starpu_allocate_memory_on_node(handle, dst_replicate, is_prefetch) == 0)
			{
				_starpu_update_data_state(handle, dst_replicate, mode);

				_starpu_spin_unlock(&handle->header_lock);

				if (callback_func)
					callback_func(callback_arg);
				_STARPU_LOG_OUT_TAG("data immediately allocated");
				return NULL;
			}
		}
	}

#define MAX_REQUESTS 4
	/* We can safely assume that there won't be more than 2 hops in the
	 * current implementation */
	unsigned src_nodes[MAX_REQUESTS], dst_nodes[MAX_REQUESTS], handling_nodes[MAX_REQUESTS];
	/* keep one slot for the last W request, if any */
	int write_invalidation = (mode & STARPU_W) && nwait && !is_prefetch;
	int nhops = determine_request_path(handle, src_node, requesting_node, mode, MAX_REQUESTS,
					   src_nodes, dst_nodes, handling_nodes, write_invalidation);

	STARPU_ASSERT(nhops >= 0 && nhops <= MAX_REQUESTS-1);

	struct _starpu_data_request *requests[nhops + write_invalidation];

	/* Did we reuse a request for that hop ? */
	int reused_requests[nhops + write_invalidation];

	/* Construct an array with a list of requests, possibly reusing existing requests */
	int hop;
	for (hop = 0; hop < nhops; hop++)
	{
		struct _starpu_data_request *r;

		unsigned hop_src_node = src_nodes[hop];
		unsigned hop_dst_node = dst_nodes[hop];
		unsigned hop_handling_node = handling_nodes[hop];

		struct _starpu_data_replicate *hop_src_replicate;
		struct _starpu_data_replicate *hop_dst_replicate;

		/* Only the first request is independent */
		unsigned ndeps = (hop == 0)?0:1;

		hop_src_replicate = &handle->per_node[hop_src_node];
		hop_dst_replicate = (hop != nhops - 1)?&handle->per_node[hop_dst_node]:dst_replicate;

		/* Try to reuse a request if possible */
		r = _starpu_search_existing_data_request(hop_dst_replicate,
							 (mode & STARPU_R)?hop_src_node:hop_dst_node,
							 mode, is_prefetch);

		reused_requests[hop] = !!r;

		if (!r)
		{
			/* Create a new request if there was no request to reuse */
			r = _starpu_create_data_request(handle, hop_src_replicate,
							hop_dst_replicate, hop_handling_node,
							mode, ndeps, is_prefetch, prio, 0, origin);
			nwait++;
		}

		requests[hop] = r;
	}

	/* Chain these requests */
	for (hop = 0; hop < nhops; hop++)
	{
		struct _starpu_data_request *r;
		r = requests[hop];

		if (hop != nhops - 1)
		{
			if (!reused_requests[hop + 1])
			{
				r->next_req[r->next_req_count++] = requests[hop + 1];
				STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
			}
		}
		else if (!write_invalidation)
			/* The last request will perform the callback after termination */
			_starpu_data_request_append_callback(r, callback_func, callback_arg);

		if (reused_requests[hop])
			_starpu_spin_unlock(&r->lock);
	}

	if (write_invalidation)
	{
		/* Some requests were still pending, we have to add yet another
		 * request, depending on them, which will invalidate their
		 * result.
		 */
		struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
									      dst_replicate, requesting_node,
									      STARPU_W, nwait, is_prefetch, prio, 1, origin);

		/* and perform the callback after termination */
		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		/* We will write to the buffer. We will have to wait for all
		 * existing requests before the last request, which will
		 * invalidate all their results (which were possibly spurious,
		 * e.g. too aggressive eviction).
		 */
		unsigned i, j;
		unsigned nnodes = starpu_memory_nodes_get_count();
		for (i = 0; i < nnodes; i++)
			for (j = 0; j < nnodes; j++)
			{
				struct _starpu_data_request *r2 = handle->per_node[i].request[j];
				if (r2)
				{
					_starpu_spin_lock(&r2->lock);
					if (is_prefetch < r2->prefetch)
						/* Hasten the request we will have to wait for */
						_starpu_update_prefetch_status(r2, is_prefetch);
					r2->next_req[r2->next_req_count++] = r;
					STARPU_ASSERT(r2->next_req_count <= STARPU_MAXNODES + 1);
					_starpu_spin_unlock(&r2->lock);
					nwait--;
				}
			}
		STARPU_ASSERT(nwait == 0);
		nhops++;
		requests[nhops - 1] = r;
		/* existing requests will post this one */
		reused_requests[nhops - 1] = 1;
	}
	STARPU_ASSERT(nhops);

	if (!async)
		requests[nhops - 1]->refcnt++;

	/* we only submit the first request, the remaining ones will be
	 * automatically submitted afterward */
	if (!reused_requests[0])
		_starpu_post_data_request(requests[0], handling_nodes[0]);

	return requests[nhops - 1];
}
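/* Minimal usage sketch (hypothetical): synchronously fetch a registered
 * handle h into main RAM for reading (detached = 0, is_prefetch = 0,
 * async = 0, no callback, default priority, arbitrary origin string):
 *
 *	struct _starpu_data_replicate *r = &h->per_node[STARPU_MAIN_RAM];
 *	int ret = _starpu_fetch_data_on_node(h, r, STARPU_R, 0, 0, 0,
 *					     NULL, NULL, 0, "example");
 *
 * On success, the replicate holds a valid copy plus a reference which must
 * eventually be dropped with _starpu_release_data_on_node(). */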
int _starpu_fetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *dst_replicate,
			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
{
	unsigned local_node = _starpu_memory_node_get_local_key();
	_STARPU_LOG_IN();

	/* Help transfers make progress while we are waiting for the header lock */
	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(local_node, 1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	if (!detached)
	{
		/* Take references which will be released by _starpu_release_data_on_node */
		if (dst_replicate)
			dst_replicate->refcnt++;
		handle->busy_count++;
	}

	struct _starpu_data_request *r;
	r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
						 is_prefetch, async, callback_func, callback_arg, prio, origin);

	/* If no request was created, the handle was already up-to-date on the
	 * node. In this case, _starpu_create_request_to_fetch_data has already
	 * unlocked the header. */
	if (!r)
		return 0;

	_starpu_spin_unlock(&handle->header_lock);

	int ret = async?0:_starpu_wait_data_request_completion(r, 1);
	_STARPU_LOG_OUT();
	return ret;
}
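/* The three wrappers below differ only in the (detached, is_prefetch, async)
 * triplet passed to _starpu_fetch_data_on_node:
 *   fetch_data:                  (0, 0, 0)  synchronous fetch, takes a reference
 *   prefetch_data_on_node:       (1, 1, 1)  asynchronous prefetch
 *   idle_prefetch_data_on_node:  (1, 2, 1)  lower-urgency prefetch for idle links
 */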
static int idle_prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
{
	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 2, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
}

static int prefetch_data_on_node(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
{
	return _starpu_fetch_data_on_node(handle, replicate, mode, 1, 1, 1, NULL, NULL, prio, "prefetch_data_on_node");
}

static int fetch_data(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
{
	return _starpu_fetch_data_on_node(handle, replicate, mode, 0, 0, 0, NULL, NULL, prio, "fetch_data");
}
uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
{
	return handle->per_node[node].refcnt;
}

size_t _starpu_data_get_size(starpu_data_handle_t handle)
{
	return handle->ops->get_size(handle);
}

uint32_t _starpu_data_get_footprint(starpu_data_handle_t handle)
{
	return handle->footprint;
}
/* in case the data was accessed in write mode, do not forget to
 * make it accessible again once that is possible ! */
void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_wt_mask, struct _starpu_data_replicate *replicate)
{
	uint32_t wt_mask;
	wt_mask = default_wt_mask | handle->wt_mask;
	wt_mask &= (1<<starpu_memory_nodes_get_count())-1;

	/* Note that it is possible that there is no valid copy of the data (if
	 * starpu_data_invalidate was called for instance). In that case, we do
	 * not enforce any write-through mechanism. */

	unsigned memory_node = replicate->memory_node;

	if (replicate->state != STARPU_INVALID && handle->current_mode & STARPU_W)
		if ((wt_mask & ~(1<<memory_node)))
			_starpu_write_through_data(handle, memory_node, wt_mask);

	unsigned local_node = _starpu_memory_node_get_local_key();

	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(local_node, 1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	/* Release the refcnt taken by fetch_data_on_node */
	replicate->refcnt--;
	STARPU_ASSERT_MSG(replicate->refcnt >= 0, "handle %p released too many times", handle);

	STARPU_ASSERT_MSG(handle->busy_count > 0, "handle %p released too many times", handle);
	handle->busy_count--;

	if (!_starpu_notify_data_dependencies(handle))
		_starpu_spin_unlock(&handle->header_lock);
}
static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate)
{
	unsigned local_node = _starpu_memory_node_get_local_key();
	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(local_node, 1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	if (replicate->state == STARPU_INVALID)
	{
		unsigned dst_node = replicate->memory_node;
		replicate->requested |= 1UL << dst_node;
	}

	_starpu_spin_unlock(&handle->header_lock);
}
int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
{
	STARPU_ASSERT(!task->prefetched);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned index;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);

		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
			continue;

		struct _starpu_data_replicate *replicate = &handle->per_node[node];
		prefetch_data_on_node(handle, replicate, mode, prio);

		_starpu_set_data_requested_flag_if_needed(handle, replicate);
	}

	return 0;
}

int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
{
	int prio = task->priority;
	if (task->workerorder)
		/* Map the worker-order constraint to a priority: tasks ordered
		 * earlier get a higher prefetch priority */
		prio = INT_MAX - task->workerorder;
	return starpu_prefetch_task_input_on_node_prio(task, node, prio);
}

int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
{
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned index;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);

		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
			continue;

		struct _starpu_data_replicate *replicate = &handle->per_node[node];
		idle_prefetch_data_on_node(handle, replicate, mode, prio);
	}

	return 0;
}

int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
{
	int prio = task->priority;
	if (task->workerorder)
		prio = INT_MAX - task->workerorder;
	return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
}
static struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
{
	if (mode & (STARPU_SCRATCH|STARPU_REDUX))
	{
		STARPU_ASSERT(workerid >= 0);
		if (!handle->per_worker)
		{
			_starpu_spin_lock(&handle->header_lock);
			if (!handle->per_worker)
				_starpu_data_initialize_per_worker(handle);
			_starpu_spin_unlock(&handle->header_lock);
		}
		return &handle->per_worker[workerid];
	}
	else
		/* That's a "normal" buffer (R/W) */
		return &handle->per_node[node];
}
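/* Note (illustration): SCRATCH and REDUX buffers are private to each worker,
 * hence the lazily-allocated per_worker replicates above; coherent R/W data
 * instead uses one replicate per memory node, kept consistent by the MSI-like
 * protocol implemented in this file. */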
/* Synchronously fetch data for a given task (if it's not there already) */
int _starpu_fetch_task_input(struct _starpu_job *j)
{
	_STARPU_TRACE_START_FETCH_INPUT(NULL);
	int profiling = starpu_profiling_status_get();
	struct starpu_task *task = j->task;
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned local_memory_node = _starpu_memory_node_get_local_key();

	int workerid = starpu_worker_get_id_check();

#ifdef STARPU_USE_FXT
	unsigned long total_size = 0;
#endif

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		int ret;
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;
		if (node == -1)
			node = local_memory_node;

		if (mode == STARPU_NONE ||
		    (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
		    (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
			STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);

		struct _starpu_data_replicate *local_replicate;

		if (index && descrs[index-1].handle == descrs[index].handle)
			/* We have already taken this data, skip it. This
			 * depends on ordering putting writes before reads, see
			 * _starpu_compar_handles */
			continue;

		local_replicate = get_replicate(handle, mode, workerid, node);

		ret = fetch_data(handle, local_replicate, mode, 0);
		if (STARPU_UNLIKELY(ret))
			goto enomem;

#ifdef STARPU_USE_FXT
		total_size += _starpu_data_get_size(handle);
#endif
	}
	_STARPU_TRACE_DATA_LOAD(workerid,total_size);

	/* Now that we have taken the data locks in locking order, fill the
	 * codelet interfaces in function order. */
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
		int node = descrs[index].node;
		if (node == -1)
			node = local_memory_node;

		struct _starpu_data_replicate *local_replicate;

		local_replicate = get_replicate(handle, mode, workerid, node);

		_STARPU_TASK_SET_INTERFACE(task, local_replicate->data_interface, index);

		/* If the replicate was not initialized yet, we have to do it now */
		if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
			_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
	}

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	_STARPU_TRACE_END_FETCH_INPUT(NULL);
	return 0;

enomem:
	_STARPU_TRACE_END_FETCH_INPUT(NULL);
	_STARPU_DISP("something went wrong with buffer %u\n", index);

	/* try to unreference all the inputs that were successfully taken */
	unsigned index2;
	for (index2 = 0; index2 < index; index2++)
	{
		starpu_data_handle_t handle = descrs[index2].handle;
		enum starpu_data_access_mode mode = descrs[index2].mode;
		/* note: use index2 here, not the failed buffer's index */
		int node = descrs[index2].node;
		if (node == -1)
			node = local_memory_node;

		struct _starpu_data_replicate *local_replicate;

		if (index2 && descrs[index2-1].handle == descrs[index2].handle)
			/* We have already released this data, skip it. This
			 * depends on ordering putting writes before reads, see
			 * _starpu_compar_handles */
			continue;

		local_replicate = get_replicate(handle, mode, workerid, node);

		_starpu_release_data_on_node(handle, 0, local_replicate);
	}

	return -1;
}
/* Release task data dependencies */
void __starpu_push_task_output(struct _starpu_job *j)
{
#ifdef STARPU_OPENMP
	STARPU_ASSERT(!j->continuation);
#endif
	int profiling = starpu_profiling_status_get();
	struct starpu_task *task = j->task;
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->release_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);

	int workerid = starpu_worker_get_id();
	unsigned local_memory_node = _starpu_memory_node_get_local_key();

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;
		if (node == -1 && task->cl->where != STARPU_NOWHERE)
			node = local_memory_node;

		struct _starpu_data_replicate *local_replicate = NULL;

		if (index && descrs[index-1].handle == descrs[index].handle)
			/* We have already released this data, skip it. This
			 * depends on ordering putting writes before reads, see
			 * _starpu_compar_handles */
			continue;

		if (node != -1)
			local_replicate = get_replicate(handle, mode, workerid, node);

		/* Keep a reference for the future
		 * _starpu_release_task_enforce_sequential_consistency call */
		_starpu_spin_lock(&handle->header_lock);
		handle->busy_count++;

		if (node == -1)
		{
			/* NOWHERE case, just notify dependencies */
			if (!_starpu_notify_data_dependencies(handle))
				_starpu_spin_unlock(&handle->header_lock);
		}
		else
		{
			_starpu_spin_unlock(&handle->header_lock);
			_starpu_release_data_on_node(handle, 0, local_replicate);
		}
	}

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->release_data_end_time);
}
/* Version for a driver running on a worker: we show the driver state in the trace */
void _starpu_push_task_output(struct _starpu_job *j)
{
	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
	__starpu_push_task_output(j);
	_STARPU_TRACE_END_PUSH_OUTPUT(NULL);
}
struct fetch_nowhere_wrapper
{
	struct _starpu_job *j;
	unsigned pending;
};
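/* The pending field is an atomic countdown: it starts at the number of
 * buffers to fetch plus one (for the submitting thread itself), and each
 * completion callback decrements it; whoever brings it to zero finishes the
 * job, see _starpu_fetch_nowhere_task_input_cb below. */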
static void _starpu_fetch_nowhere_task_input_cb(void *arg);

/* Asynchronously fetch data for a task which will not actually execute anywhere (STARPU_NOWHERE) */
void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
{
	int profiling = starpu_profiling_status_get();
	struct starpu_task *task = j->task;
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned nfetchbuffers = 0;
	struct fetch_nowhere_wrapper *wrapper;

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		int node = descrs[index].node;
		if (node != -1)
			nfetchbuffers++;
	}

	if (!nfetchbuffers)
	{
		/* Nothing to fetch actually, already finished! */
		__starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return;
	}

	wrapper = malloc(sizeof(*wrapper));
	wrapper->j = j;
	/* +1 for the call below */
	wrapper->pending = nfetchbuffers + 1;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;
		if (node == -1)
			continue;

		if (mode == STARPU_NONE ||
		    (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
		    (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
			STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
		STARPU_ASSERT(mode != STARPU_SCRATCH && mode != STARPU_REDUX);

		struct _starpu_data_replicate *local_replicate;

		local_replicate = get_replicate(handle, mode, -1, node);

		_starpu_fetch_data_on_node(handle, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
	}

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	/* Finished working with the task, release our reference */
	_starpu_fetch_nowhere_task_input_cb(wrapper);
}

static void _starpu_fetch_nowhere_task_input_cb(void *arg)
{
	/* One more transfer finished */
	struct fetch_nowhere_wrapper *wrapper = arg;

	unsigned pending = STARPU_ATOMIC_ADD(&wrapper->pending, -1);
	ANNOTATE_HAPPENS_BEFORE(&wrapper->pending);
	if (pending == 0)
	{
		ANNOTATE_HAPPENS_AFTER(&wrapper->pending);
		/* Finished transferring, task is over */
		struct _starpu_job *j = wrapper->j;
		free(wrapper);
		__starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
	}
}
/* NB: this value can only be an indication of the status of the data
 * at some point, there is no strong guarantee! */
unsigned _starpu_is_data_present_or_requested(starpu_data_handle_t handle, unsigned node)
{
	unsigned ret = 0;

// XXX: this is just a hint, so we don't take the lock...
//	STARPU_PTHREAD_SPIN_LOCK(&handle->header_lock);

	if (handle->per_node[node].state != STARPU_INVALID)
	{
		ret = 1;
	}
	else
	{
		unsigned i;
		unsigned nnodes = starpu_memory_nodes_get_count();

		for (i = 0; i < nnodes; i++)
		{
			if ((handle->per_node[node].requested & (1UL << i)) || handle->per_node[node].request[i])
				ret = 1;
		}
	}

//	STARPU_PTHREAD_SPIN_UNLOCK(&handle->header_lock);
	return ret;
}
void _starpu_data_set_unregister_hook(starpu_data_handle_t handle, _starpu_data_handle_unregister_hook func)
{
	handle->unregister_hook = func;
}