/* coherency.c */
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011-2014,2016,2017 Inria
 * Copyright (C) 2008-2019 Université de Bordeaux
 * Copyright (C) 2018 Federal University of Rio Grande do Sul (UFRGS)
 * Copyright (C) 2010-2019 CNRS
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
  19. #include <limits.h>
  20. #include <math.h>
  21. #include <common/config.h>
  22. #include <datawizard/coherency.h>
  23. #include <datawizard/copy_driver.h>
  24. #include <datawizard/write_back.h>
  25. #include <datawizard/memory_nodes.h>
  26. #include <core/dependencies/data_concurrency.h>
  27. #include <core/disk.h>
  28. #include <profiling/profiling.h>
  29. #include <core/task.h>
  30. #include <starpu_scheduler.h>
  31. #include <core/workers.h>
  32. #include <datawizard/node_ops.h>
  33. #ifdef STARPU_SIMGRID
  34. #include <core/simgrid.h>
  35. #endif
  36. static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);
/* Pick the memory node to read the data of "handle" from, when it has to be
 * brought to "destination".  Returns -1 when there is no valid copy yet but
 * handle->init_cl can (re)build the value; aborts when there is no valid copy
 * at all.  Prefers the direct link with the lowest predicted transfer time;
 * when no estimation is available, falls back to a dumb preference order:
 * CPU RAM first, then GPU memory, then disk. */
int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
{
	int src_node = -1;
	unsigned i;
	unsigned nnodes = starpu_memory_nodes_get_count();

	/* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
	unsigned node;

	size_t size = _starpu_data_get_size(handle);
	double cost = INFINITY;
	unsigned src_node_mask = 0;

	/* Build a bitmask of all nodes holding a valid (OWNER or SHARED) copy. */
	for (node = 0; node < nnodes; node++)
	{
		if (handle->per_node[node].state != STARPU_INVALID)
		{
			/* we found a copy ! */
			src_node_mask |= (1<<node);
		}
	}

	if (src_node_mask == 0 && handle->init_cl)
	{
		/* No copy yet, but the application told us how to build it. */
		return -1;
	}

	/* we should have found at least one copy ! */
	STARPU_ASSERT_MSG(src_node_mask != 0, "The data for the handle %p is requested, but the handle does not have a valid value. Perhaps some initialization task is missing?", handle);

	/* Without knowing the size, we won't know the cost */
	if (!size)
		cost = 0;

	/* Check whether we have transfer cost for all nodes, if so, take the minimum */
	if (cost)
		for (i = 0; i < nnodes; i++)
		{
			if (src_node_mask & (1<<i))
			{
				double time = starpu_transfer_predict(i, destination, size);
				unsigned handling_node;

				/* Avoid indirect transfers */
				if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
					continue;

				if (_STARPU_IS_ZERO(time))
				{
					/* No estimation, will have to revert to dumb strategy */
					cost = 0.0;
					break;
				}
				else if (time < cost)
				{
					cost = time;
					src_node = i;
				}
			}
		}

	if (cost && src_node != -1)
	{
		/* Could estimate through cost, return that */
		STARPU_ASSERT(handle->per_node[src_node].allocated);
		STARPU_ASSERT(handle->per_node[src_node].initialized);
		return src_node;
	}

	int i_ram = -1;
	int i_gpu = -1;
	int i_disk = -1;

	/* Revert to dumb strategy: take RAM unless only a GPU has it */
	for (i = 0; i < nnodes; i++)
	{
		if (src_node_mask & (1<<i))
		{
			int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
			/* Avoid transfers which the interface does not want */
			if (can_copy)
			{
				void *src_interface = handle->per_node[i].data_interface;
				void *dst_interface = handle->per_node[destination].data_interface;
				unsigned handling_node;

				if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
				{
					/* Avoid through RAM if the interface does not want it */
					void *ram_interface = handle->per_node[STARPU_MAIN_RAM].data_interface;
					if ((!can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, i)
					  && !can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, STARPU_MAIN_RAM))
					 || (!can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, STARPU_MAIN_RAM)
					  && !can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, destination)))
						continue;
				}
			}

			/* however GPU are expensive sources, really !
			 * Unless peer transfer is supported (and it would then have been selected above).
			 * Other should be ok */
			if (starpu_node_get_kind(i) == STARPU_CUDA_RAM ||
			    starpu_node_get_kind(i) == STARPU_OPENCL_RAM ||
			    starpu_node_get_kind(i) == STARPU_MIC_RAM)
				i_gpu = i;

			if (starpu_node_get_kind(i) == STARPU_CPU_RAM ||
			    starpu_node_get_kind(i) == STARPU_SCC_RAM ||
			    starpu_node_get_kind(i) == STARPU_SCC_SHM ||
			    starpu_node_get_kind(i) == STARPU_MPI_MS_RAM)
				i_ram = i;

			if (starpu_node_get_kind(i) == STARPU_DISK_RAM)
				i_disk = i;
		}
	}

	/* we have to use cpu_ram in first */
	if (i_ram != -1)
		src_node = i_ram;
	/* otherwise prefer a GPU over the disk */
	else if (i_gpu != -1)
		src_node = i_gpu;
	/* no luck, we have to use the disk memory */
	else
		src_node = i_disk;

	STARPU_ASSERT(src_node != -1);
	STARPU_ASSERT(handle->per_node[src_node].allocated);
	STARPU_ASSERT(handle->per_node[src_node].initialized);
	return src_node;
}
  151. /* this may be called once the data is fetched with header and STARPU_RW-lock hold */
  152. void _starpu_update_data_state(starpu_data_handle_t handle,
  153. struct _starpu_data_replicate *requesting_replicate,
  154. enum starpu_data_access_mode mode)
  155. {
  156. /* There is nothing to do for relaxed coherency modes (scratch or
  157. * reductions) */
  158. if (!(mode & STARPU_RW))
  159. return;
  160. unsigned nnodes = starpu_memory_nodes_get_count();
  161. /* the data is present now */
  162. unsigned requesting_node = requesting_replicate->memory_node;
  163. requesting_replicate->requested &= ~(1UL << requesting_node);
  164. if (mode & STARPU_W)
  165. {
  166. /* the requesting node now has the only valid copy */
  167. unsigned node;
  168. for (node = 0; node < nnodes; node++)
  169. {
  170. _STARPU_TRACE_DATA_STATE_INVALID(handle, node);
  171. handle->per_node[node].state = STARPU_INVALID;
  172. }
  173. _STARPU_TRACE_DATA_STATE_OWNER(handle, requesting_node);
  174. requesting_replicate->state = STARPU_OWNER;
  175. if (handle->home_node != -1 && handle->per_node[handle->home_node].state == STARPU_INVALID)
  176. /* Notify that this MC is now dirty */
  177. _starpu_memchunk_dirty(requesting_replicate->mc, requesting_replicate->memory_node);
  178. }
  179. else
  180. {
  181. /* read only */
  182. if (requesting_replicate->state != STARPU_OWNER)
  183. {
  184. /* there was at least another copy of the data */
  185. unsigned node;
  186. for (node = 0; node < nnodes; node++)
  187. {
  188. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  189. if (replicate->state != STARPU_INVALID){
  190. _STARPU_TRACE_DATA_STATE_SHARED(handle, node);
  191. replicate->state = STARPU_SHARED;
  192. }
  193. }
  194. _STARPU_TRACE_DATA_STATE_SHARED(handle, requesting_node);
  195. requesting_replicate->state = STARPU_SHARED;
  196. }
  197. }
  198. }
  199. static int worker_supports_direct_access(unsigned node, unsigned handling_node)
  200. {
  201. if (node == handling_node)
  202. return 1;
  203. if (!_starpu_memory_node_get_nworkers(handling_node))
  204. /* No worker to process the request from that node */
  205. return 0;
  206. int type = starpu_node_get_kind(node);
  207. if (_node_ops[type].direct_access_supported)
  208. return _node_ops[type].direct_access_supported(node, handling_node);
  209. else
  210. {
  211. STARPU_ABORT_MSG("Node %d does not define the operation 'direct_access_supported'", type);
  212. return 1;
  213. }
  214. }
