coherency.c

  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011-2014,2016,2017 Inria
  4. * Copyright (C) 2008-2019 Université de Bordeaux
  5. * Copyright (C) 2018 Federal University of Rio Grande do Sul (UFRGS)
  6. * Copyright (C) 2010-2019 CNRS
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <limits.h>
  20. #include <math.h>
  21. #include <common/config.h>
  22. #include <datawizard/coherency.h>
  23. #include <datawizard/copy_driver.h>
  24. #include <datawizard/write_back.h>
  25. #include <datawizard/memory_nodes.h>
  26. #include <core/dependencies/data_concurrency.h>
  27. #include <core/disk.h>
  28. #include <profiling/profiling.h>
  29. #include <core/task.h>
  30. #include <starpu_scheduler.h>
  31. #include <core/workers.h>
  32. #ifdef STARPU_SIMGRID
  33. #include <core/simgrid.h>
  34. #endif
  35. static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node);
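/* Pick a source node that holds a valid copy of the handle for a transfer to
 * 'destination': prefer the node with the cheapest predicted transfer cost,
 * otherwise fall back to a simple priority of CPU RAM, then GPU, then disk. */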
  36. int _starpu_select_src_node(starpu_data_handle_t handle, unsigned destination)
  37. {
  38. int src_node = -1;
  39. unsigned i;
  40. unsigned nnodes = starpu_memory_nodes_get_count();
  41. /* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
  42. unsigned node;
  43. size_t size = _starpu_data_get_size(handle);
  44. double cost = INFINITY;
  45. unsigned src_node_mask = 0;
  46. for (node = 0; node < nnodes; node++)
  47. {
  48. if (handle->per_node[node].state != STARPU_INVALID)
  49. {
  50. /* we found a copy ! */
  51. src_node_mask |= (1<<node);
  52. }
  53. }
  54. if (src_node_mask == 0 && handle->init_cl)
  55. {
  56. /* No copy yet, but the application told us how to build it. */
  57. return -1;
  58. }
  59. /* we should have found at least one copy ! */
  60. STARPU_ASSERT_MSG(src_node_mask != 0, "The data for the handle %p is requested, but the handle does not have a valid value. Perhaps some initialization task is missing?", handle);
  61. /* Without knowing the size, we won't know the cost */
  62. if (!size)
  63. cost = 0;
  64. /* Check whether we have transfer cost for all nodes, if so, take the minimum */
  65. if (cost)
  66. for (i = 0; i < nnodes; i++)
  67. {
  68. if (src_node_mask & (1<<i))
  69. {
  70. double time = starpu_transfer_predict(i, destination, size);
  71. unsigned handling_node;
  72. /* Avoid indirect transfers */
  73. if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
  74. continue;
  75. if (_STARPU_IS_ZERO(time))
  76. {
  77. /* No estimation, will have to revert to dumb strategy */
  78. cost = 0.0;
  79. break;
  80. }
  81. else if (time < cost)
  82. {
  83. cost = time;
  84. src_node = i;
  85. }
  86. }
  87. }
  88. if (cost && src_node != -1)
  89. {
  90. /* Could estimate through cost, return that */
  91. STARPU_ASSERT(handle->per_node[src_node].allocated);
  92. STARPU_ASSERT(handle->per_node[src_node].initialized);
  93. return src_node;
  94. }
  95. int i_ram = -1;
  96. int i_gpu = -1;
  97. int i_disk = -1;
  98. /* Revert to dumb strategy: take RAM unless only a GPU has it */
  99. for (i = 0; i < nnodes; i++)
  100. {
  101. if (src_node_mask & (1<<i))
  102. {
  103. int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
  104. /* Avoid transfers which the interface does not want */
  105. if (can_copy)
  106. {
  107. void *src_interface = handle->per_node[i].data_interface;
  108. void *dst_interface = handle->per_node[destination].data_interface;
  109. unsigned handling_node;
  110. if (!link_supports_direct_transfers(handle, i, destination, &handling_node))
  111. {
  112. /* Avoid through RAM if the interface does not want it */
  113. void *ram_interface = handle->per_node[STARPU_MAIN_RAM].data_interface;
  114. if ((!can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, i)
  115. && !can_copy(src_interface, i, ram_interface, STARPU_MAIN_RAM, STARPU_MAIN_RAM))
  116. || (!can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, STARPU_MAIN_RAM)
  117. && !can_copy(ram_interface, STARPU_MAIN_RAM, dst_interface, destination, destination)))
  118. continue;
  119. }
  120. }
  121. /* However, GPUs are expensive sources, really!
  122. * Unless peer transfer is supported (and it would then have been selected above).
  123. * Others should be OK. */
  124. if (starpu_node_get_kind(i) == STARPU_CUDA_RAM ||
  125. starpu_node_get_kind(i) == STARPU_OPENCL_RAM ||
  126. starpu_node_get_kind(i) == STARPU_MIC_RAM)
  127. i_gpu = i;
  128. if (starpu_node_get_kind(i) == STARPU_CPU_RAM ||
  129. starpu_node_get_kind(i) == STARPU_MPI_MS_RAM)
  130. i_ram = i;
  131. if (starpu_node_get_kind(i) == STARPU_DISK_RAM)
  132. i_disk = i;
  133. }
  134. }
  135. /* we have to use cpu_ram first */
  136. if (i_ram != -1)
  137. src_node = i_ram;
  138. /* otherwise fall back to a GPU, and use the disk only as a last resort */
  139. else if (i_gpu != -1)
  140. src_node = i_gpu;
  141. else
  142. src_node = i_disk;
  143. STARPU_ASSERT(src_node != -1);
  144. STARPU_ASSERT(handle->per_node[src_node].allocated);
  145. STARPU_ASSERT(handle->per_node[src_node].initialized);
  146. return src_node;
  147. }
  148. /* this may be called once the data is fetched, with the header and STARPU_RW locks held */
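/* Write access leaves the requesting replicate as the single STARPU_OWNER and
 * invalidates every other replicate; read access downgrades the owner (if any)
 * so that all valid replicates, including the requesting one, end up STARPU_SHARED. */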
  149. void _starpu_update_data_state(starpu_data_handle_t handle,
  150. struct _starpu_data_replicate *requesting_replicate,
  151. enum starpu_data_access_mode mode)
  152. {
  153. /* There is nothing to do for relaxed coherency modes (scratch or
  154. * reductions) */
  155. if (!(mode & STARPU_RW))
  156. return;
  157. unsigned nnodes = starpu_memory_nodes_get_count();
  158. /* the data is present now */
  159. unsigned requesting_node = requesting_replicate->memory_node;
  160. requesting_replicate->requested &= ~(1UL << requesting_node);
  161. if (mode & STARPU_W)
  162. {
  163. /* the requesting node now has the only valid copy */
  164. unsigned node;
  165. for (node = 0; node < nnodes; node++)
  166. {
  167. _STARPU_TRACE_DATA_STATE_INVALID(handle, node);
  168. handle->per_node[node].state = STARPU_INVALID;
  169. }
  170. _STARPU_TRACE_DATA_STATE_OWNER(handle, requesting_node);
  171. requesting_replicate->state = STARPU_OWNER;
  172. if (handle->home_node != -1 && handle->per_node[handle->home_node].state == STARPU_INVALID)
  173. /* Notify that this MC is now dirty */
  174. _starpu_memchunk_dirty(requesting_replicate->mc, requesting_replicate->memory_node);
  175. }
  176. else
  177. {
  178. /* read only */
  179. if (requesting_replicate->state != STARPU_OWNER)
  180. {
  181. /* there was at least another copy of the data */
  182. unsigned node;
  183. for (node = 0; node < nnodes; node++)
  184. {
  185. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  186. if (replicate->state != STARPU_INVALID){
  187. _STARPU_TRACE_DATA_STATE_SHARED(handle, node);
  188. replicate->state = STARPU_SHARED;
  189. }
  190. }
  191. _STARPU_TRACE_DATA_STATE_SHARED(handle, requesting_node);
  192. requesting_replicate->state = STARPU_SHARED;
  193. }
  194. }
  195. }
  196. static int worker_supports_direct_access(unsigned node, unsigned handling_node)
  197. {
  198. if (node == handling_node)
  199. return 1;
  200. if (!_starpu_memory_node_get_nworkers(handling_node))
  201. /* No worker to process the request from that node */
  202. return 0;
  203. struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(node);
  204. if (node_ops && node_ops->is_direct_access_supported)
  205. return node_ops->is_direct_access_supported(node, handling_node);
  206. else
  207. {
  208. STARPU_ABORT_MSG("Node %s does not define the operation 'is_direct_access_supported'", _starpu_node_get_prefix(starpu_node_get_kind(node)));
  209. return 1;
  210. }
  211. }
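/* Check whether a direct src_node -> dst_node copy is possible (i.e. no staging
 * through main memory is needed), and if so report in *handling_node which of
 * the two nodes should drive the transfer. */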
  212. static int link_supports_direct_transfers(starpu_data_handle_t handle, unsigned src_node, unsigned dst_node, unsigned *handling_node)
  213. {
  214. int (*can_copy)(void *src_interface, unsigned src_node, void *dst_interface, unsigned dst_node, unsigned handling_node) = handle->ops->copy_methods->can_copy;
  215. void *src_interface = handle->per_node[src_node].data_interface;
  216. void *dst_interface = handle->per_node[dst_node].data_interface;
  217. /* XXX That's a hack until we fix cudaMemcpy3DPeerAsync in the block interface.
  218. * Perhaps not all data interfaces provide a direct GPU-GPU transfer
  219. * method! */
  220. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  221. if (src_node != dst_node && starpu_node_get_kind(src_node) == STARPU_CUDA_RAM && starpu_node_get_kind(dst_node) == STARPU_CUDA_RAM)
  222. {
  223. const struct starpu_data_copy_methods *copy_methods = handle->ops->copy_methods;
  224. if (!copy_methods->cuda_to_cuda_async && !copy_methods->any_to_any)
  225. return 0;
  226. }
  227. #endif
  228. /* Note: with CUDA, performance seems a bit better when issuing the transfer from the destination (tested without GPUDirect, but GPUDirect probably behaves the same) */
  229. if (worker_supports_direct_access(src_node, dst_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node)))
  230. {
  231. *handling_node = dst_node;
  232. return 1;
  233. }
  234. if (worker_supports_direct_access(dst_node, src_node) && (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node)))
  235. {
  236. *handling_node = src_node;
  237. return 1;
  238. }
  239. return 0;
  240. }
  241. /* For now, we use slowness (1/bandwidth) to compare NUMA nodes; would it be better to use latency? */
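/* For each candidate NUMA node we add up the slowness of the two hops:
 *   slowness(numa) = 1/bandwidth(src -> numa) + 1/bandwidth(numa -> dst)
 * For instance (purely illustrative figures), with 8 GB/s towards a NUMA node
 * and 4 GB/s back out of it, slowness = 1/8 + 1/4 = 0.375, to be compared with
 * the other NUMA nodes. */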
  242. static unsigned chose_best_numa_between_src_and_dest(int src, int dst)
  243. {
  244. double timing_best;
  245. int best_numa = -1;
  246. unsigned numa;
  247. const unsigned nb_numa_nodes = starpu_memory_nodes_get_numa_count();
  248. for(numa = 0; numa < nb_numa_nodes; numa++)
  249. {
  250. double actual = 1.0/starpu_transfer_bandwidth(src, numa) + 1.0/starpu_transfer_bandwidth(numa, dst);
  251. /* Compare slowness : take the lowest */
  252. if (best_numa < 0 || actual < timing_best)
  253. {
  254. best_numa = numa;
  255. timing_best = actual;
  256. }
  257. }
  258. STARPU_ASSERT(best_numa >= 0);
  259. return best_numa;
  260. }
  261. /* Determines the path of a request: each hop is defined by (src,dst) and the
  262. * node that handles the hop. The returned value indicates the number of hops,
  263. * and max_len is the maximum number of hops (i.e. the size of the
  264. * src_nodes, dst_nodes and handling_nodes arrays). */
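/* In the current implementation this yields 0 hops when a pending write
 * invalidation makes the transfer pointless, 1 hop for a direct (or
 * allocation-only) transfer, and 2 hops when the data has to be staged
 * through a NUMA RAM node. */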
  265. int _starpu_determine_request_path(starpu_data_handle_t handle,
  266. int src_node, int dst_node,
  267. enum starpu_data_access_mode mode, int max_len,
  268. unsigned *src_nodes, unsigned *dst_nodes,
  269. unsigned *handling_nodes, unsigned write_invalidation)
  270. {
  271. if (src_node == dst_node || !(mode & STARPU_R))
  272. {
  273. if (dst_node == -1 || starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
  274. handling_nodes[0] = src_node;
  275. else
  276. handling_nodes[0] = dst_node;
  277. if (write_invalidation)
  278. /* The invalidation request will be enough */
  279. return 0;
  280. /* The destination node should only allocate the data, no transfer is required */
  281. STARPU_ASSERT(max_len >= 1);
  282. src_nodes[0] = STARPU_MAIN_RAM; // ignored
  283. dst_nodes[0] = dst_node;
  284. return 1;
  285. }
  286. if (src_node < 0)
  287. {
  288. /* Will just initialize the destination */
  289. STARPU_ASSERT(max_len >= 1);
  290. src_nodes[0] = src_node; // ignored
  291. dst_nodes[0] = dst_node;
  292. return 1;
  293. }
  294. unsigned handling_node;
  295. int link_is_valid = link_supports_direct_transfers(handle, src_node, dst_node, &handling_node);
  296. if (!link_is_valid)
  297. {
  298. int (*can_copy)(void *, unsigned, void *, unsigned, unsigned) = handle->ops->copy_methods->can_copy;
  299. void *src_interface = handle->per_node[src_node].data_interface;
  300. void *dst_interface = handle->per_node[dst_node].data_interface;
  301. /* We need an intermediate hop to implement data staging
  302. * through main memory. */
  303. STARPU_ASSERT(max_len >= 2);
  304. STARPU_ASSERT(src_node >= 0);
  305. unsigned numa = chose_best_numa_between_src_and_dest(src_node, dst_node);
  306. /* GPU -> RAM */
  307. src_nodes[0] = src_node;
  308. dst_nodes[0] = numa;
  309. if (starpu_node_get_kind(src_node) == STARPU_DISK_RAM)
  310. /* Disks don't have their own driver thread */
  311. handling_nodes[0] = dst_node;
  312. else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, src_node))
  313. {
  314. handling_nodes[0] = src_node;
  315. }
  316. else
  317. {
  318. STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, dst_node), "interface %d refuses all kinds of transfers from node %d to node %d\n", handle->ops->interfaceid, src_node, dst_node);
  319. handling_nodes[0] = dst_node;
  320. }
  321. /* RAM -> GPU */
  322. src_nodes[1] = numa;
  323. dst_nodes[1] = dst_node;
  324. if (starpu_node_get_kind(dst_node) == STARPU_DISK_RAM)
  325. /* Disks don't have their own driver thread */
  326. handling_nodes[1] = src_node;
  327. else if (!can_copy || can_copy(src_interface, src_node, dst_interface, dst_node, dst_node))
  328. {
  329. handling_nodes[1] = dst_node;
  330. }
  331. else
  332. {
  333. STARPU_ASSERT_MSG(can_copy(src_interface, src_node, dst_interface, dst_node, src_node), "interface %d refuses all kinds of transfers from node %d to node %d\n", handle->ops->interfaceid, src_node, dst_node);
  334. handling_nodes[1] = src_node;
  335. }
  336. return 2;
  337. }
  338. else
  339. {
  340. STARPU_ASSERT(max_len >= 1);
  341. src_nodes[0] = src_node;
  342. dst_nodes[0] = dst_node;
  343. handling_nodes[0] = handling_node;
  344. #if !defined(STARPU_HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
  345. STARPU_ASSERT(!(mode & STARPU_R) || starpu_node_get_kind(src_node) != STARPU_CUDA_RAM || starpu_node_get_kind(dst_node) != STARPU_CUDA_RAM);
  346. #endif
  347. return 1;
  348. }
  349. }
  350. /* handle->lock should be taken. r is returned locked. The node parameter
  351. * indicates either the source of the request, or the destination for a
  352. * write-only request. */
  353. static struct _starpu_data_request *_starpu_search_existing_data_request(struct _starpu_data_replicate *replicate, unsigned node, enum starpu_data_access_mode mode, unsigned is_prefetch)
  354. {
  355. struct _starpu_data_request *r;
  356. r = replicate->request[node];
  357. if (r)
  358. {
  359. _starpu_spin_checklocked(&r->handle->header_lock);
  360. _starpu_spin_lock(&r->lock);
  361. /* perhaps we need to "upgrade" the request */
  362. if (is_prefetch < r->prefetch)
  363. _starpu_update_prefetch_status(r, is_prefetch);
  364. if (mode & STARPU_R)
  365. {
  366. /* in case the existing request did not imply a memory
  367. * transfer yet, we have to take a second refcnt now
  368. * for the source, in addition to the refcnt for the
  369. * destination
  370. * (so that the source remains valid) */
  371. if (!(r->mode & STARPU_R))
  372. {
  373. replicate->refcnt++;
  374. replicate->handle->busy_count++;
  375. }
  376. r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_R);
  377. }
  378. if (mode & STARPU_W)
  379. r->mode = (enum starpu_data_access_mode) ((int) r->mode | (int) STARPU_W);
  380. }
  381. return r;
  382. }
  383. /*
  384. * This function is called when the data is needed on the local node, this
  385. * returns a pointer to the local copy
  386. *
  387. * R STARPU_W STARPU_RW
  388. * Owner OK OK OK
  389. * Shared OK 1 1
  390. * Invalid 2 3 4
  391. *
  392. * case 1 : shared + (read)write :
  393. * no data copy but shared->Invalid/Owner
  394. * case 2 : invalid + read :
  395. * data copy + invalid->shared + owner->shared (STARPU_ASSERT(there is a valid))
  396. * case 3 : invalid + write :
  397. * no data copy + invalid->owner + (owner,shared)->invalid
  398. * case 4 : invalid + R/STARPU_W :
  399. * data copy + if (STARPU_W) (invalid->owner + owner->invalid)
  400. * else (invalid,owner->shared)
  401. */
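/* This builds the chain of data requests implementing the transitions above
 * (at most two transfer hops, plus an optional trailing write-invalidation
 * request), or returns NULL when the destination replicate is already valid
 * and there is nothing to wait for. */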
  402. struct _starpu_data_request *_starpu_create_request_to_fetch_data(starpu_data_handle_t handle,
  403. struct _starpu_data_replicate *dst_replicate,
  404. enum starpu_data_access_mode mode, unsigned is_prefetch,
  405. unsigned async,
  406. void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
  407. {
  408. /* We don't care about commuting for data requests, that was handled before. */
  409. mode &= ~STARPU_COMMUTE;
  410. /* This function is called with handle's header lock taken */
  411. _starpu_spin_checklocked(&handle->header_lock);
  412. int requesting_node = dst_replicate ? dst_replicate->memory_node : -1;
  413. unsigned nwait = 0;
  414. if (mode & STARPU_W)
  415. {
  416. /* We will write to the buffer. We will have to wait for all
  417. * existing requests before the last request which will
  418. * invalidate all their results (which were possibly spurious,
  419. * e.g. too aggressive eviction).
  420. */
  421. unsigned i, j;
  422. unsigned nnodes = starpu_memory_nodes_get_count();
  423. for (i = 0; i < nnodes; i++)
  424. for (j = 0; j < nnodes; j++)
  425. if (handle->per_node[i].request[j])
  426. nwait++;
  427. /* If the request is not detached (i.e. the caller really wants
  428. * proper ownership), no new requests will appear because a
  429. * reference will be kept on the dst replicate, which will
  430. * notably prevent data reclaiming.
  431. */
  432. }
  433. if ((!dst_replicate || dst_replicate->state != STARPU_INVALID) && (!nwait || is_prefetch))
  434. {
  435. if (dst_replicate)
  436. {
  437. #ifdef STARPU_MEMORY_STATS
  438. enum _starpu_cache_state old_state = dst_replicate->state;
  439. #endif
  440. /* the data is already available and we don't have to wait for
  441. * any request, so we can stop */
  442. _starpu_update_data_state(handle, dst_replicate, mode);
  443. _starpu_msi_cache_hit(requesting_node);
  444. #ifdef STARPU_MEMORY_STATS
  445. _starpu_memory_handle_stats_cache_hit(handle, requesting_node);
  446. /* XXX Broken ? */
  447. if (old_state == STARPU_SHARED
  448. && dst_replicate->state == STARPU_OWNER)
  449. _starpu_memory_handle_stats_shared_to_owner(handle, requesting_node);
  450. #endif
  451. if (dst_replicate->mc)
  452. _starpu_memchunk_recently_used(dst_replicate->mc, requesting_node);
  453. }
  454. _starpu_spin_unlock(&handle->header_lock);
  455. if (callback_func)
  456. callback_func(callback_arg);
  457. _STARPU_LOG_OUT_TAG("data available");
  458. return NULL;
  459. }
  460. if (dst_replicate)
  461. _starpu_msi_cache_miss(requesting_node);
  462. /* the only remaining situation is that the local copy was invalid */
  463. STARPU_ASSERT((dst_replicate && dst_replicate->state == STARPU_INVALID) || nwait);
  464. /* find someone who already has the data */
  465. int src_node = -1;
  466. if (dst_replicate && mode & STARPU_R)
  467. {
  468. if (dst_replicate->state == STARPU_INVALID)
  469. src_node = _starpu_select_src_node(handle, requesting_node);
  470. else
  471. src_node = requesting_node;
  472. if (src_node < 0)
  473. {
  474. /* We will create it, no need to read an existing value */
  475. mode &= ~STARPU_R;
  476. }
  477. }
  478. else if (dst_replicate)
  479. {
  480. /* if the data is in write only mode (and not SCRATCH or REDUX), there is no need for a source, data will be initialized by the task itself */
  481. if (mode & STARPU_W)
  482. dst_replicate->initialized = 1;
  483. if (starpu_node_get_kind(requesting_node) == STARPU_CPU_RAM && !nwait)
  484. {
  485. /* And this is the main RAM, really no need for a
  486. * request, just allocate */
  487. if (_starpu_allocate_memory_on_node(handle, dst_replicate, is_prefetch) == 0)
  488. {
  489. _starpu_update_data_state(handle, dst_replicate, mode);
  490. _starpu_spin_unlock(&handle->header_lock);
  491. if (callback_func)
  492. callback_func(callback_arg);
  493. _STARPU_LOG_OUT_TAG("data immediately allocated");
  494. return NULL;
  495. }
  496. }
  497. }
  498. #define MAX_REQUESTS 4
  499. /* We can safely assume that there won't be more than 2 hops in the
  500. * current implementation */
  501. unsigned src_nodes[MAX_REQUESTS], dst_nodes[MAX_REQUESTS], handling_nodes[MAX_REQUESTS];
  502. /* keep one slot for the last W request, if any */
  503. int write_invalidation = (mode & STARPU_W) && nwait && !is_prefetch;
  504. int nhops = _starpu_determine_request_path(handle, src_node, requesting_node, mode, MAX_REQUESTS,
  505. src_nodes, dst_nodes, handling_nodes, write_invalidation);
  506. STARPU_ASSERT(nhops >= 0 && nhops <= MAX_REQUESTS-1);
  507. struct _starpu_data_request *requests[nhops + write_invalidation];
  508. /* Did we reuse a request for that hop ? */
  509. int reused_requests[nhops + write_invalidation];
  510. /* Construct an array with a list of requests, possibly reusing existing requests */
  511. int hop;
  512. for (hop = 0; hop < nhops; hop++)
  513. {
  514. struct _starpu_data_request *r;
  515. unsigned hop_src_node = src_nodes[hop];
  516. unsigned hop_dst_node = dst_nodes[hop];
  517. unsigned hop_handling_node = handling_nodes[hop];
  518. struct _starpu_data_replicate *hop_src_replicate;
  519. struct _starpu_data_replicate *hop_dst_replicate;
  520. /* Only the first request is independent */
  521. unsigned ndeps = (hop == 0)?0:1;
  522. hop_src_replicate = &handle->per_node[hop_src_node];
  523. hop_dst_replicate = (hop != nhops - 1)?&handle->per_node[hop_dst_node]:dst_replicate;
  524. /* Try to reuse a request if possible */
  525. r = _starpu_search_existing_data_request(hop_dst_replicate,
  526. (mode & STARPU_R)?hop_src_node:hop_dst_node,
  527. mode, is_prefetch);
  528. reused_requests[hop] = !!r;
  529. if (!r)
  530. {
  531. /* Create a new request if there was no request to reuse */
  532. r = _starpu_create_data_request(handle, hop_src_replicate,
  533. hop_dst_replicate, hop_handling_node,
  534. mode, ndeps, is_prefetch, prio, 0, origin);
  535. nwait++;
  536. }
  537. requests[hop] = r;
  538. }
  539. /* Chain these requests */
  540. for (hop = 0; hop < nhops; hop++)
  541. {
  542. struct _starpu_data_request *r;
  543. r = requests[hop];
  544. if (hop != nhops - 1)
  545. {
  546. if (!reused_requests[hop + 1])
  547. {
  548. r->next_req[r->next_req_count++] = requests[hop + 1];
  549. STARPU_ASSERT(r->next_req_count <= STARPU_MAXNODES);
  550. }
  551. }
  552. else if (!write_invalidation)
  553. /* The last request will perform the callback after termination */
  554. _starpu_data_request_append_callback(r, callback_func, callback_arg);
  555. if (reused_requests[hop])
  556. _starpu_spin_unlock(&r->lock);
  557. }
  558. if (write_invalidation)
  559. {
  560. /* Some requests were still pending, we have to add yet another
  561. * request, depending on them, which will invalidate their
  562. * result.
  563. */
  564. struct _starpu_data_request *r = _starpu_create_data_request(handle, dst_replicate,
  565. dst_replicate, requesting_node,
  566. STARPU_W, nwait, is_prefetch, prio, 1, origin);
  567. /* and perform the callback after termination */
  568. _starpu_data_request_append_callback(r, callback_func, callback_arg);
  569. /* We will write to the buffer. We will have to wait for all
  570. * existing requests before the last request which will
  571. * invalidate all their results (which were possibly spurious,
  572. * e.g. too aggressive eviction).
  573. */
  574. unsigned i, j;
  575. unsigned nnodes = starpu_memory_nodes_get_count();
  576. for (i = 0; i < nnodes; i++)
  577. for (j = 0; j < nnodes; j++)
  578. {
  579. struct _starpu_data_request *r2 = handle->per_node[i].request[j];
  580. if (r2)
  581. {
  582. _starpu_spin_lock(&r2->lock);
  583. if (is_prefetch < r2->prefetch)
  584. /* Hasten the request we will have to wait for */
  585. _starpu_update_prefetch_status(r2, is_prefetch);
  586. r2->next_req[r2->next_req_count++] = r;
  587. STARPU_ASSERT(r2->next_req_count <= STARPU_MAXNODES + 1);
  588. _starpu_spin_unlock(&r2->lock);
  589. nwait--;
  590. }
  591. }
  592. STARPU_ASSERT(nwait == 0);
  593. nhops++;
  594. requests[nhops - 1] = r;
  595. /* existing requests will post this one */
  596. reused_requests[nhops - 1] = 1;
  597. }
  598. STARPU_ASSERT(nhops);
  599. if (!async)
  600. requests[nhops - 1]->refcnt++;
  601. /* we only submit the first request, the remaining will be
  602. * automatically submitted afterward */
  603. if (!reused_requests[0])
  604. _starpu_post_data_request(requests[0]);
  605. return requests[nhops - 1];
  606. }
  607. int _starpu_fetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *dst_replicate,
  608. enum starpu_data_access_mode mode, unsigned detached, unsigned is_prefetch, unsigned async,
  609. void (*callback_func)(void *), void *callback_arg, int prio, const char *origin)
  610. {
  611. _STARPU_LOG_IN();
  612. int cpt = 0;
  613. while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
  614. {
  615. cpt++;
  616. _starpu_datawizard_progress(1);
  617. }
  618. if (cpt == STARPU_SPIN_MAXTRY)
  619. _starpu_spin_lock(&handle->header_lock);
  620. if (is_prefetch > 0)
  621. {
  622. unsigned src_node_mask = 0;
  623. unsigned nnodes = starpu_memory_nodes_get_count();
  624. unsigned n;
  625. for (n = 0; n < nnodes; n++)
  626. {
  627. if (handle->per_node[n].state != STARPU_INVALID)
  628. {
  629. /* we found a copy ! */
  630. src_node_mask |= (1<<n);
  631. }
  632. }
  633. if (src_node_mask == 0)
  634. {
  635. /* no valid copy, nothing to prefetch */
  636. _starpu_spin_unlock(&handle->header_lock);
  637. return 0;
  638. }
  639. }
  640. if (!detached)
  641. {
  642. /* Take references which will be released by _starpu_release_data_on_node */
  643. if (dst_replicate)
  644. dst_replicate->refcnt++;
  645. else if (node == STARPU_ACQUIRE_NO_NODE_LOCK_ALL)
  646. {
  647. int i;
  648. for (i = 0; i < STARPU_MAXNODES; i++)
  649. handle->per_node[i].refcnt++;
  650. }
  651. handle->busy_count++;
  652. }
  653. struct _starpu_data_request *r;
  654. r = _starpu_create_request_to_fetch_data(handle, dst_replicate, mode,
  655. is_prefetch, async, callback_func, callback_arg, prio, origin);
  656. /* If no request was created, the handle was already up-to-date on the
  657. * node. In this case, _starpu_create_request_to_fetch_data has already
  658. * unlocked the header. */
  659. if (!r)
  660. return 0;
  661. _starpu_spin_unlock(&handle->header_lock);
  662. int ret = async?0:_starpu_wait_data_request_completion(r, 1);
  663. _STARPU_LOG_OUT();
  664. return ret;
  665. }
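/* Convenience wrappers: is_prefetch==0 is a plain fetch (synchronous, keeps a
 * reference), is_prefetch==1 a prefetch issued ahead of task execution, and
 * is_prefetch==2 an idle prefetch; both prefetch variants are detached and
 * asynchronous. */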
  666. static int idle_prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  667. {
  668. return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 2, 1, NULL, NULL, prio, "idle_prefetch_data_on_node");
  669. }
  670. static int prefetch_data_on_node(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  671. {
  672. return _starpu_fetch_data_on_node(handle, node, replicate, mode, 1, 1, 1, NULL, NULL, prio, "prefetch_data_on_node");
  673. }
  674. static int fetch_data(starpu_data_handle_t handle, int node, struct _starpu_data_replicate *replicate, enum starpu_data_access_mode mode, int prio)
  675. {
  676. return _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, prio, "fetch_data");
  677. }
  678. uint32_t _starpu_get_data_refcnt(starpu_data_handle_t handle, unsigned node)
  679. {
  680. return handle->per_node[node].refcnt;
  681. }
  682. size_t _starpu_data_get_size(starpu_data_handle_t handle)
  683. {
  684. return handle->ops->get_size(handle);
  685. }
  686. size_t _starpu_data_get_alloc_size(starpu_data_handle_t handle)
  687. {
  688. if (handle->ops->get_alloc_size)
  689. return handle->ops->get_alloc_size(handle);
  690. else
  691. return handle->ops->get_size(handle);
  692. }
  693. starpu_ssize_t _starpu_data_get_max_size(starpu_data_handle_t handle)
  694. {
  695. if (handle->ops->get_max_size)
  696. return handle->ops->get_max_size(handle);
  697. else
  698. return -1;
  699. }
  700. uint32_t _starpu_data_get_footprint(starpu_data_handle_t handle)
  701. {
  702. return handle->footprint;
  703. }
  704. /* in case the data was accessed in write mode, do not forget to
  705. * make it accessible again as soon as possible! */
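/* If the handle (or the caller) has a write-through mask, the fresh value is
 * first pushed to those nodes before the reference counts are dropped. */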
  706. void _starpu_release_data_on_node(starpu_data_handle_t handle, uint32_t default_wt_mask, struct _starpu_data_replicate *replicate)
  707. {
  708. uint32_t wt_mask;
  709. wt_mask = default_wt_mask | handle->wt_mask;
  710. wt_mask &= (1<<starpu_memory_nodes_get_count())-1;
  711. /* Note that it is possible that there is no valid copy of the data (if
  712. * starpu_data_invalidate was called for instance). In that case, we do
  713. * not enforce any write-through mechanism. */
  714. unsigned memory_node = replicate->memory_node;
  715. if (replicate->state != STARPU_INVALID && handle->current_mode & STARPU_W)
  716. if (wt_mask & ~(1<<memory_node))
  717. _starpu_write_through_data(handle, memory_node, wt_mask);
  718. int cpt = 0;
  719. while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
  720. {
  721. cpt++;
  722. _starpu_datawizard_progress(1);
  723. }
  724. if (cpt == STARPU_SPIN_MAXTRY)
  725. _starpu_spin_lock(&handle->header_lock);
  726. /* Release refcnt taken by fetch_data_on_node */
  727. replicate->refcnt--;
  728. STARPU_ASSERT_MSG(replicate->refcnt >= 0, "handle %p released too many times", handle);
  729. STARPU_ASSERT_MSG(handle->busy_count > 0, "handle %p released too many times", handle);
  730. handle->busy_count--;
  731. if (!_starpu_notify_data_dependencies(handle))
  732. _starpu_spin_unlock(&handle->header_lock);
  733. }
  734. static void _starpu_set_data_requested_flag_if_needed(starpu_data_handle_t handle, struct _starpu_data_replicate *replicate)
  735. {
  736. int cpt = 0;
  737. while (cpt < STARPU_SPIN_MAXTRY && _starpu_spin_trylock(&handle->header_lock))
  738. {
  739. cpt++;
  740. _starpu_datawizard_progress(1);
  741. }
  742. if (cpt == STARPU_SPIN_MAXTRY)
  743. _starpu_spin_lock(&handle->header_lock);
  744. if (replicate->state == STARPU_INVALID)
  745. {
  746. unsigned dst_node = replicate->memory_node;
  747. replicate->requested |= 1UL << dst_node;
  748. }
  749. _starpu_spin_unlock(&handle->header_lock);
  750. }
  751. int starpu_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
  752. {
  753. #ifdef STARPU_OPENMP
  754. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  755. /* do not attempt to prefetch task input if this is an OpenMP task resuming after blocking */
  756. if (j->discontinuous != 0)
  757. return 0;
  758. #endif
  759. STARPU_ASSERT(!task->prefetched);
  760. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  761. unsigned index;
  762. for (index = 0; index < nbuffers; index++)
  763. {
  764. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  765. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  766. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  767. continue;
  768. if (!(mode & STARPU_R))
  769. /* Don't bother prefetching some data which will be overwritten */
  770. continue;
  771. int node = _starpu_task_data_get_node_on_node(task, index, target_node);
  772. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  773. prefetch_data_on_node(handle, node, replicate, mode, prio);
  774. _starpu_set_data_requested_flag_if_needed(handle, replicate);
  775. }
  776. return 0;
  777. }
  778. int starpu_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
  779. {
  780. int prio = task->priority;
  781. if (task->workerorder)
  782. prio = INT_MAX - task->workerorder;
  783. return starpu_prefetch_task_input_on_node_prio(task, node, prio);
  784. }
  785. int starpu_idle_prefetch_task_input_on_node_prio(struct starpu_task *task, unsigned target_node, int prio)
  786. {
  787. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  788. unsigned index;
  789. for (index = 0; index < nbuffers; index++)
  790. {
  791. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  792. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  793. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  794. continue;
  795. if (!(mode & STARPU_R))
  796. /* Don't bother prefetching some data which will be overwritten */
  797. continue;
  798. int node = _starpu_task_data_get_node_on_node(task, index, target_node);
  799. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  800. idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
  801. }
  802. return 0;
  803. }
  804. int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned node)
  805. {
  806. int prio = task->priority;
  807. if (task->workerorder)
  808. prio = INT_MAX - task->workerorder;
  809. return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
  810. }
  811. int starpu_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
  812. {
  813. #ifdef STARPU_OPENMP
  814. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  815. /* do not attempt to prefetch task input if this is an OpenMP task resuming after blocking */
  816. if (j->discontinuous != 0)
  817. return 0;
  818. #endif
  819. STARPU_ASSERT(!task->prefetched);
  820. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  821. unsigned index;
  822. for (index = 0; index < nbuffers; index++)
  823. {
  824. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  825. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  826. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  827. continue;
  828. if (!(mode & STARPU_R))
  829. /* Don't bother prefetching some data which will be overwritten */
  830. continue;
  831. int node = _starpu_task_data_get_node_on_worker(task, index, worker);
  832. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  833. prefetch_data_on_node(handle, node, replicate, mode, prio);
  834. _starpu_set_data_requested_flag_if_needed(handle, replicate);
  835. }
  836. return 0;
  837. }
  838. int starpu_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
  839. {
  840. int prio = task->priority;
  841. if (task->workerorder)
  842. prio = INT_MAX - task->workerorder;
  843. return starpu_prefetch_task_input_for_prio(task, worker, prio);
  844. }
  845. int starpu_idle_prefetch_task_input_for_prio(struct starpu_task *task, unsigned worker, int prio)
  846. {
  847. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  848. unsigned index;
  849. for (index = 0; index < nbuffers; index++)
  850. {
  851. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, index);
  852. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, index);
  853. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  854. continue;
  855. if (!(mode & STARPU_R))
  856. /* Don't bother prefetching some data which will be overwritten */
  857. continue;
  858. int node = _starpu_task_data_get_node_on_worker(task, index, worker);
  859. struct _starpu_data_replicate *replicate = &handle->per_node[node];
  860. idle_prefetch_data_on_node(handle, node, replicate, mode, prio);
  861. }
  862. return 0;
  863. }
  864. int starpu_idle_prefetch_task_input_for(struct starpu_task *task, unsigned worker)
  865. {
  866. int prio = task->priority;
  867. if (task->workerorder)
  868. prio = INT_MAX - task->workerorder;
  869. return starpu_idle_prefetch_task_input_for_prio(task, worker, prio);
  870. }
  871. struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
  872. {
  873. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  874. {
  875. STARPU_ASSERT(workerid >= 0);
  876. if (!handle->per_worker)
  877. {
  878. _starpu_spin_lock(&handle->header_lock);
  879. if (!handle->per_worker)
  880. _starpu_data_initialize_per_worker(handle);
  881. _starpu_spin_unlock(&handle->header_lock);
  882. }
  883. return &handle->per_worker[workerid];
  884. }
  885. else
  886. /* That's a "normal" buffer (R/W) */
  887. return &handle->per_node[node];
  888. }
  889. /* Callback used when a buffer is sent asynchronously to the sink */
  890. static void _starpu_fetch_task_input_cb(void *arg)
  891. {
  892. struct _starpu_worker * worker = (struct _starpu_worker *) arg;
  893. /* increase the number of buffers received */
  894. STARPU_WMB();
  895. (void)STARPU_ATOMIC_ADD(&worker->nb_buffers_transferred, 1);
  896. #ifdef STARPU_SIMGRID
  897. starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[worker->memory_node]);
  898. #endif
  899. }
  900. /* Synchronously or asynchronously fetch data for a given task (if it's not there already)
  901. * Returns the number of data acquired here. */
  902. /* _starpu_fetch_task_input must be called before
  903. * executing the task. __starpu_push_task_output must be called after the
  904. * execution of the task. */
  905. /* The driver can either just call _starpu_fetch_task_input with async==0,
  906. * or to improve overlapping, it can call _starpu_fetch_task_input with
  907. * async==1, then wait for transfers to complete, then call
  908. * _starpu_fetch_task_input_tail to complete the fetch. */
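/* A driver-side sketch of the asynchronous variant (simplified, not taken
 * verbatim from any driver):
 *   _starpu_fetch_task_input(task, j, 1);
 *   ... progress until worker->nb_buffers_transferred == worker->nb_buffers_totransfer ...
 *   _starpu_fetch_task_input_tail(task, j, worker);
 * The synchronous variant simply calls _starpu_fetch_task_input(task, j, 0). */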
  909. int _starpu_fetch_task_input(struct starpu_task *task, struct _starpu_job *j, int async)
  910. {
  911. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  912. int workerid = worker->workerid;
  913. if (async)
  914. {
  915. worker->task_transferring = task;
  916. worker->nb_buffers_transferred = 0;
  917. if (worker->ntasks <= 1)
  918. _STARPU_TRACE_WORKER_START_FETCH_INPUT(NULL, workerid);
  919. }
  920. else
  921. _STARPU_TRACE_START_FETCH_INPUT(NULL);
  922. int profiling = starpu_profiling_status_get();
  923. if (profiling && task->profiling_info)
  924. _starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
  925. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  926. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  927. unsigned nacquires;
  928. unsigned index;
  929. nacquires = 0;
  930. for (index = 0; index < nbuffers; index++)
  931. {
  932. int ret;
  933. starpu_data_handle_t handle = descrs[index].handle;
  934. enum starpu_data_access_mode mode = descrs[index].mode;
  935. int node = _starpu_task_data_get_node_on_worker(task, descrs[index].index, workerid);
  936. /* We set this here for coherency with __starpu_push_task_output */
  937. descrs[index].node = node;
  938. if (mode == STARPU_NONE ||
  939. (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
  940. (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
  941. STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
  942. struct _starpu_data_replicate *local_replicate;
  943. if (index && descrs[index-1].handle == descrs[index].handle)
  944. /* We have already taken this data, skip it. This
  945. * depends on ordering putting writes before reads, see
  946. * _starpu_compar_handles */
  947. continue;
  948. local_replicate = get_replicate(handle, mode, workerid, node);
  949. if (async)
  950. {
  951. ret = _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1,
  952. _starpu_fetch_task_input_cb, worker, 0, "_starpu_fetch_task_input");
  953. #ifdef STARPU_SIMGRID
  954. if (_starpu_simgrid_fetching_input_cost())
  955. starpu_sleep(0.000001);
  956. #endif
  957. if (STARPU_UNLIKELY(ret))
  958. {
  959. /* Oops, not enough memory; make the worker wait for these for now, and the synchronous call will finish by forcing eviction */
  960. worker->nb_buffers_totransfer = nacquires;
  961. _starpu_set_worker_status(worker, STATUS_WAITING);
  962. return 0;
  963. }
  964. }
  965. else
  966. {
  967. ret = fetch_data(handle, node, local_replicate, mode, 0);
  968. #ifdef STARPU_SIMGRID
  969. if (_starpu_simgrid_fetching_input_cost())
  970. starpu_sleep(0.000001);
  971. #endif
  972. if (STARPU_UNLIKELY(ret))
  973. goto enomem;
  974. }
  975. nacquires++;
  976. }
  977. if (async)
  978. {
  979. worker->nb_buffers_totransfer = nacquires;
  980. _starpu_set_worker_status(worker, STATUS_WAITING);
  981. return 0;
  982. }
  983. _starpu_fetch_task_input_tail(task, j, worker);
  984. return 0;
  985. enomem:
  986. _STARPU_TRACE_END_FETCH_INPUT(NULL);
  987. _STARPU_DISP("something went wrong with buffer %u\n", index);
  988. /* try to unreference all the input that were successfully taken */
  989. unsigned index2;
  990. for (index2 = 0; index2 < index; index2++)
  991. {
  992. starpu_data_handle_t handle = descrs[index2].handle;
  993. enum starpu_data_access_mode mode = descrs[index2].mode;
  994. int node = descrs[index2].node;
  995. struct _starpu_data_replicate *local_replicate;
  996. if (index2 && descrs[index2-1].handle == descrs[index2].handle)
  997. /* We have already released this data, skip it. This
  998. * depends on ordering putting writes before reads, see
  999. * _starpu_compar_handles */
  1000. continue;
  1001. local_replicate = get_replicate(handle, mode, workerid, node);
  1002. _starpu_release_data_on_node(handle, 0, local_replicate);
  1003. }
  1004. return -1;
  1005. }
  1006. /* Now that we have taken the data locks in locking order, fill the codelet interfaces in function order. */
  1007. void _starpu_fetch_task_input_tail(struct starpu_task *task, struct _starpu_job *j, struct _starpu_worker *worker)
  1008. {
  1009. int workerid = worker->workerid;
  1010. int profiling = starpu_profiling_status_get();
  1011. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  1012. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  1013. unsigned index;
  1014. unsigned long total_size = 0;
  1015. for (index = 0; index < nbuffers; index++)
  1016. {
  1017. starpu_data_handle_t handle = descrs[index].handle;
  1018. enum starpu_data_access_mode mode = descrs[index].mode;
  1019. int node = descrs[index].node;
  1020. struct _starpu_data_replicate *local_replicate;
  1021. local_replicate = get_replicate(handle, mode, workerid, node);
  1022. _starpu_spin_lock(&handle->header_lock);
  1023. if (local_replicate->mc)
  1024. local_replicate->mc->diduse = 1;
  1025. _starpu_spin_unlock(&handle->header_lock);
  1026. _STARPU_TASK_SET_INTERFACE(task , local_replicate->data_interface, descrs[index].index);
  1027. /* If the replicate was not initialized yet, we have to do it now */
  1028. if (!(mode & STARPU_SCRATCH) && !local_replicate->initialized)
  1029. _starpu_redux_init_data_replicate(handle, local_replicate, workerid);
  1030. #ifdef STARPU_USE_FXT
  1031. total_size += _starpu_data_get_size(handle);
  1032. #endif
  1033. }
  1034. _STARPU_TRACE_DATA_LOAD(workerid,total_size);
  1035. if (profiling && task->profiling_info)
  1036. _starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);
  1037. _STARPU_TRACE_END_FETCH_INPUT(NULL);
  1038. }
  1039. /* Release task data dependencies */
  1040. void __starpu_push_task_output(struct _starpu_job *j)
  1041. {
  1042. #ifdef STARPU_OPENMP
  1043. STARPU_ASSERT(!j->continuation);
  1044. #endif
  1045. int profiling = starpu_profiling_status_get();
  1046. struct starpu_task *task = j->task;
  1047. if (profiling && task->profiling_info)
  1048. _starpu_clock_gettime(&task->profiling_info->release_data_start_time);
  1049. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  1050. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  1051. int workerid = starpu_worker_get_id();
  1052. unsigned index;
  1053. for (index = 0; index < nbuffers; index++)
  1054. {
  1055. starpu_data_handle_t handle = descrs[index].handle;
  1056. enum starpu_data_access_mode mode = descrs[index].mode;
  1057. int node = descrs[index].node;
  1058. struct _starpu_data_replicate *local_replicate = NULL;
  1059. if (index && descrs[index-1].handle == descrs[index].handle)
  1060. /* We have already released this data, skip it. This
  1061. * depends on ordering putting writes before reads, see
  1062. * _starpu_compar_handles */
  1063. continue;
  1064. if (node != -1)
  1065. local_replicate = get_replicate(handle, mode, workerid, node);
  1066. /* Keep a reference for future
  1067. * _starpu_release_task_enforce_sequential_consistency call */
  1068. _starpu_spin_lock(&handle->header_lock);
  1069. handle->busy_count++;
  1070. if (node == -1)
  1071. {
  1072. /* NOWHERE case, just notify dependencies */
  1073. if (!_starpu_notify_data_dependencies(handle))
  1074. _starpu_spin_unlock(&handle->header_lock);
  1075. }
  1076. else
  1077. {
  1078. _starpu_spin_unlock(&handle->header_lock);
  1079. _starpu_release_data_on_node(handle, 0, local_replicate);
  1080. }
  1081. }
  1082. if (profiling && task->profiling_info)
  1083. _starpu_clock_gettime(&task->profiling_info->release_data_end_time);
  1084. }
  1085. /* Version for a driver running on a worker: we show the driver state in the trace */
  1086. void _starpu_push_task_output(struct _starpu_job *j)
  1087. {
  1088. _STARPU_TRACE_START_PUSH_OUTPUT(NULL);
  1089. __starpu_push_task_output(j);
  1090. _STARPU_TRACE_END_PUSH_OUTPUT(NULL);
  1091. }
  1092. struct fetch_nowhere_wrapper
  1093. {
  1094. struct _starpu_job *j;
  1095. unsigned pending;
  1096. };
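/* 'pending' counts the outstanding fetch callbacks plus one reference for the
 * submitting call itself; the job is terminated once it drops to zero. */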
  1097. static void _starpu_fetch_nowhere_task_input_cb(void *arg);
  1098. /* Asynchronously fetch data for a task which will have no content */
  1099. void _starpu_fetch_nowhere_task_input(struct _starpu_job *j)
  1100. {
  1101. int profiling = starpu_profiling_status_get();
  1102. struct starpu_task *task = j->task;
  1103. if (profiling && task->profiling_info)
  1104. _starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);
  1105. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  1106. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  1107. unsigned nfetchbuffers = 0;
  1108. struct fetch_nowhere_wrapper *wrapper;
  1109. unsigned index;
  1110. for (index = 0; index < nbuffers; index++)
  1111. {
  1112. /* Note here we just follow what was requested, and not use _starpu_task_data_get_node* */
  1113. int node = -1;
  1114. if (task->cl->specific_nodes)
  1115. node = STARPU_CODELET_GET_NODE(task->cl, descrs[index].index);
  1116. descrs[index].node = node;
  1117. if (node != -1)
  1118. nfetchbuffers++;
  1119. }
  1120. if (!nfetchbuffers)
  1121. {
  1122. /* Nothing to fetch actually, already finished! */
  1123. __starpu_push_task_output(j);
  1124. _starpu_handle_job_termination(j);
  1125. _STARPU_LOG_OUT_TAG("handle_job_termination");
  1126. return;
  1127. }
  1128. _STARPU_MALLOC(wrapper, (sizeof(*wrapper)));
  1129. wrapper->j = j;
  1130. /* +1 for the call below */
  1131. wrapper->pending = nfetchbuffers + 1;
  1132. for (index = 0; index < nbuffers; index++)
  1133. {
  1134. starpu_data_handle_t handle = descrs[index].handle;
  1135. enum starpu_data_access_mode mode = descrs[index].mode;
  1136. int node = descrs[index].node;
  1137. if (node == -1)
  1138. continue;
  1139. if (mode == STARPU_NONE ||
  1140. (mode & ((1<<STARPU_MODE_SHIFT) - 1)) >= STARPU_ACCESS_MODE_MAX ||
  1141. (mode >> STARPU_MODE_SHIFT) >= (STARPU_SHIFTED_MODE_MAX >> STARPU_MODE_SHIFT))
  1142. STARPU_ASSERT_MSG(0, "mode %d (0x%x) is bogus\n", mode, mode);
  1143. STARPU_ASSERT(mode != STARPU_SCRATCH && mode != STARPU_REDUX);
  1144. struct _starpu_data_replicate *local_replicate;
  1145. local_replicate = get_replicate(handle, mode, -1, node);
  1146. _starpu_fetch_data_on_node(handle, node, local_replicate, mode, 0, 0, 1, _starpu_fetch_nowhere_task_input_cb, wrapper, 0, "_starpu_fetch_nowhere_task_input");
  1147. }
  1148. if (profiling && task->profiling_info)
  1149. _starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);
  1150. /* Finished working with the task, release our reference */
  1151. _starpu_fetch_nowhere_task_input_cb(wrapper);
  1152. }
  1153. static void _starpu_fetch_nowhere_task_input_cb(void *arg)
  1154. {
  1155. /* One more transfer finished */
  1156. struct fetch_nowhere_wrapper *wrapper = arg;
  1157. unsigned pending = STARPU_ATOMIC_ADD(&wrapper->pending, -1);
  1158. ANNOTATE_HAPPENS_BEFORE(&wrapper->pending);
  1159. if (pending == 0)
  1160. {
  1161. ANNOTATE_HAPPENS_AFTER(&wrapper->pending);
  1162. /* Finished transferring, task is over */
  1163. struct _starpu_job *j = wrapper->j;
  1164. free(wrapper);
  1165. __starpu_push_task_output(j);
  1166. _starpu_handle_job_termination(j);
  1167. _STARPU_LOG_OUT_TAG("handle_job_termination");
  1168. }
  1169. }
  1170. /* NB : this value can only be an indication of the status of a data
  1171. at some point, but there is no strong guarantee! */
  1172. unsigned starpu_data_is_on_node(starpu_data_handle_t handle, unsigned node)
  1173. {
  1174. unsigned ret = 0;
  1175. // XXX : this is just a hint, so we don't take the lock ...
  1176. // STARPU_PTHREAD_SPIN_LOCK(&handle->header_lock);
  1177. if (handle->per_node[node].state != STARPU_INVALID)
  1178. {
  1179. ret = 1;
  1180. }
  1181. else
  1182. {
  1183. unsigned i;
  1184. unsigned nnodes = starpu_memory_nodes_get_count();
  1185. for (i = 0; i < nnodes; i++)
  1186. {
  1187. if ((handle->per_node[node].requested & (1UL << i)) || handle->per_node[node].request[i])
  1188. ret = 1;
  1189. }
  1190. }
  1191. // STARPU_PTHREAD_SPIN_UNLOCK(&handle->header_lock);
  1192. return ret;
  1193. }