coherency.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009-2017 Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 CNRS
 * Copyright (C) 2014, 2017 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <common/config.h>
#include <datawizard/coherency.h>
#include <datawizard/copy_driver.h>
#include <datawizard/write_back.h>
#include <datawizard/memory_nodes.h>
#include <core/dependencies/data_concurrency.h>
#include <core/disk.h>
#include <profiling/profiling.h>
#include <math.h>
#include <core/task.h>
#include <starpu_scheduler.h>
#include <core/workers.h>
#include <limits.h>

#ifdef STARPU_SIMGRID
#include <core/simgrid.h>
#endif
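
/* Editorial roadmap of this file: _starpu_select_src_node picks which valid
 * replicate a transfer should read from; determine_request_path turns a
 * (src, dst) pair into one or two hops; _starpu_create_request_to_fetch_data
 * builds and chains the corresponding data requests; and the
 * _starpu_fetch_task_input / _starpu_push_task_output pair brackets task
 * execution with data acquisition and release. */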
static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);

int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
{
	int src_node = -1;
	unsigned i;

	unsigned nnodes = starpu_memory_nodes_get_count();

	/* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
	unsigned node;

	size_t size = _starpu_data_get_size(handle);
	double cost = INFINITY;
	unsigned src_node_mask = 0;

	for (node = 0; node < nnodes; node++)
	{
		if (handle->per_node[node].state != STARPU_INVALID)
		{
			/* we found a copy ! */
			src_node_mask |= (1<<node);
		}
	}

	if (src_node_mask == 0 && handle->init_cl)
	{
		/* No copy yet, but the application told us how to build it. */
		return -1;
	}

	/* we should have found at least one copy ! */
	STARPU_ASSERT_MSG(src_node_mask != 0, "The data for the handle %p is requested, but the handle does not have a valid value. Perhaps some initialization task is missing?", handle);

	/* Without knowing the size, we won't know the cost */
	if (!size)
		cost = 0;

	/* Check whether we have transfer cost for all nodes, if so, take the minimum */
	if (cost)
		for (i = 0; i < nnodes; i++)
		{
			if (src_node_mask & (1<<i))
			{
				double time = starpu_transfer_predict(i, destination, size);
				unsigned handling_node;

				/* Avoid indirect transfers */
				if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
					continue;

				if (_STARPU_IS_ZERO(time))
				{
					/* No estimation, will have to revert to the dumb strategy */
					cost = 0.0;
					break;
				}
				else if (time < cost)
				{
					cost = time;
					src_node = i;
				}
			}
		}

	if (cost && src_node != -1)
	{
		/* Could estimate through cost, return that */
		STARPU_ASSERT(handle->per_node[src_node].allocated);
		STARPU_ASSERT(handle->per_node[src_node].initialized);
		return src_node;
	}

	int i_ram = -1;
	int i_gpu = -1;
	int i_disk = -1;

	/* Revert to the dumb strategy: take RAM unless only a GPU has it */
	for (i = 0; i < nnodes; i++)
	{
		if (src_node_mask & (1<<i))
		{
			int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
			/* Avoid transfers which the interface does not want */
			if (can_copy)
			{
				void *src_interface = handle->per_node[i].data_interface;
				void *dst_interface = handle->per_node[destination].data_interface;
				unsigned handling_node;

				if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
				{
					/* Avoid going through RAM if the interface does not want it */
					void *ram_interface = handle->per_node[STARPU_MAIN_RAM].data_interface;
					if ((!can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, i)
					  && !can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, STARPU_MAIN_RAM))
					 || (!can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, STARPU_MAIN_RAM)
					  && !can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, destination)))
						continue;
				}
			}

			/* GPUs are expensive sources, however, unless peer
			 * transfer is supported (in which case it would have
			 * been selected above). Others should be fine. */
			if (starpu_node_get_kind(i) == STARPU_CUDA_RAM ||
			    starpu_node_get_kind(i) == STARPU_OPENCL_RAM ||
			    starpu_node_get_kind(i) == STARPU_MIC_RAM)
				i_gpu = i;

			if (starpu_node_get_kind(i) == STARPU_CPU_RAM ||
			    starpu_node_get_kind(i) == STARPU_SCC_RAM ||
			    starpu_node_get_kind(i) == STARPU_SCC_SHM ||
			    starpu_node_get_kind(i) == STARPU_MPI_MS_RAM)
				i_ram = i;

			if (starpu_node_get_kind(i) == STARPU_DISK_RAM)
				i_disk = i;
		}
	}

	/* Prefer CPU RAM first */
	if (i_ram != -1)
		src_node = i_ram;
	/* then a GPU */
	else if (i_gpu != -1)
		src_node = i_gpu;
	/* and the disk memory only as a last resort */
	else
		src_node = i_disk;

	STARPU_ASSERT(src_node != -1);
	STARPU_ASSERT(handle->per_node[src_node].allocated);
	STARPU_ASSERT(handle->per_node[src_node].initialized);
	return src_node;
}
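
/* Editorial example of the MSI-like state transitions implemented below:
 * if node 1 reads a handle whose only valid copy is on node 0
 * (node 0 is STARPU_OWNER), both replicates become STARPU_SHARED;
 * if node 1 writes instead, every other replicate becomes STARPU_INVALID
 * and node 1 becomes the sole STARPU_OWNER. */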
/* This may be called once the data is fetched, with the header and STARPU_RW-lock held */
void _starpu_update_data_state(starpu_data_handle_t handle,
			       struct _starpu_data_replicate *requesting_replicate,
			       enum starpu_data_access_mode mode)
{
	/* There is nothing to do for relaxed coherency modes (scratch or
	 * reductions) */
	if (!(mode & STARPU_RW))
		return;

	unsigned nnodes = starpu_memory_nodes_get_count();

	/* the data is present now */
	unsigned requesting_node = requesting_replicate->memory_node;
	requesting_replicate->requested &= ~(1UL << requesting_node);

	if (mode & STARPU_W)
	{
		/* the requesting node now has the only valid copy */
		unsigned node;
		for (node = 0; node < nnodes; node++)
		{
			_STARPU_TRACE_DATA_INVALIDATE(handle, node);
			handle->per_node[node].state = STARPU_INVALID;
		}
		requesting_replicate->state = STARPU_OWNER;
		if (handle->home_node != -1 && handle->per_node[handle->home_node].state == STARPU_INVALID)
			/* Notify that this MC is now dirty */
			_starpu_memchunk_dirty(requesting_replicate->mc, requesting_replicate->memory_node);
	}
	else
	{
		/* read only */
		if (requesting_replicate->state != STARPU_OWNER)
		{
			/* there was at least another copy of the data */
			unsigned node;
			for (node = 0; node < nnodes; node++)
			{
				struct _starpu_data_replicate *replicate = &handle->per_node[node];
				if (replicate->state != STARPU_INVALID)
					replicate->state = STARPU_SHARED;
			}
			requesting_replicate->state = STARPU_SHARED;
		}
	}
}
static int worker_supports_direct_access(unsigned node, unsigned handling_node)
{
	/* Disk transfers are not handled here: only disk <-> ram and disk <-> disk are supported, elsewhere */
	if (starpu_node_get_kind(node) == STARPU_DISK_RAM || starpu_node_get_kind(handling_node) == STARPU_DISK_RAM)
		return 0;

	if (node == handling_node)
		return 1;

	if (!_starpu_memory_node_get_nworkers(handling_node))
		/* No worker to process the request from that node */
		return 0;

	int type = starpu_node_get_kind(node);
	switch (type)
	{
		case STARPU_CUDA_RAM:
		{
			/* GPUs do not always allow direct remote access: if CUDA4
			 * is enabled, we allow two CUDA devices to communicate. */
#ifdef STARPU_SIMGRID
			if (starpu_node_get_kind(handling_node) == STARPU_CUDA_RAM)
			{
				msg_host_t host = _starpu_simgrid_get_memnode_host(handling_node);
				const char* cuda_memcpy_peer = MSG_host_get_property_value(host, "memcpy_peer");
				return cuda_memcpy_peer && atoll(cuda_memcpy_peer);
			}
			else
				return 0;
#elif defined(HAVE_CUDA_MEMCPY_PEER)
			enum starpu_node_kind kind = starpu_node_get_kind(handling_node);
			return kind == STARPU_CUDA_RAM;
#else /* HAVE_CUDA_MEMCPY_PEER */
			/* Direct GPU-GPU transfers are not allowed in general */
			return 0;
#endif /* HAVE_CUDA_MEMCPY_PEER */
		}
		case STARPU_OPENCL_RAM:
			return 0;
		case STARPU_MIC_RAM:
			/* TODO: We don't handle direct MIC-MIC transfers yet */
			return 0;
		case STARPU_MPI_MS_RAM:
		{
			enum starpu_node_kind kind = starpu_node_get_kind(handling_node);
			return kind == STARPU_MPI_MS_RAM;
		}
		case STARPU_SCC_RAM:
			return 1;
		default:
			return 1;
	}
}
static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node)
{
	int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
	void *src_interface = handle->per_node[src_node].data_interface;
	void *dst_interface = handle->per_node[dst_node].data_interface;

	/* XXX That's a hack until we fix cudaMemcpy3DPeerAsync in the block interface.
	 * Also, not all data interfaces necessarily provide a direct
	 * GPU-GPU transfer method ! */
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	if (src_node != dst_node && starpu_node_get_kind(src_node) == STARPU_CUDA_RAM && starpu_node_get_kind(dst_node) == STARPU_CUDA_RAM)
	{
		const struct starpu_data_copy_methods *copy_methods = handle->ops->copy_methods;
		if (!copy_methods->cuda_to_cuda_async && !copy_methods->any_to_any)
			return 0;
	}
#endif

	/* Note: with CUDA, performance seems a bit better when issuing the transfer from the destination (tested without GPUDirect, but GPUDirect probably behaves the same). */
	if (worker_supports_direct_access(src_node, dst_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node)))
	{
		*handling_node = dst_node;
		return 1;
	}

	if (worker_supports_direct_access(dst_node, src_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node)))
	{
		*handling_node = src_node;
		return 1;
	}

	/* Link between disk and ram */
	if ((starpu_node_get_kind(src_node) == STARPU_DISK_RAM && starpu_node_get_kind(dst_node) == STARPU_CPU_RAM) ||
	    (starpu_node_get_kind(src_node) == STARPU_CPU_RAM && starpu_node_get_kind(dst_node) == STARPU_DISK_RAM))
	{
		/* FIXME: not necessarily a worker :/ */
		*handling_node = STARPU_MAIN_RAM;
		return 1;
	}

	/* Link between two disks of the same kind */
	if (_starpu_is_same_kind_disk(src_node, dst_node))
		return 1;

	return 0;
}
/* For now we use slowness (the inverse of bandwidth) to compare NUMA nodes; would latency be a better criterion? */
static unsigned chose_best_numa_between_src_and_dest(int src, int dst)
{
	double timing_best;
	int best_numa = -1;
	unsigned numa;
	const unsigned nb_numa_nodes = starpu_memory_nodes_get_numa_count();
	for (numa = 0; numa < nb_numa_nodes; numa++)
	{
		double actual = 1.0/starpu_transfer_bandwidth(src, numa) + 1.0/starpu_transfer_bandwidth(numa, dst);

		/* Compare slowness : take the lowest */
		if (best_numa < 0 || actual < timing_best)
		{
			best_numa = numa;
			timing_best = actual;
		}
	}
	STARPU_ASSERT(best_numa >= 0);

	return best_numa;
}
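
/* Worked example (editorial, with made-up numbers): if src -> numa0 runs at
 * 8 GB/s and numa0 -> dst at 4 GB/s, the slowness of staging through numa0 is
 * 1/8 + 1/4 = 0.375 s per GB; a numa1 offering 6 GB/s on both links scores
 * 1/6 + 1/6 ~= 0.333 s per GB and would therefore be chosen. */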
/* Determines the path of a request : each hop is defined by (src,dst) and the
 * node that handles the hop. The returned value indicates the number of hops,
 * and max_len is the maximum number of hops (i.e. the size of the
 * src_nodes, dst_nodes and handling_nodes arrays). */
static int determine_request_path(starpu_data_handle_t handle,
				  int src_node, int dst_node,
				  enum starpu_data_access_mode mode, int max_len,
				  unsigned *src_nodes, unsigned *dst_nodes,
				  unsigned *handling_nodes, unsigned write_invalidation)
{
	if (src_node == dst_node || !(mode & STARPU_R))
	{
		if (dst_node == -1 || starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
			handling_nodes[0] = src_node;
		else
			handling_nodes[0] = dst_node;

		if (write_invalidation)
			/* The invalidation request will be enough */
			return 0;

		/* The destination node should only allocate the data, no transfer is required */
		STARPU_ASSERT(max_len >= 1);
		src_nodes[0] = STARPU_MAIN_RAM; // ignored
		dst_nodes[0] = dst_node;
		return 1;
	}

	unsigned handling_node;
	int link_is_valid = link_supports_direct_transfers(handle, src_node, dst_node, &handling_node);

	if (!link_is_valid)
	{
		int (*can_copy)(void *, unsigned, void *, unsigned, unsigned) = handle->ops->copy_methods->can_copy;
		void *src_interface = handle->per_node[src_node].data_interface;
		void *dst_interface = handle->per_node[dst_node].data_interface;

		/* We need an intermediate hop to implement data staging
		 * through main memory. */
		STARPU_ASSERT(max_len >= 2);
		STARPU_ASSERT(src_node >= 0);

		unsigned numa = chose_best_numa_between_src_and_dest(src_node, dst_node);

		/* GPU -> RAM */
		src_nodes[0] = src_node;
		dst_nodes[0] = numa;

		if (starpu_node_get_kind(src_node) == STARPU_DISK_RAM)
			/* Disks don't have their own driver thread */
			handling_nodes[0] = dst_node;
		else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node))
		{
			handling_nodes[0] = src_node;
		}
		else
		{
			STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, dst_node), "interface %d refuses all kinds of transfers from node %d to node %d\n", handle->ops->interfaceid, src_node, dst_node);
			handling_nodes[0] = dst_node;
		}

		/* RAM -> GPU */
		src_nodes[1] = numa;
		dst_nodes[1] = dst_node;

		if (starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
			/* Disks don't have their own driver thread */
			handling_nodes[1] = src_node;
		else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node))
		{
			handling_nodes[1] = dst_node;
		}
		else
		{
			STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, src_node), "interface %d refuses all kinds of transfers from node %d to node %d\n", handle->ops->interfaceid, src_node, dst_node);
			handling_nodes[1] = src_node;
		}

		return 2;
	}
	else
	{
		STARPU_ASSERT(max_len >= 1);
		src_nodes[0] = src_node;
		dst_nodes[0] = dst_node;
		handling_nodes[0] = handling_node;
#if !defined(HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
		STARPU_ASSERT(!(mode & STARPU_R) || starpu_node_get_kind(src_node) != STARPU_CUDA_RAM || starpu_node_get_kind(dst_node) != STARPU_CUDA_RAM);
#endif
		return 1;
	}
}
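
/* Editorial example: fetching from a CUDA node to another CUDA node without
 * peer access produces two hops, (cuda_src -> numa) handled by cuda_src and
 * (numa -> cuda_dst) handled by cuda_dst; with peer access (or for any pair
 * that link_supports_direct_transfers accepts) a single hop is returned. */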
/* handle->lock should be taken. r is returned locked. The node parameter
 * indicates either the source of the request, or the destination for a
 * write-only request. */
static struct _starpu_data_request *_starpu_search_existing_data_request(struct _starpu_data_replicate *replicate, unsigned node, enum starpu_data_access_mode mode, unsigned is_prefetch)
{
	struct _starpu_data_request *r;

	r = replicate->request[node];

	if (r)
	{
		_starpu_spin_checklocked(&r->handle->header_lock);

		_starpu_spin_lock(&r->lock);

		/* perhaps we need to "upgrade" the request */
		if (is_prefetch < r->prefetch)
			_starpu_update_prefetch_status(r, is_prefetch);

		if (mode & STARPU_R)
		{
			/* in case the existing request did not imply a memory
			 * transfer yet, we have to take a second refcnt now
			 * for the source, in addition to the refcnt for the
			 * destination
			 * (so that the source remains valid) */
			if (!(r->mode & STARPU_R))
			{
				replicate->refcnt++;
				replicate->handle->busy_count++;
			}

			r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_R);
		}

		if (mode & STARPU_W)
			r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_W);
	}

	return r;
}
/*
 * This function is called when the data is needed on the local node; it
 * returns a pointer to the local copy.
 *
 *                 R      STARPU_W    STARPU_RW
 *   Owner         OK         OK         OK
 *   Shared        OK         1          1
 *   Invalid       2          3          4
 *
 * case 1 : shared + (read)write :
 *   no data copy, but shared->Invalid/Owner
 * case 2 : invalid + read :
 *   data copy + invalid->shared + owner->shared (STARPU_ASSERT(there is a valid))
 * case 3 : invalid + write :
 *   no data copy + invalid->owner + (owner,shared)->invalid
 * case 4 : invalid + R/STARPU_W :
 *   data copy + if (STARPU_W) (invalid->owner + owner->invalid)
 *               else (invalid,owner->shared)
 */
struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
								  struct _starpu_data_replicate *dst_replicate,
								  enum starpu_data_access_mode mode, unsigned is_prefetch,
								  unsigned async,
								  void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
{
	/* We don't care about commuting for data requests, that was handled before. */
	mode &= ~STARPU_COMMUTE;

	/* This function is called with the handle's header lock taken */
	_starpu_spin_checklocked(&handle->header_lock);

	int requesting_node = dst_replicate ? dst_replicate->memory_node : -1;
	unsigned nwait = 0;

	if (mode & STARPU_W)
	{
		/* We will write to the buffer. We will have to wait for all
		 * existing requests before the last request which will
		 * invalidate all their results (which were possibly spurious,
		 * e.g. too aggressive eviction).
		 */
		unsigned i, j;
		unsigned nnodes = starpu_memory_nodes_get_count();
		for (i = 0; i < nnodes; i++)
			for (j = 0; j < nnodes; j++)
				if (handle->per_node[i].request[j])
					nwait++;

		/* If the request is not detached (i.e. the caller really wants
		 * proper ownership), no new requests will appear because a
		 * reference will be kept on the dst replicate, which will
		 * notably prevent data reclaiming.
		 */
	}

	if ((!dst_replicate || dst_replicate->state != STARPU_INVALID) && (!nwait || is_prefetch))
	{
		if (dst_replicate)
		{
#ifdef STARPU_MEMORY_STATS
			enum _starpu_cache_state old_state = dst_replicate->state;
#endif
			/* the data is already available and we don't have to wait for
			 * any request, so we can stop */
			_starpu_update_data_state(handle, dst_replicate, mode);
			_starpu_msi_cache_hit(requesting_node);

#ifdef STARPU_MEMORY_STATS
			_starpu_memory_handle_stats_cache_hit(handle, requesting_node);

			/* XXX Broken ? */
			if (old_state == STARPU_SHARED
			    && dst_replicate->state == STARPU_OWNER)
				_starpu_memory_handle_stats_shared_to_owner(handle, requesting_node);
#endif

			if (dst_replicate->mc)
				_starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
		}

		_starpu_spin_unlock(&handle->header_lock);

		if (callback_func)
			callback_func(callback_arg);

		_STARPU_LOG_OUT_TAG("data available");
		return NULL;
	}

	if (dst_replicate)
		_starpu_msi_cache_miss(requesting_node);

	/* the only remaining situation is that the local copy was invalid */
	STARPU_ASSERT((dst_replicate && dst_replicate->state == STARPU_INVALID) || nwait);

	/* find someone who already has the data */
	int src_node = -1;

	if (dst_replicate && mode & STARPU_R)
	{
		if (dst_replicate->state == STARPU_INVALID)
			src_node = _starpu_select_src_node(handle, requesting_node);
		else
			src_node = requesting_node;
		if (src_node < 0)
		{
			/* We will create it, no need to read an existing value */
			mode &= ~STARPU_R;
		}
	}
	else if (dst_replicate)
	{
		/* if the data is in write-only mode (and not SCRATCH or REDUX), there is no need for a source, the data will be initialized by the task itself */
		if (mode & STARPU_W)
			dst_replicate->initialized = 1;
		if (starpu_node_get_kind(requesting_node) == STARPU_CPU_RAM && !nwait)
		{
			/* And this is the main RAM, really no need for a
			 * request, just allocate */
			if (_starpu_allocate_memory_on_node(handle, dst_replicate, is_prefetch) == 0)
			{
				_starpu_update_data_state(handle, dst_replicate, mode);

				_starpu_spin_unlock(&handle->header_lock);

				if (callback_func)
					callback_func(callback_arg);
				_STARPU_LOG_OUT_TAG("data immediately allocated");
				return NULL;
			}
		}
	}

#define MAX_REQUESTS 4
	/* We can safely assume that there won't be more than 2 hops in the
	 * current implementation */
	unsigned src_nodes[MAX_REQUESTS], dst_nodes[MAX_REQUESTS], handling_nodes[MAX_REQUESTS];
	/* keep one slot for the last W request, if any */
	int write_invalidation = (mode & STARPU_W) && nwait && !is_prefetch;
	int nhops = determine_request_path(handle, src_node, requesting_node, mode, MAX_REQUESTS,
					   src_nodes, dst_nodes, handling_nodes, write_invalidation);

	STARPU_ASSERT(nhops >= 0 && nhops <= MAX_REQUESTS-1);

	struct _starpu_data_request *requests[nhops + write_invalidation];

	/* Did we reuse a request for that hop ? */
	int reused_requests[nhops + write_invalidation];

	/* Construct an array with a list of requests, possibly reusing existing requests */
	int hop;
	for (hop = 0; hop < nhops; hop++)
	{
		struct _starpu_data_request *r;

		unsigned hop_src_node = src_nodes[hop];
		unsigned hop_dst_node = dst_nodes[hop];
		unsigned hop_handling_node = handling_nodes[hop];

		struct _starpu_data_replicate *hop_src_replicate;
		struct _starpu_data_replicate *hop_dst_replicate;

		/* Only the first request is independent */
		unsigned ndeps = (hop == 0)?0:1;

		hop_src_replicate = &handle->per_node[hop_src_node];
		hop_dst_replicate = (hop != nhops - 1)?&handle->per_node[hop_dst_node]:dst_replicate;

		/* Try to reuse a request if possible */
		r = _starpu_search_existing_data_request(hop_dst_replicate,
							 (mode & STARPU_R)?hop_src_node:hop_dst_node,
							 mode, is_prefetch);

		reused_requests[hop] = !!r;

		if (!r)
		{
			/* Create a new request if there was no request to reuse */
			r = _starpu_create_data_request(handle, hop_src_replicate,
							hop_dst_replicate, hop_handling_node,
							mode, ndeps, is_prefetch, prio, 0, origin);
			nwait++;
		}

		requests[hop] = r;
	}

	/* Chain these requests */
	for (hop = 0; hop < nhops; hop++)
	{
		struct _starpu_data_request *r;
		r = requests[hop];

		if (hop != nhops - 1)
		{
			if (!reused_requests[hop + 1])
			{
				r->next_req[r->next_req_count++] = requests[hop + 1];
				STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
			}
		}
		else if (!write_invalidation)
			/* The last request will perform the callback after termination */
			_starpu_data_request_append_callback(r, callback_func, callback_arg);

		if (reused_requests[hop])
			_starpu_spin_unlock(&r->lock);
	}

	if (write_invalidation)
	{
		/* Some requests were still pending, we have to add yet another
		 * request, depending on them, which will invalidate their
		 * result.
		 */
		struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
									      dst_replicate, requesting_node,
									      STARPU_W, nwait, is_prefetch, prio, 1, origin);

		/* and perform the callback after termination */
		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		/* We will write to the buffer. We will have to wait for all
		 * existing requests before the last request which will
		 * invalidate all their results (which were possibly spurious,
		 * e.g. too aggressive eviction).
		 */
		unsigned i, j;
		unsigned nnodes = starpu_memory_nodes_get_count();
		for (i = 0; i < nnodes; i++)
			for (j = 0; j < nnodes; j++)
			{
				struct _starpu_data_request *r2 = handle->per_node[i].request[j];
				if (r2)
				{
					_starpu_spin_lock(&r2->lock);
					if (is_prefetch < r2->prefetch)
						/* Hasten the request we will have to wait for */
						_starpu_update_prefetch_status(r2, is_prefetch);
					r2->next_req[r2->next_req_count++] = r;
					STARPU_ASSERT(r2->next_req_count <= STARPU_MAXNODES + 1);
					_starpu_spin_unlock(&r2->lock);
					nwait--;
				}
			}
		STARPU_ASSERT(nwait == 0);

		nhops++;
		requests[nhops - 1] = r;
		/* existing requests will post this one */
		reused_requests[nhops - 1] = 1;
	}
	STARPU_ASSERT(nhops);

	if (!async)
		requests[nhops - 1]->refcnt++;

	/* we only submit the first request, the remaining ones will be
	 * automatically submitted afterwards */
	if (!reused_requests[0])
		_starpu_post_data_request(requests[0]);

	return requests[nhops - 1];
}
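
/* Editorial sketch of a typical synchronous call, under the assumption that
 * the header lock is already held (as _starpu_fetch_data_on_node does below):
 *
 *   struct _starpu_data_request *r =
 *       _starpu_create_request_to_fetch_data(handle, &handle->per_node[node],
 *                                            STARPU_R, 0, 0,
 *                                            NULL, NULL, 0, "example");
 *   // r == NULL means the data was already available (lock released);
 *   // otherwise the first hop has been posted and r is the final request,
 *   // which the caller may wait on (here with async==0 its refcnt was taken).
 */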
int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *dst_replicate,
			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
{
	_STARPU_LOG_IN();

	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	if (!detached)
	{
		/* Take references which will be released by _starpu_release_data_on_node */
		if (dst_replicate)
			dst_replicate->refcnt++;
		else if (node == STARPU_ACQUIRE_NO_NODE_LOCK_ALL)
		{
			int i;
			for (i = 0; i < STARPU_MAXNODES; i++)
				handle->per_node[i].refcnt++;
		}
		handle->busy_count++;
	}

	struct _starpu_data_request *r;
	r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
						 is_prefetch, async, callback_func, callback_arg, prio, origin);

	/* If no request was created, the handle was already up-to-date on the
	 * node. In this case, _starpu_create_request_to_fetch_data has already
	 * unlocked the header. */
	if (!r)
		return 0;

	_starpu_spin_unlock(&handle->header_lock);

	int ret = async?0:_starpu_wait_data_request_completion(r, 1);
	_STARPU_LOG_OUT();
	return ret;
}

static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
{
	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 2, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
}

static int prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
{
	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 1, 1, NULL, NULL, prio, "prefetch_data_on_node");
}

static int fetch_data(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
{
	return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, prio, "fetch_data");
}
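
/* Summary of the wrappers above (editorial): fetch_data is a synchronous,
 * non-detached fetch (detached=0, is_prefetch=0, async=0);
 * prefetch_data_on_node is detached and asynchronous with is_prefetch=1;
 * idle_prefetch_data_on_node uses is_prefetch=2, which the request engine
 * treats as even less urgent than a normal prefetch (hence "idle"). */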
uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
{
	return handle->per_node[node].refcnt;
}

size_t _starpu_data_get_size(starpu_data_handle_t handle)
{
	return handle->ops->get_size(handle);
}

uint32_t _starpu_data_get_footprint(starpu_data_handle_t handle)
{
	return handle->footprint;
}
/* in case the data was accessed in write mode, do not forget to
 * make it accessible again once it is possible ! */
void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_wt_mask, struct _starpu_data_replicate *replicate)
{
	uint32_t wt_mask;
	wt_mask = default_wt_mask | handle->wt_mask;
	wt_mask &= (1<<starpu_memory_nodes_get_count())-1;

	/* Note that it is possible that there is no valid copy of the data (if
	 * starpu_data_invalidate was called for instance). In that case, we do
	 * not enforce any write-through mechanism. */

	unsigned memory_node = replicate->memory_node;

	if (replicate->state != STARPU_INVALID && handle->current_mode & STARPU_W)
		if ((wt_mask & ~(1<<memory_node)))
			_starpu_write_through_data(handle, memory_node, wt_mask);

	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	/* Release the refcnt taken by fetch_data_on_node */
	replicate->refcnt--;
	STARPU_ASSERT_MSG(replicate->refcnt >= 0, "handle %p released too many times", handle);

	STARPU_ASSERT_MSG(handle->busy_count > 0, "handle %p released too many times", handle);
	handle->busy_count--;

	if (!_starpu_notify_data_dependencies(handle))
		_starpu_spin_unlock(&handle->header_lock);
}
static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate)
{
	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	if (replicate->state == STARPU_INVALID)
	{
		unsigned dst_node = replicate->memory_node;
		replicate->requested |= 1UL << dst_node;
	}

	_starpu_spin_unlock(&handle->header_lock);
}
int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
{
	STARPU_ASSERT(!task->prefetched);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned index;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);

		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
			continue;

		struct _starpu_data_replicate *replicate = &handle->per_node[node];
		prefetch_data_on_node(handle, node, replicate, mode, prio);

		_starpu_set_data_requested_flag_if_needed(handle, replicate);
	}

	return 0;
}

int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
{
	int prio = task->priority;
	if (task->workerorder)
		prio = INT_MAX - task->workerorder;
	return starpu_prefetch_task_input_on_node_prio(task, node, prio);
}

int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned node, int prio)
{
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned index;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);

		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
			continue;

		struct _starpu_data_replicate *replicate = &handle->per_node[node];
		idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
	}

	return 0;
}

int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
{
	int prio = task->priority;
	if (task->workerorder)
		prio = INT_MAX - task->workerorder;
	return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
}
struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
{
	if (mode & (STARPU_SCRATCH|STARPU_REDUX))
	{
		STARPU_ASSERT(workerid >= 0);
		if (!handle->per_worker)
		{
			_starpu_spin_lock(&handle->header_lock);
			if (!handle->per_worker)
				_starpu_data_initialize_per_worker(handle);
			_starpu_spin_unlock(&handle->header_lock);
		}
		return &handle->per_worker[workerid];
	}
	else
		/* That's a "normal" buffer (R/W) */
		return &handle->per_node[node];
}
/* Callback used when a buffer is sent asynchronously to the sink */
static void _starpu_fetch_task_input_cb(void *arg)
{
	struct _starpu_worker *worker = (struct _starpu_worker *) arg;

	/* increase the number of buffers received */
	STARPU_WMB();
	(void)STARPU_ATOMIC_ADD(&worker->nb_buffers_transferred, 1);

#ifdef STARPU_SIMGRID
	starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[worker->memory_node]);
#endif
}
/* Synchronously or asynchronously fetch data for a given task (if it's not there already).
 * Returns the number of data acquired here. */

/* _starpu_fetch_task_input must be called before executing the task;
 * __starpu_push_task_output must be called after the execution of the task. */

/* The driver can either just call _starpu_fetch_task_input with async==0,
 * or, to improve overlapping, it can call _starpu_fetch_task_input with
 * async==1, then wait for the transfers to complete, then call
 * _starpu_fetch_task_input_tail to complete the fetch. */
int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, int async)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();
	int workerid = worker->workerid;
	if (async)
	{
		worker->task_transferring = task;
		worker->nb_buffers_transferred = 0;
		if (worker->ntasks <= 1)
			_STARPU_TRACE_WORKER_START_FETCH_INPUT(NULL, workerid);
	}
	else
		_STARPU_TRACE_START_FETCH_INPUT(NULL);

	int profiling = starpu_profiling_status_get();
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned nacquires;

	unsigned local_memory_node = worker->memory_node;

	unsigned index;

	nacquires = 0;
	for (index = 0; index < nbuffers; index++)
	{
		int ret;
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;
		if (node == -1)
			node = local_memory_node;

		if (mode == STARPU_NONE ||
		    (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
		    (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
			STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);

		struct _starpu_data_replicate *local_replicate;

		if (index && descrs[index-1].handle == descrs[index].handle)
			/* We have already taken this data, skip it. This
			 * depends on the ordering putting writes before reads, see
			 * _starpu_compar_handles */
			continue;

		local_replicate = get_replicate(handle, mode, workerid, node);

		if (async)
		{
			ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1,
							 _starpu_fetch_task_input_cb, worker, 0, "_starpu_src_common_worker_internal_work");
#ifdef STARPU_SIMGRID
			if (_starpu_simgrid_fetching_input_cost())
				MSG_process_sleep(0.000001);
#endif
			if (STARPU_UNLIKELY(ret))
			{
				/* Ooops, not enough memory; make the worker wait for these buffers for now, and the synchronous call will finish by forcing eviction */
				worker->nb_buffers_totransfer = nacquires;
				return 0;
			}
		}
		else
		{
			ret = fetch_data(handle, node, local_replicate, mode, 0);
#ifdef STARPU_SIMGRID
			if (_starpu_simgrid_fetching_input_cost())
				MSG_process_sleep(0.000001);
#endif
			if (STARPU_UNLIKELY(ret))
				goto enomem;
		}

		nacquires++;
	}
	if (async)
	{
		worker->nb_buffers_totransfer = nacquires;
		return 0;
	}

	_starpu_fetch_task_input_tail(task, j, worker);

	return 0;

enomem:
	_STARPU_TRACE_END_FETCH_INPUT(NULL);
	_STARPU_DISP("something went wrong with buffer %u\n", index);

	/* try to unreference all the inputs that were successfully taken */
	unsigned index2;
	for (index2 = 0; index2 < index; index2++)
	{
		starpu_data_handle_t handle = descrs[index2].handle;
		enum starpu_data_access_mode mode = descrs[index2].mode;
		int node = descrs[index2].node;
		if (node == -1)
			node = local_memory_node;

		struct _starpu_data_replicate *local_replicate;

		if (index2 && descrs[index2-1].handle == descrs[index2].handle)
			/* We have already released this data, skip it. This
			 * depends on the ordering putting writes before reads, see
			 * _starpu_compar_handles */
			continue;

		local_replicate = get_replicate(handle, mode, workerid, node);

		_starpu_release_data_on_node(handle, 0, local_replicate);
	}

	return -1;
}
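
/* Editorial sketch of the async driver protocol described above (names come
 * from this file; the wait loop is an assumption about what a driver does
 * between the two calls, real drivers interleave it with progress work):
 *
 *   _starpu_fetch_task_input(task, j, 1);           // posts the transfers
 *   while (worker->nb_buffers_transferred < worker->nb_buffers_totransfer)
 *           ;                                       // driver-specific wait/progress
 *   _starpu_fetch_task_input_tail(task, j, worker); // fill codelet interfaces
 */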
/* Now that we have taken the data locks in locking order, fill the codelet interfaces in function order. */
void _starpu_fetch_task_input_tail(struct starpu_task *task, struct _starpu_job *j, struct _starpu_worker *worker)
{
	int workerid = worker->workerid;

	int profiling = starpu_profiling_status_get();

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);

	unsigned local_memory_node = worker->memory_node;

	unsigned index;
	unsigned long total_size = 0;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
		int node = descrs[index].node;
		if (node == -1)
			node = local_memory_node;

		struct _starpu_data_replicate *local_replicate;

		local_replicate = get_replicate(handle, mode, workerid, node);
		if (local_replicate->mc)
			local_replicate->mc->diduse = 1;

		_STARPU_TASK_SET_INTERFACE(task, local_replicate->data_interface, index);

		/* If the replicate was not initialized yet, we have to do it now */
		if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
			_starpu_redux_init_data_replicate(handle, local_replicate, workerid);

#ifdef STARPU_USE_FXT
		total_size += _starpu_data_get_size(handle);
#endif
	}
	_STARPU_TRACE_DATA_LOAD(workerid, total_size);

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	_STARPU_TRACE_END_FETCH_INPUT(NULL);
}
/* Release task data dependencies */
void __starpu_push_task_output(struct _starpu_job *j)
{
#ifdef STARPU_OPENMP
	STARPU_ASSERT(!j->continuation);
#endif
	int profiling = starpu_profiling_status_get();
	struct starpu_task *task = j->task;
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->release_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);

	int workerid = starpu_worker_get_id();
	unsigned local_memory_node = _starpu_memory_node_get_local_key();

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;
		if (node == -1 && task->where != STARPU_NOWHERE)
			node = local_memory_node;

		struct _starpu_data_replicate *local_replicate = NULL;

		if (index && descrs[index-1].handle == descrs[index].handle)
			/* We have already released this data, skip it. This
			 * depends on ordering putting writes before reads, see
			 * _starpu_compar_handles */
			continue;

		if (node != -1)
			local_replicate = get_replicate(handle, mode, workerid, node);

		/* Keep a reference for future
		 * _starpu_release_task_enforce_sequential_consistency call */
		_starpu_spin_lock(&handle->header_lock);
		handle->busy_count++;

		if (node == -1)
		{
			/* NOWHERE case, just notify dependencies */
			if (!_starpu_notify_data_dependencies(handle))
				_starpu_spin_unlock(&handle->header_lock);
		}
		else
		{
			_starpu_spin_unlock(&handle->header_lock);
			_starpu_release_data_on_node(handle, 0, local_replicate);
		}
	}

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->release_data_end_time);
}

/* Version for a driver running on a worker: we show the driver state in the trace */
void _starpu_push_task_output(struct _starpu_job *j)
{
	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
	__starpu_push_task_output(j);
	_STARPU_TRACE_END_PUSH_OUTPUT(NULL);
}
struct fetch_nowhere_wrapper
{
	struct _starpu_job *j;
	unsigned pending;
};

static void _starpu_fetch_nowhere_task_input_cb(void *arg);

/* Asynchronously fetch data for a task which will have no content */
void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
{
	int profiling = starpu_profiling_status_get();
	struct starpu_task *task = j->task;
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned nfetchbuffers = 0;
	struct fetch_nowhere_wrapper *wrapper;

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		int node = descrs[index].node;
		if (node != -1)
			nfetchbuffers++;
	}

	if (!nfetchbuffers)
	{
		/* Nothing to fetch actually, already finished! */
		__starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return;
	}

	_STARPU_MALLOC(wrapper, (sizeof(*wrapper)));
	wrapper->j = j;
	/* +1 for the call below */
	wrapper->pending = nfetchbuffers + 1;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;
		if (node == -1)
			continue;

		if (mode == STARPU_NONE ||
		    (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
		    (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
			STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
		STARPU_ASSERT(mode != STARPU_SCRATCH && mode != STARPU_REDUX);

		struct _starpu_data_replicate *local_replicate;

		local_replicate = get_replicate(handle, mode, -1, node);

		_starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
	}

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	/* Finished working with the task, release our reference */
	_starpu_fetch_nowhere_task_input_cb(wrapper);
}

static void _starpu_fetch_nowhere_task_input_cb(void *arg)
{
	/* One more transfer finished */
	struct fetch_nowhere_wrapper *wrapper = arg;

	unsigned pending = STARPU_ATOMIC_ADD(&wrapper->pending, -1);
	ANNOTATE_HAPPENS_BEFORE(&wrapper->pending);
	if (pending == 0)
	{
		ANNOTATE_HAPPENS_AFTER(&wrapper->pending);
		/* Finished transferring, the task is over */
		struct _starpu_job *j = wrapper->j;
		free(wrapper);
		__starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
	}
}
/* NB: this value can only be an indication of the status of the data
 * at some point, there is no strong guarantee ! */
unsigned _starpu_is_data_present_or_requested(starpu_data_handle_t handle, unsigned node)
{
	unsigned ret = 0;

	// XXX: this is just a hint, so we don't take the lock...
	// STARPU_PTHREAD_SPIN_LOCK(&handle->header_lock);

	if (handle->per_node[node].state != STARPU_INVALID)
	{
		ret = 1;
	}
	else
	{
		unsigned i;
		unsigned nnodes = starpu_memory_nodes_get_count();

		for (i = 0; i < nnodes; i++)
		{
			if ((handle->per_node[node].requested & (1UL << i)) || handle->per_node[node].request[i])
				ret = 1;
		}
	}

	// STARPU_PTHREAD_SPIN_UNLOCK(&handle->header_lock);
	return ret;
}
void _starpu_data_set_unregister_hook(starpu_data_handle_t handle, _starpu_data_handle_unregister_hook func)
{
	handle->unregister_hook = func;
}