/* Tell whether a direct (single-hop, no staging through main memory)
 * transfer of "handle" is possible from src_node to dst_node.  On success,
 * *handling_node is set to the node whose worker should drive the copy.
 * Returns 0 when the caller must stage through an intermediate RAM node. */
static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node)
{
	int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
	void *src_interface = handle->per_node[src_node].data_interface;
	void *dst_interface = handle->per_node[dst_node].data_interface;

	/* XXX That's a hack until we fix cudaMemcpy3DPeerAsync in the block interface
	 * Perhaps not all data interface provide a direct GPU-GPU transfer
	 * method ! */
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	if (src_node != dst_node && starpu_node_get_kind(src_node) == STARPU_CUDA_RAM && starpu_node_get_kind(dst_node) == STARPU_CUDA_RAM)
	{
		/* GPU->GPU needs either a CUDA-specific async method or the
		 * generic any_to_any fallback; otherwise force staging. */
		const struct starpu_data_copy_methods *copy_methods = handle->ops->copy_methods;
		if (!copy_methods->cuda_to_cuda_async && !copy_methods->any_to_any)
			return 0;
	}
#endif

	/* Note: with CUDA, performance seems a bit better when issuing the transfer from the destination (tested without GPUDirect, but GPUDirect probably behave the same).
	 * Hence the destination-driven direction is tried first. */
	if (worker_supports_direct_access(src_node, dst_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node)))
	{
		*handling_node = dst_node;
		return 1;
	}

	if (worker_supports_direct_access(dst_node, src_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node)))
	{
		*handling_node = src_node;
		return 1;
	}

	/* No direct link in either direction. */
	return 0;
}
  244. /* Now, we use slowness/bandwidth to compare numa nodes, is it better to use latency ? */
  245. static unsigned chose_best_numa_between_src_and_dest(int src, int dst)
  246. {
  247. double timing_best;
  248. int best_numa = -1;
  249. unsigned numa;
  250. const unsigned nb_numa_nodes = starpu_memory_nodes_get_numa_count();
  251. for(numa = 0; numa < nb_numa_nodes; numa++)
  252. {
  253. double actual = 1.0/starpu_transfer_bandwidth(src, numa) + 1.0/starpu_transfer_bandwidth(numa, dst);
  254. /* Compare slowness : take the lowest */
  255. if (best_numa < 0 || actual < timing_best)
  256. {
  257. best_numa = numa;
  258. timing_best = actual;
  259. }
  260. }
  261. STARPU_ASSERT(best_numa >= 0);
  262. return best_numa;
  263. }
/* Determines the path of a request : each hop is defined by (src,dst) and the
 * node that handles the hop. The returned value indicates the number of hops,
 * and the max_len is the maximum number of hops (ie. the size of the
 * src_nodes, dst_nodes and handling_nodes arrays. */
int _starpu_determine_request_path(starpu_data_handle_t handle,
				   int src_node, int dst_node,
				   enum starpu_data_access_mode mode, int max_len,
				   unsigned *src_nodes, unsigned *dst_nodes,
				   unsigned *handling_nodes, unsigned write_invalidation)
{
	if (src_node == dst_node || !(mode & STARPU_R))
	{
		/* No actual data movement needed (same node, or write-only). */
		if (dst_node == -1 || starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
			handling_nodes[0] = src_node;
		else
			handling_nodes[0] = dst_node;

		if (write_invalidation)
			/* The invalidation request will be enough */
			return 0;

		/* The destination node should only allocate the data, no transfer is required */
		STARPU_ASSERT(max_len >= 1);

		src_nodes[0] = STARPU_MAIN_RAM; // ignored
		dst_nodes[0] = dst_node;
		return 1;
	}

	if (src_node < 0)
	{
		/* Will just initialize the destination */
		STARPU_ASSERT(max_len >= 1);

		src_nodes[0] = src_node; // ignored
		dst_nodes[0] = dst_node;
		return 1;
	}

	unsigned handling_node;
	int link_is_valid = link_supports_direct_transfers(handle, src_node, dst_node, &handling_node);

	if (!link_is_valid)
	{
		int (*can_copy)(void *, unsigned, void *, unsigned, unsigned) = handle->ops->copy_methods->can_copy;
		void *src_interface = handle->per_node[src_node].data_interface;
		void *dst_interface = handle->per_node[dst_node].data_interface;

		/* We need an intermediate hop to implement data staging
		 * through main memory. */
		STARPU_ASSERT(max_len >= 2);
		STARPU_ASSERT(src_node >= 0);

		/* Pick the cheapest NUMA RAM node to stage through. */
		unsigned numa = chose_best_numa_between_src_and_dest(src_node, dst_node);

		/* GPU -> RAM */
		src_nodes[0] = src_node;
		dst_nodes[0] = numa;

		if (starpu_node_get_kind(src_node) == STARPU_DISK_RAM)
			/* Disks don't have their own driver thread */
			handling_nodes[0] = dst_node;
		else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node))
		{
			handling_nodes[0] = src_node;
		}
		else
		{
			STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, dst_node), "interface %d refuses all kinds of transfers from node %d to node %d\n", handle->ops->interfaceid, src_node, dst_node);
			handling_nodes[0] = dst_node;
		}

		/* RAM -> GPU */
		src_nodes[1] = numa;
		dst_nodes[1] = dst_node;

		if (starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
			/* Disks don't have their own driver thread */
			handling_nodes[1] = src_node;
		else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node))
		{
			handling_nodes[1] = dst_node;
		}
		else
		{
			STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, src_node), "interface %d refuses all kinds of transfers from node %d to node %d\n", handle->ops->interfaceid, src_node, dst_node);
			handling_nodes[1] = src_node;
		}

		return 2;
	}
	else
	{
		/* Single direct hop, driven by the node chosen by
		 * link_supports_direct_transfers(). */
		STARPU_ASSERT(max_len >= 1);

		src_nodes[0] = src_node;
		dst_nodes[0] = dst_node;
		handling_nodes[0] = handling_node;
#if !defined(STARPU_HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
		/* Without peer-copy support, GPU->GPU direct reads must not occur. */
		STARPU_ASSERT(!(mode & STARPU_R) || starpu_node_get_kind(src_node) != STARPU_CUDA_RAM || starpu_node_get_kind(dst_node) != STARPU_CUDA_RAM);
#endif
		return 1;
	}
}
/* Look for a pending request on "replicate" that we can piggy-back on,
 * upgrading its mode/priority as needed.  handle->lock should be taken.
 * r is returned locked. The node parameter indicates either the source of
 * the request, or the destination for a write-only request. */
static struct _starpu_data_request *_starpu_search_existing_data_request(struct _starpu_data_replicate *replicate, unsigned node, enum starpu_data_access_mode mode, unsigned is_prefetch)
{
	struct _starpu_data_request *r;

	r = replicate->request[node];

	if (r)
	{
		_starpu_spin_checklocked(&r->handle->header_lock);

		_starpu_spin_lock(&r->lock);

		/* perhaps we need to "upgrade" the request: a lower is_prefetch
		 * value means a more urgent fetch */
		if (is_prefetch < r->prefetch)
			_starpu_update_prefetch_status(r, is_prefetch);

		if (mode & STARPU_R)
		{
			/* in case the existing request did not imply a memory
			 * transfer yet, we have to take a second refcnt now
			 * for the source, in addition to the refcnt for the
			 * destination
			 * (so that the source remains valid) */
			if (!(r->mode & STARPU_R))
			{
				replicate->refcnt++;
				replicate->handle->busy_count++;
			}

			r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_R);
		}

		if (mode & STARPU_W)
			r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_W);
	}

	return r;
}
/*
 * This function is called when the data is needed on the local node, this
 * returns a pointer to the local copy
 *
 *			R	STARPU_W	STARPU_RW
 *	Owner		OK	OK		OK
 *	Shared		OK	1		1
 *	Invalid		2	3		4
 *
 * case 1 : shared + (read)write :
 *  no data copy but shared->Invalid/Owner
 * case 2 : invalid + read :
 *  data copy + invalid->shared + owner->shared (STARPU_ASSERT(there is a valid))
 * case 3 : invalid + write :
 *  no data copy + invalid->owner + (owner,shared)->invalid
 * case 4 : invalid + R/STARPU_W :
 *  data copy + if (STARPU_W) (invalid->owner + owner->invalid)
 *              else (invalid,owner->shared)
 *
 * Returns NULL when the data was already available (or immediately
 * allocatable) and the callback was invoked synchronously; otherwise
 * returns the last request of the chain that will bring the data.
 */
struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
								  struct _starpu_data_replicate *dst_replicate,
								  enum starpu_data_access_mode mode, unsigned is_prefetch,
								  unsigned async,
								  void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
{
	/* We don't care about commuting for data requests, that was handled before. */
	mode &= ~STARPU_COMMUTE;

	/* This function is called with handle's header lock taken */
	_starpu_spin_checklocked(&handle->header_lock);

	int requesting_node = dst_replicate ? dst_replicate->memory_node : -1;
	unsigned nwait = 0;

	if (mode & STARPU_W)
	{
		/* We will write to the buffer. We will have to wait for all
		 * existing requests before the last request which will
		 * invalidate all their results (which were possibly spurious,
		 * e.g. too aggressive eviction).
		 */
		unsigned i, j;
		unsigned nnodes = starpu_memory_nodes_get_count();
		for (i = 0; i < nnodes; i++)
			for (j = 0; j < nnodes; j++)
				if (handle->per_node[i].request[j])
					nwait++;
		/* If the request is not detached (i.e. the caller really wants
		 * proper ownership), no new requests will appear because a
		 * reference will be kept on the dst replicate, which will
		 * notably prevent data reclaiming.
		 */
	}

	if ((!dst_replicate || dst_replicate->state != STARPU_INVALID) && (!nwait || is_prefetch))
	{
		if (dst_replicate)
		{
#ifdef STARPU_MEMORY_STATS
			enum _starpu_cache_state old_state = dst_replicate->state;
#endif
			/* the data is already available and we don't have to wait for
			 * any request, so we can stop */
			_starpu_update_data_state(handle, dst_replicate, mode);
			_starpu_msi_cache_hit(requesting_node);
#ifdef STARPU_MEMORY_STATS
			_starpu_memory_handle_stats_cache_hit(handle, requesting_node);
			/* XXX Broken ? */
			if (old_state == STARPU_SHARED
			    && dst_replicate->state == STARPU_OWNER)
				_starpu_memory_handle_stats_shared_to_owner(handle, requesting_node);
#endif
			if (dst_replicate->mc)
				_starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
		}

		_starpu_spin_unlock(&handle->header_lock);

		/* Synchronous completion: run the callback right away. */
		if (callback_func)
			callback_func(callback_arg);

		_STARPU_LOG_OUT_TAG("data available");
		return NULL;
	}

	if (dst_replicate)
		_starpu_msi_cache_miss(requesting_node);

	/* the only remaining situation is that the local copy was invalid */
	STARPU_ASSERT((dst_replicate && dst_replicate->state == STARPU_INVALID) || nwait);

	/* find someone who already has the data */
	int src_node = -1;

	if (dst_replicate && mode & STARPU_R)
	{
		if (dst_replicate->state == STARPU_INVALID)
			src_node = _starpu_select_src_node(handle, requesting_node);
		else
			src_node = requesting_node;
		if (src_node < 0)
		{
			/* We will create it, no need to read an existing value */
			mode &= ~STARPU_R;
		}
	}
	else if (dst_replicate)
	{
		/* if the data is in write only mode (and not SCRATCH or REDUX), there is no need for a source, data will be initialized by the task itself */
		if (mode & STARPU_W)
			dst_replicate->initialized = 1;
		if (starpu_node_get_kind(requesting_node) == STARPU_CPU_RAM && !nwait)
		{
			/* And this is the main RAM, really no need for a
			 * request, just allocate */
			if (_starpu_allocate_memory_on_node(handle, dst_replicate, is_prefetch) == 0)
			{
				_starpu_update_data_state(handle, dst_replicate, mode);

				_starpu_spin_unlock(&handle->header_lock);

				if (callback_func)
					callback_func(callback_arg);
				_STARPU_LOG_OUT_TAG("data immediately allocated");
				return NULL;
			}
		}
	}

#define MAX_REQUESTS 4
	/* We can safely assume that there won't be more than 2 hops in the
	 * current implementation */
	unsigned src_nodes[MAX_REQUESTS], dst_nodes[MAX_REQUESTS], handling_nodes[MAX_REQUESTS];
	/* keep one slot for the last W request, if any */
	int write_invalidation = (mode & STARPU_W) && nwait && !is_prefetch;
	int nhops = _starpu_determine_request_path(handle, src_node, requesting_node, mode, MAX_REQUESTS,
						   src_nodes, dst_nodes, handling_nodes, write_invalidation);

	STARPU_ASSERT(nhops >= 0 && nhops <= MAX_REQUESTS-1);

	struct _starpu_data_request *requests[nhops + write_invalidation];

	/* Did we reuse a request for that hop ? */
	int reused_requests[nhops + write_invalidation];

	/* Construct an array with a list of requests, possibly reusing existing requests */
	int hop;
	for (hop = 0; hop < nhops; hop++)
	{
		struct _starpu_data_request *r;

		unsigned hop_src_node = src_nodes[hop];
		unsigned hop_dst_node = dst_nodes[hop];
		unsigned hop_handling_node = handling_nodes[hop];

		struct _starpu_data_replicate *hop_src_replicate;
		struct _starpu_data_replicate *hop_dst_replicate;

		/* Only the first request is independant */
		unsigned ndeps = (hop == 0)?0:1;

		hop_src_replicate = &handle->per_node[hop_src_node];
		/* The last hop targets the caller's replicate, intermediate
		 * hops target the staging node's replicate. */
		hop_dst_replicate = (hop != nhops - 1)?&handle->per_node[hop_dst_node]:dst_replicate;

		/* Try to reuse a request if possible */
		r = _starpu_search_existing_data_request(hop_dst_replicate,
							 (mode & STARPU_R)?hop_src_node:hop_dst_node,
							 mode, is_prefetch);

		reused_requests[hop] = !!r;

		if (!r)
		{
			/* Create a new request if there was no request to reuse */
			r = _starpu_create_data_request(handle, hop_src_replicate,
							hop_dst_replicate, hop_handling_node,
							mode, ndeps, is_prefetch, prio, 0, origin);
			nwait++;
		}

		requests[hop] = r;
	}

	/* Chain these requests */
	for (hop = 0; hop < nhops; hop++)
	{
		struct _starpu_data_request *r;
		r = requests[hop];

		if (hop != nhops - 1)
		{
			if (!reused_requests[hop + 1])
			{
				/* A reused request is already posted; only a
				 * freshly-created successor must be chained. */
				r->next_req[r->next_req_count++] = requests[hop + 1];
				STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
			}
		}
		else if (!write_invalidation)
			/* The last request will perform the callback after termination */
			_starpu_data_request_append_callback(r, callback_func, callback_arg);

		if (reused_requests[hop])
			_starpu_spin_unlock(&r->lock);
	}

	if (write_invalidation)
	{
		/* Some requests were still pending, we have to add yet another
		 * request, depending on them, which will invalidate their
		 * result.
		 */
		struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
									     dst_replicate, requesting_node,
									     STARPU_W, nwait, is_prefetch, prio, 1, origin);

		/* and perform the callback after termination */
		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		/* We will write to the buffer. We will have to wait for all
		 * existing requests before the last request which will
		 * invalidate all their results (which were possibly spurious,
		 * e.g. too aggressive eviction).
		 */
		unsigned i, j;
		unsigned nnodes = starpu_memory_nodes_get_count();
		for (i = 0; i < nnodes; i++)
			for (j = 0; j < nnodes; j++)
			{
				struct _starpu_data_request *r2 = handle->per_node[i].request[j];
				if (r2)
				{
					_starpu_spin_lock(&r2->lock);
					if (is_prefetch < r2->prefetch)
						/* Hasten the request we will have to wait for */
						_starpu_update_prefetch_status(r2, is_prefetch);
					r2->next_req[r2->next_req_count++] = r;
					STARPU_ASSERT(r2->next_req_count <= STARPU_MAXNODES + 1);
					_starpu_spin_unlock(&r2->lock);
					nwait--;
				}
			}
		STARPU_ASSERT(nwait == 0);

		nhops++;
		requests[nhops - 1] = r;
		/* existing requests will post this one */
		reused_requests[nhops - 1] = 1;
	}
	STARPU_ASSERT(nhops);

	if (!async)
		requests[nhops - 1]->refcnt++;

	/* we only submit the first request, the remaining will be
	 * automatically submitted afterward */
	if (!reused_requests[0])
		_starpu_post_data_request(requests[0]);

	return requests[nhops - 1];
}
/* Bring (or just allocate, for write-only modes) a copy of "handle" on
 * dst_replicate.  When detached is 0, references are taken on the replicate
 * and the handle, to be released by _starpu_release_data_on_node.  When async
 * is 0, waits for the request chain to complete.  Returns 0 on success (or
 * when a prefetch found nothing to do). */
int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *dst_replicate,
			       enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
			       void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
{
	_STARPU_LOG_IN();

	/* Try to take the header lock while keeping the datawizard engine
	 * progressing, to avoid deadlocking on pending transfers. */
	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	if (is_prefetch > 0)
	{
		unsigned src_node_mask = 0;

		unsigned nnodes = starpu_memory_nodes_get_count();
		unsigned n;
		for (n = 0; n < nnodes; n++)
		{
			if (handle->per_node[n].state != STARPU_INVALID)
			{
				/* we found a copy ! */
				src_node_mask |= (1<<n);
			}
		}

		if (src_node_mask == 0)
		{
			/* no valid copy, nothing to prefetch */
			_starpu_spin_unlock(&handle->header_lock);
			return 0;
		}
	}

	if (!detached)
	{
		/* Take references which will be released by _starpu_release_data_on_node */
		if (dst_replicate)
			dst_replicate->refcnt++;
		else if (node == STARPU_ACQUIRE_NO_NODE_LOCK_ALL)
		{
			/* Lock-all acquisition: reference every node's replicate. */
			int i;
			for (i = 0; i < STARPU_MAXNODES; i++)
				handle->per_node[i].refcnt++;
		}
		handle->busy_count++;
	}

	struct _starpu_data_request *r;
	r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
						 is_prefetch, async, callback_func, callback_arg, prio, origin);

	/* If no request was created, the handle was already up-to-date on the
	 * node. In this case, _starpu_create_request_to_fetch_data has already
	 * unlocked the header. */
	if (!r)
		return 0;

	_starpu_spin_unlock(&handle->header_lock);

	int ret = async?0:_starpu_wait_data_request_completion(r, 1);
	_STARPU_LOG_OUT();
	return ret;
}
  669. static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  670. {
  671. return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 2, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
  672. }
  673. static int prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  674. {
  675. return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 1, 1, NULL, NULL, prio, "prefetch_data_on_node");
  676. }
  677. static int fetch_data(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  678. {
  679. return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, prio, "fetch_data");
  680. }
  681. uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
  682. {
  683. return handle->per_node[node].refcnt;
  684. }
  685. size_t _starpu_data_get_size(starpu_data_handle_t handle)
  686. {
  687. return handle->ops->get_size(handle);
  688. }
  689. size_t _starpu_data_get_alloc_size(starpu_data_handle_t handle)
  690. {
  691. if (handle->ops->get_alloc_size)
  692. return handle->ops->get_alloc_size(handle);
  693. else
  694. return handle->ops->get_size(handle);
  695. }
  696. uint32_t _starpu_data_get_footprint(starpu_data_handle_t handle)
  697. {
  698. return handle->footprint;
  699. }
/* in case the data was accessed on a write mode, do not forget to
 * make it accessible again once it is possible ! */
void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_wt_mask, struct _starpu_data_replicate *replicate)
{
	uint32_t wt_mask;
	wt_mask = default_wt_mask | handle->wt_mask;
	/* Only keep bits for memory nodes that actually exist. */
	wt_mask &= (1<<starpu_memory_nodes_get_count())-1;

	/* Note that it is possible that there is no valid copy of the data (if
	 * starpu_data_invalidate was called for instance). In that case, we do
	 * not enforce any write-through mechanism. */
	unsigned memory_node = replicate->memory_node;

	/* Write-through: if the data was written and the mask requests other
	 * nodes than this one, propagate the fresh copy. */
	if (replicate->state != STARPU_INVALID && handle->current_mode & STARPU_W)
		if (wt_mask & ~(1<<memory_node))
			_starpu_write_through_data(handle, memory_node, wt_mask);

	/* Acquire the header lock, making datawizard progress while spinning
	 * so that pending requests can complete. */
	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	/* Release refcnt taken by fetch_data_on_node */
	replicate->refcnt--;
	STARPU_ASSERT_MSG(replicate->refcnt >= 0, "handle %p released too many times", handle);

	STARPU_ASSERT_MSG(handle->busy_count > 0, "handle %p released too many times", handle);
	handle->busy_count--;

	/* NOTE(review): the call pattern suggests _starpu_notify_data_dependencies
	 * releases the header lock itself when it returns non-zero — confirm. */
	if (!_starpu_notify_data_dependencies(handle))
		_starpu_spin_unlock(&handle->header_lock);
}
/* If REPLICATE does not hold a valid copy yet, record (under the header
 * lock) that a transfer towards its memory node has been requested, by
 * setting the node's bit in the replicate's "requested" mask. */
static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate)
{
	/* Acquire the header lock, making datawizard progress while spinning
	 * so that pending requests can complete. */
	int cpt = 0;
	while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
	{
		cpt++;
		_starpu_datawizard_progress(1);
	}
	if (cpt == STARPU_SPIN_MAXTRY)
		_starpu_spin_lock(&handle->header_lock);

	if (replicate->state == STARPU_INVALID)
	{
		unsigned dst_node = replicate->memory_node;
		replicate->requested |= 1UL << dst_node;
	}

	_starpu_spin_unlock(&handle->header_lock);
}
  747. int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
  748. {
  749. STARPU_ASSERT(!task->prefetched);
  750. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  751. unsigned index;
  752. for (index = 0; index < nbuffers; index++)
  753. {
  754. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  755. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  756. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  757. continue;
  758. if (!(mode & STARPU_R))
  759. /* Don't bother prefetching some data which will be overwritten */
  760. continue;
  761. int node = _starpu_task_data_get_node_on_node(task, index, target_node);
  762. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  763. prefetch_data_on_node(handle, node, replicate, mode, prio);
  764. _starpu_set_data_requested_flag_if_needed(handle, replicate);
  765. }
  766. return 0;
  767. }
  768. int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
  769. {
  770. int prio = task->priority;
  771. if (task->workerorder)
  772. prio = INT_MAX - task->workerorder;
  773. return starpu_prefetch_task_input_on_node_prio(task, node, prio);
  774. }
  775. int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
  776. {
  777. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  778. unsigned index;
  779. for (index = 0; index < nbuffers; index++)
  780. {
  781. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  782. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  783. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  784. continue;
  785. if (!(mode & STARPU_R))
  786. /* Don't bother prefetching some data which will be overwritten */
  787. continue;
  788. int node = _starpu_task_data_get_node_on_node(task, index, target_node);
  789. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  790. idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
  791. }
  792. return 0;
  793. }
  794. int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
  795. {
  796. int prio = task->priority;
  797. if (task->workerorder)
  798. prio = INT_MAX - task->workerorder;
  799. return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
  800. }
  801. int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
  802. {
  803. STARPU_ASSERT(!task->prefetched);
  804. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  805. unsigned index;
  806. for (index = 0; index < nbuffers; index++)
  807. {
  808. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  809. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  810. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  811. continue;
  812. if (!(mode & STARPU_R))
  813. /* Don't bother prefetching some data which will be overwritten */
  814. continue;
  815. int node = _starpu_task_data_get_node_on_worker(task, index, worker);
  816. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  817. prefetch_data_on_node(handle, node, replicate, mode, prio);
  818. _starpu_set_data_requested_flag_if_needed(handle, replicate);
  819. }
  820. return 0;
  821. }
  822. int starpu_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
  823. {
  824. int prio = task->priority;
  825. if (task->workerorder)
  826. prio = INT_MAX - task->workerorder;
  827. return starpu_prefetch_task_input_for_prio(task, worker, prio);
  828. }
  829. int starpu_idle_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
  830. {
  831. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  832. unsigned index;
  833. for (index = 0; index < nbuffers; index++)
  834. {
  835. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  836. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  837. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  838. continue;
  839. if (!(mode & STARPU_R))
  840. /* Don't bother prefetching some data which will be overwritten */
  841. continue;
  842. int node = _starpu_task_data_get_node_on_worker(task, index, worker);
  843. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  844. idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
  845. }
  846. return 0;
  847. }
  848. int starpu_idle_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
  849. {
  850. int prio = task->priority;
  851. if (task->workerorder)
  852. prio = INT_MAX - task->workerorder;
  853. return starpu_idle_prefetch_task_input_for_prio(task, worker, prio);
  854. }
/* Return the replicate a task should use for HANDLE: the calling worker's
 * private replicate for scratch/reduction buffers, otherwise the shared
 * per-node replicate for NODE. */
struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
{
	if (mode & (STARPU_SCRATCH|STARPU_REDUX))
	{
		/* Scratch/redux data lives in a per-worker replicate, so a
		 * valid worker id is mandatory here. */
		STARPU_ASSERT(workerid >= 0);
		if (!handle->per_worker)
		{
			/* Lazily allocate the per-worker replicate array; the
			 * check is repeated under the header lock so that only
			 * one thread performs the initialization. */
			_starpu_spin_lock(&handle->header_lock);
			if (!handle->per_worker)
				_starpu_data_initialize_per_worker(handle);
			_starpu_spin_unlock(&handle->header_lock);
		}
		return &handle->per_worker[workerid];
	}
	else
		/* That's a "normal" buffer (R/W) */
		return &handle->per_node[node];
}
/* Callback used when a buffer is send asynchronously to the sink */
static void _starpu_fetch_task_input_cb(void *arg)
{
	struct _starpu_worker * worker = (struct _starpu_worker *) arg;

	/* increase the number of buffer received */
	/* NOTE(review): the write barrier presumably makes the transferred
	 * buffer contents visible before the incremented counter can be
	 * observed by the waiting worker — confirm. */
	STARPU_WMB();
	(void)STARPU_ATOMIC_ADD(&worker->nb_buffers_transferred, 1);

#ifdef STARPU_SIMGRID
	/* Wake up the simulated worker waiting on this memory node's queue. */
	starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[worker->memory_node]);
#endif
}
  884. /* Synchronously or asynchronously fetch data for a given task (if it's not there already)
  885. * Returns the number of data acquired here. */
  886. /* _starpu_fetch_task_input must be called before
  887. * executing the task. __starpu_push_task_output but be called after the
  888. * execution of the task. */
  889. /* The driver can either just call _starpu_fetch_task_input with async==0,
  890. * or to improve overlapping, it can call _starpu_fetch_task_input with
  891. * async==1, then wait for transfers to complete, then call
  892. * _starpu_fetch_task_input_tail to complete the fetch. */
  893. int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, int async)
  894. {
  895. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  896. int workerid = worker->workerid;
  897. if (async)
  898. {
  899. worker->task_transferring = task;
  900. worker->nb_buffers_transferred = 0;
  901. if (worker->ntasks <= 1)
  902. _STARPU_TRACE_WORKER_START_FETCH_INPUT(NULL, workerid);
  903. }
  904. else
  905. _STARPU_TRACE_START_FETCH_INPUT(NULL);
  906. int profiling = starpu_profiling_status_get();
  907. if (profiling && task->profiling_info)
  908. _starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
  909. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  910. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  911. unsigned nacquires;
  912. unsigned index;
  913. nacquires = 0;
  914. for (index = 0; index < nbuffers; index++)
  915. {
  916. int ret;
  917. starpu_data_handle_t handle = descrs[index].handle;
  918. enum starpu_data_access_mode mode = descrs[index].mode;
  919. int node = _starpu_task_data_get_node_on_worker(task, descrs[index].index, workerid);
  920. /* We set this here for coherency with __starpu_push_task_output */
  921. descrs[index].node = node;
  922. if (mode == STARPU_NONE ||
  923. (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
  924. (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
  925. STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
  926. struct _starpu_data_replicate *local_replicate;
  927. if (index && descrs[index-1].handle == descrs[index].handle)
  928. /* We have already took this data, skip it. This
  929. * depends on ordering putting writes before reads, see
  930. * _starpu_compar_handles */
  931. continue;
  932. local_replicate = get_replicate(handle, mode, workerid, node);
  933. if (async)
  934. {
  935. ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1,
  936. _starpu_fetch_task_input_cb, worker, 0, "_starpu_fetch_task_input");
  937. #ifdef STARPU_SIMGRID
  938. if (_starpu_simgrid_fetching_input_cost())
  939. MSG_process_sleep(0.000001);
  940. #endif
  941. if (STARPU_UNLIKELY(ret))
  942. {
  943. /* Ooops, not enough memory, make worker wait for these for now, and the synchronous call will finish by forcing eviction*/
  944. worker->nb_buffers_totransfer = nacquires;
  945. _starpu_set_worker_status(worker, STATUS_WAITING);
  946. return 0;
  947. }
  948. }
  949. else
  950. {
  951. ret = fetch_data(handle, node, local_replicate, mode, 0);
  952. #ifdef STARPU_SIMGRID
  953. if (_starpu_simgrid_fetching_input_cost())
  954. MSG_process_sleep(0.000001);
  955. #endif
  956. if (STARPU_UNLIKELY(ret))
  957. goto enomem;
  958. }
  959. nacquires++;
  960. }
  961. if (async)
  962. {
  963. worker->nb_buffers_totransfer = nacquires;
  964. _starpu_set_worker_status(worker, STATUS_WAITING);
  965. return 0;
  966. }
  967. _starpu_fetch_task_input_tail(task, j, worker);
  968. return 0;
  969. enomem:
  970. _STARPU_TRACE_END_FETCH_INPUT(NULL);
  971. _STARPU_DISP("something went wrong with buffer %u\n", index);
  972. /* try to unreference all the input that were successfully taken */
  973. unsigned index2;
  974. for (index2 = 0; index2 < index; index2++)
  975. {
  976. starpu_data_handle_t handle = descrs[index2].handle;
  977. enum starpu_data_access_mode mode = descrs[index2].mode;
  978. int node = descrs[index].node;
  979. struct _starpu_data_replicate *local_replicate;
  980. if (index2 && descrs[index2-1].handle == descrs[index2].handle)
  981. /* We have already released this data, skip it. This
  982. * depends on ordering putting writes before reads, see
  983. * _starpu_compar_handles */
  984. continue;
  985. local_replicate = get_replicate(handle, mode, workerid, node);
  986. _starpu_release_data_on_node(handle, 0, local_replicate);
  987. }
  988. return -1;
  989. }
/* Now that we have taken the data locks in locking order, fill the codelet interfaces in function order. */
void _starpu_fetch_task_input_tail(struct starpu_task *task, struct _starpu_job *j, struct _starpu_worker *worker)
{
	int workerid = worker->workerid;

	int profiling = starpu_profiling_status_get();

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);

	unsigned index;
	/* Accumulated only under STARPU_USE_FXT, for the trace event below. */
	unsigned long total_size = 0;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;

		struct _starpu_data_replicate *local_replicate;

		local_replicate = get_replicate(handle, mode, workerid, node);

		/* Record (under the header lock) that this replicate's memory
		 * chunk was actually used. */
		_starpu_spin_lock(&handle->header_lock);
		if (local_replicate->mc)
			local_replicate->mc->diduse = 1;
		_starpu_spin_unlock(&handle->header_lock);

		/* Expose the data interface to the codelet in the original
		 * (function-order) buffer slot, descrs[index].index. */
		_STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, descrs[index].index);

		/* If the replicate was not initialized yet, we have to do it now */
		if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
			_starpu_redux_init_data_replicate(handle, local_replicate, workerid);

#ifdef STARPU_USE_FXT
		total_size += _starpu_data_get_size(handle);
#endif
	}
	_STARPU_TRACE_DATA_LOAD(workerid,total_size);

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	_STARPU_TRACE_END_FETCH_INPUT(NULL);
}
/* Release task data dependencies: drop, for every task buffer, the
 * references taken at fetch time, and notify waiting dependencies. */
void __starpu_push_task_output(struct _starpu_job *j)
{
#ifdef STARPU_OPENMP
	STARPU_ASSERT(!j->continuation);
#endif
	int profiling = starpu_profiling_status_get();
	struct starpu_task *task = j->task;
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->release_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);

	int workerid = starpu_worker_get_id();

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;

		struct _starpu_data_replicate *local_replicate = NULL;

		if (index && descrs[index-1].handle == descrs[index].handle)
			/* We have already released this data, skip it. This
			 * depends on ordering putting writes before reads, see
			 * _starpu_compar_handles */
			continue;

		if (node != -1)
			local_replicate = get_replicate(handle, mode, workerid, node);

		/* Keep a reference for future
		 * _starpu_release_task_enforce_sequential_consistency call */
		_starpu_spin_lock(&handle->header_lock);
		handle->busy_count++;

		if (node == -1)
		{
			/* NOWHERE case, just notify dependencies */
			/* NOTE(review): the pattern suggests
			 * _starpu_notify_data_dependencies releases the header
			 * lock itself when it returns non-zero — confirm. */
			if (!_starpu_notify_data_dependencies(handle))
				_starpu_spin_unlock(&handle->header_lock);
		}
		else
		{
			/* Regular case: drop the fetch-time references and
			 * trigger write-through / dependency notification. */
			_starpu_spin_unlock(&handle->header_lock);
			_starpu_release_data_on_node(handle, 0, local_replicate);
		}
	}

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->release_data_end_time);
}
/* Version for a driver running on a worker: we show the driver state in the trace */
void _starpu_push_task_output(struct _starpu_job *j)
{
	/* Same as __starpu_push_task_output, bracketed with trace events so
	 * the worker appears as "pushing output" in execution traces. */
	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
	__starpu_push_task_output(j);
	_STARPU_TRACE_END_PUSH_OUTPUT(NULL);
}
/* Shared completion state for all "fetch nowhere" transfers of one job:
 * the job may only terminate once every pending transfer has finished. */
struct fetch_nowhere_wrapper
{
	struct _starpu_job *j; /* the job whose buffers are being fetched */
	unsigned pending;      /* transfers still in flight, plus one guard
	                        * reference held by the submitting function */
};

static void _starpu_fetch_nowhere_task_input_cb(void *arg);
/* Asynchronously fetch data for a task which will have no content */
void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
{
	int profiling = starpu_profiling_status_get();
	struct starpu_task *task = j->task;
	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);

	struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned nfetchbuffers = 0;
	struct fetch_nowhere_wrapper *wrapper;
	unsigned index;

	/* First pass: decide which buffers actually need fetching. */
	for (index = 0; index < nbuffers; index++)
	{
		/* Note here we just follow what was requested, and not use _starpu_task_data_get_node* */
		int node = -1;
		if (task->cl->specific_nodes)
			node = STARPU_CODELET_GET_NODE(task->cl, descrs[index].index);
		descrs[index].node = node;
		if (node != -1)
			nfetchbuffers++;
	}

	if (!nfetchbuffers)
	{
		/* Nothing to fetch actually, already finished! */
		__starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return;
	}

	_STARPU_MALLOC(wrapper, (sizeof(*wrapper)));
	wrapper->j = j;
	/* +1 for the call below */
	wrapper->pending = nfetchbuffers + 1;

	/* Second pass: submit one asynchronous fetch per buffer that has a
	 * specific target node; each completion decrements wrapper->pending. */
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle = descrs[index].handle;
		enum starpu_data_access_mode mode = descrs[index].mode;
		int node = descrs[index].node;
		if (node == -1)
			continue;

		/* Sanity-check the access mode before using it. */
		if (mode == STARPU_NONE ||
			(mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
			(mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
			STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
		STARPU_ASSERT(mode != STARPU_SCRATCH && mode != STARPU_REDUX);

		struct _starpu_data_replicate *local_replicate;

		local_replicate = get_replicate(handle, mode, -1, node);

		_starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
	}

	if (profiling && task->profiling_info)
		_starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	/* Finished working with the task, release our reference */
	_starpu_fetch_nowhere_task_input_cb(wrapper);
}
/* Completion callback for "fetch nowhere" transfers: atomically decrement
 * the pending counter, and whoever observes it reach zero pushes the task
 * output and terminates the job. */
static void _starpu_fetch_nowhere_task_input_cb(void *arg)
{
	/* One more transfer finished */
	struct fetch_nowhere_wrapper *wrapper = arg;

	unsigned pending = STARPU_ATOMIC_ADD(&wrapper->pending, -1);
	/* Race-detector annotations pairing this decrement with the final
	 * observation below. */
	ANNOTATE_HAPPENS_BEFORE(&wrapper->pending);
	if (pending == 0)
	{
		ANNOTATE_HAPPENS_AFTER(&wrapper->pending);
		/* Finished transferring, task is over */
		struct _starpu_job *j = wrapper->j;
		free(wrapper);
		__starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
	}
}
  1154. /* NB : this value can only be an indication of the status of a data
  1155. at some point, but there is no strong garantee ! */
  1156. unsigned starpu_data_is_on_node(starpu_data_handle_t handle, unsigned node)
  1157. {
  1158. unsigned ret = 0;
  1159. // XXX : this is just a hint, so we don't take the lock ...
  1160. // STARPU_PTHREAD_SPIN_LOCK(&handle->header_lock);
  1161. if (handle->per_node[node].state != STARPU_INVALID)
  1162. {
  1163. ret = 1;
  1164. }
  1165. else
  1166. {
  1167. unsigned i;
  1168. unsigned nnodes = starpu_memory_nodes_get_count();
  1169. for (i = 0; i < nnodes; i++)
  1170. {
  1171. if ((handle->per_node[node].requested & (1UL << i)) || handle->per_node[node].request[i])
  1172. ret = 1;
  1173. }
  1174. }
  1175. // STARPU_PTHREAD_SPIN_UNLOCK(&handle->header_lock);
  1176. return ret;
  1177. }