/* datawizard/coherency.c */
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <common/config.h>
  17. #include <datawizard/coherency.h>
  18. #include <datawizard/copy_driver.h>
  19. #include <datawizard/write_back.h>
  20. #include <core/dependencies/data_concurrency.h>
  21. uint32_t _starpu_select_node_to_handle_request(uint32_t src_node, uint32_t dst_node)
  22. {
  23. /* in case one of the node is a GPU, it needs to perform the transfer,
  24. * if both of them are GPU, it's a bit more complicated */
  25. unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
  26. unsigned dst_is_a_gpu = (_starpu_get_node_kind(dst_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(dst_node) == STARPU_OPENCL_RAM);
  27. /* we do not handle GPU->GPU transfers yet ! */
  28. STARPU_ASSERT( !(src_is_a_gpu && dst_is_a_gpu) );
  29. if (src_is_a_gpu)
  30. return src_node;
  31. if (dst_is_a_gpu)
  32. return dst_node;
  33. /* otherwise perform it locally, since we should be on a "sane" arch
  34. * where anyone can do the transfer. NB: in StarPU this should actually never
  35. * happen */
  36. return _starpu_get_local_memory_node();
  37. }
  38. uint32_t _starpu_select_src_node(starpu_data_handle handle)
  39. {
  40. unsigned src_node = 0;
  41. unsigned i;
  42. unsigned nnodes = _starpu_get_memory_nodes_count();
  43. /* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
  44. uint32_t node;
  45. uint32_t src_node_mask = 0;
  46. for (node = 0; node < nnodes; node++)
  47. {
  48. if (handle->per_node[node].state != STARPU_INVALID) {
  49. /* we found a copy ! */
  50. src_node_mask |= (1<<node);
  51. }
  52. }
  53. /* we should have found at least one copy ! */
  54. STARPU_ASSERT(src_node_mask != 0);
  55. /* find the node that will be the actual source */
  56. for (i = 0; i < nnodes; i++)
  57. {
  58. if (src_node_mask & (1<<i))
  59. {
  60. /* this is a potential candidate */
  61. src_node = i;
  62. /* however GPU are expensive sources, really !
  63. * other should be ok */
  64. if (_starpu_get_node_kind(i) != STARPU_CUDA_RAM)
  65. break;
  66. if (_starpu_get_node_kind(i) != STARPU_OPENCL_RAM)
  67. break;
  68. /* XXX do a better algorithm to distribute the memory copies */
  69. /* TODO : use the "requesting_node" as an argument to do so */
  70. }
  71. }
  72. return src_node;
  73. }
  74. /* this may be called once the data is fetched with header and STARPU_RW-lock hold */
  75. void _starpu_update_data_state(starpu_data_handle handle,
  76. struct starpu_data_replicate_s *requesting_replicate,
  77. starpu_access_mode mode)
  78. {
  79. unsigned nnodes = _starpu_get_memory_nodes_count();
  80. /* the data is present now */
  81. requesting_replicate->requested = 0;
  82. if (mode & STARPU_W) {
  83. /* the requesting node now has the only valid copy */
  84. uint32_t node;
  85. for (node = 0; node < nnodes; node++)
  86. handle->per_node[node].state = STARPU_INVALID;
  87. requesting_replicate->state = STARPU_OWNER;
  88. }
  89. else { /* read only */
  90. if (requesting_replicate->state != STARPU_OWNER)
  91. {
  92. /* there was at least another copy of the data */
  93. uint32_t node;
  94. for (node = 0; node < nnodes; node++)
  95. {
  96. struct starpu_data_replicate_s *replicate = &handle->per_node[node];
  97. if (replicate->state != STARPU_INVALID)
  98. replicate->state = STARPU_SHARED;
  99. }
  100. requesting_replicate->state = STARPU_SHARED;
  101. }
  102. }
  103. }
/*
 * This function is called when the data is needed on the local node, this
 * returns a pointer to the local copy
 *
 *			R 	STARPU_W 	STARPU_RW
 *	Owner		OK	OK		OK
 *	Shared		OK	1		1
 *	Invalid		2	3		4
 *
 * case 1 : shared + (read)write :
 * 	no data copy but shared->Invalid/Owner
 * case 2 : invalid + read :
 * 	data copy + invalid->shared + owner->shared (STARPU_ASSERT(there is a valid))
 * case 3 : invalid + write :
 * 	no data copy + invalid->owner + (owner,shared)->invalid
 * case 4 : invalid + R/STARPU_W :
 * 	data copy + if (STARPU_W) (invalid->owner + owner->invalid)
 * 		    else (invalid,owner->shared)
 */
/* Fetch "handle" onto "requesting_node" in access mode "mode".
 * If is_prefetch is non-zero, no reference count is taken and the call is
 * asynchronous (always returns 0); otherwise the call blocks until the data
 * is available and the replicate's refcnt is incremented (the caller must
 * later release it with _starpu_release_data_on_node).
 * callback_func(callback_arg), if non-NULL, is invoked once the data is
 * available. Returns 0 on success, non-zero if waiting for the request
 * failed. */
int _starpu_fetch_data_on_node(starpu_data_handle handle, uint32_t requesting_node,
				starpu_access_mode mode, unsigned is_prefetch,
				void (*callback_func)(void *), void *callback_arg)
{
	uint32_t local_node = _starpu_get_local_memory_node();
	_STARPU_LOG_IN();

	struct starpu_data_replicate_s *dst_replicate = &handle->per_node[requesting_node];

	/* Busy-wait on the header lock, driving DataWizard progress in the
	 * meantime so pending transfers can complete and release the lock. */
	while (_starpu_spin_trylock(&handle->header_lock))
		_starpu_datawizard_progress(local_node, 1);

	/* Real fetches pin the replicate so it cannot be evicted while in use;
	 * prefetches do not. */
	if (!is_prefetch)
		dst_replicate->refcnt++;

	if (dst_replicate->state != STARPU_INVALID)
	{
		/* the data is already available so we can stop */
		_starpu_update_data_state(handle, dst_replicate, mode);
		_starpu_msi_cache_hit(requesting_node);
		_starpu_spin_unlock(&handle->header_lock);

		if (callback_func)
			callback_func(callback_arg);

		_STARPU_LOG_OUT_TAG("data available");
		return 0;
	}

	/* the only remaining situation is that the local copy was invalid */
	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID);

	_starpu_msi_cache_miss(requesting_node);

	starpu_data_request_t r;

	/* is there already a pending request ? */
	r = _starpu_search_existing_data_request(dst_replicate, mode);
	/* at the exit of _starpu_search_existing_data_request the lock (r->lock)
	 * is taken if the request existed ! */

	if (!r) {
		/* find someone who already has the data */
		uint32_t src_node = 0;

		/* if the data is in write only mode, there is no need for a source */
		if (mode & STARPU_R)
		{
			src_node = _starpu_select_src_node(handle);
			STARPU_ASSERT(src_node != requesting_node);
		}

		unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
		unsigned dst_is_a_gpu = (_starpu_get_node_kind(requesting_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(requesting_node) == STARPU_OPENCL_RAM);

		struct starpu_data_replicate_s *src_replicate = &handle->per_node[src_node];

		/* we have to perform 2 successive requests for GPU->GPU transfers:
		 * src GPU -> RAM, then RAM -> dst GPU, chained together */
		if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
			unsigned reuse_r_src_to_ram;
			starpu_data_request_t r_src_to_ram;
			starpu_data_request_t r_ram_to_dst;

			struct starpu_data_replicate_s *ram_replicate = &handle->per_node[0];

			/* XXX we hardcode 0 as the RAM node ... */
			r_ram_to_dst = _starpu_create_data_request(handle, ram_replicate,
					dst_replicate, requesting_node, mode, is_prefetch);

			/* only the final (RAM -> dst) request is pinned by a real fetch */
			if (!is_prefetch)
				r_ram_to_dst->refcnt++;

			/* a src -> RAM request may already be in flight; if so, reuse it */
			r_src_to_ram = _starpu_search_existing_data_request(ram_replicate, mode);

			reuse_r_src_to_ram = r_src_to_ram?1:0;

			if (!r_src_to_ram)
			{
				r_src_to_ram = _starpu_create_data_request(handle, src_replicate,
						ram_replicate, src_node, mode, is_prefetch);
			}

			/* we chain both requests */
			r_src_to_ram->next_req[r_src_to_ram->next_req_count++]= r_ram_to_dst;

			_starpu_data_request_append_callback(r_ram_to_dst, callback_func, callback_arg);

			/* drop the lock taken by _starpu_search_existing_data_request */
			if (reuse_r_src_to_ram)
				_starpu_spin_unlock(&r_src_to_ram->lock);

			_starpu_spin_unlock(&handle->header_lock);

			/* we only submit the first request, the remaining will be
			 * automatically submitted afterward */
			if (!reuse_r_src_to_ram)
				_starpu_post_data_request(r_src_to_ram, src_node);

			/* the application only waits for the termination of the last request */
			r = r_ram_to_dst;
		}
		else {
			/* who will perform that request ? */
			uint32_t handling_node =
				_starpu_select_node_to_handle_request(src_node, requesting_node);

			r = _starpu_create_data_request(handle, src_replicate,
					dst_replicate, handling_node, mode, is_prefetch);

			_starpu_data_request_append_callback(r, callback_func, callback_arg);

			if (!is_prefetch)
				r->refcnt++;

			_starpu_spin_unlock(&handle->header_lock);

			_starpu_post_data_request(r, handling_node);
		}
	}
	else {
		/* the lock (r->lock) was taken by _starpu_search_existing_data_request */
		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		/* there is already a similar request: a prefetch can simply
		 * piggy-back on it and return immediately */
		if (is_prefetch)
		{
			_starpu_spin_unlock(&r->lock);
			_starpu_spin_unlock(&handle->header_lock);

			_STARPU_LOG_OUT_TAG("similar request");
			return 0;
		}

		/* a real fetch pins the existing request */
		r->refcnt++;

		//_starpu_spin_lock(&r->lock);
		if (r->is_a_prefetch_request)
		{
			/* transform that prefetch request into a "normal" request */
			r->is_a_prefetch_request = 0;

			/* transform that request into the proper access mode (prefetch could be read only) */
#warning check that
			r->mode |= mode;
		}

		//_STARPU_DEBUG("found a similar request : refcnt (req) %d\n", r->refcnt);
		_starpu_spin_unlock(&r->lock);
		_starpu_spin_unlock(&handle->header_lock);
	}

	/* prefetches are asynchronous; real fetches block until completion */
	int ret = is_prefetch?0:_starpu_wait_data_request_completion(r, 1);
	_STARPU_LOG_OUT();
	return ret;
}
  236. static int prefetch_data_on_node(starpu_data_handle handle, starpu_access_mode mode, uint32_t node)
  237. {
  238. return _starpu_fetch_data_on_node(handle, node, mode, 1, NULL, NULL);
  239. }
  240. static int fetch_data(starpu_data_handle handle, starpu_access_mode mode)
  241. {
  242. uint32_t requesting_node = _starpu_get_local_memory_node();
  243. STARPU_ASSERT(!(mode & STARPU_SCRATCH));
  244. return _starpu_fetch_data_on_node(handle, requesting_node, mode, 0, NULL, NULL);
  245. }
  246. inline uint32_t _starpu_get_data_refcnt(starpu_data_handle handle, uint32_t node)
  247. {
  248. return handle->per_node[node].refcnt;
  249. }
  250. size_t _starpu_data_get_size(starpu_data_handle handle)
  251. {
  252. return handle->data_size;
  253. }
  254. uint32_t _starpu_data_get_footprint(starpu_data_handle handle)
  255. {
  256. return handle->footprint;
  257. }
/* in case the data was accessed on a write mode, do not forget to
 * make it accessible again once it is possible ! */
/* Release the reference taken on the replicate of "handle" living on
 * "memory_node": first enforce write-through to the nodes selected by
 * default_wt_mask | handle->wt_mask, then decrement the refcnt and wake up
 * tasks waiting on this data. */
void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt_mask, uint32_t memory_node)
{
	uint32_t wt_mask;
	wt_mask = default_wt_mask | handle->wt_mask;

	/* Note that it is possible that there is no valid copy of the data (if
	 * starpu_data_invalidate was called for instance). In that case, we do
	 * not enforce any write-through mechanism. */

	struct starpu_data_replicate_s *replicate = &handle->per_node[memory_node];
	/* skip write-through when the only target would be memory_node itself */
	if (replicate->state != STARPU_INVALID)
	if ((wt_mask & ~(1<<memory_node)))
		_starpu_write_through_data(handle, memory_node, wt_mask);

	/* grab the header lock, driving DataWizard progress while we wait so
	 * that pending transfers can complete and release it */
	uint32_t local_node = _starpu_get_local_memory_node();
	while (_starpu_spin_trylock(&handle->header_lock))
		_starpu_datawizard_progress(local_node, 1);

	replicate->refcnt--;

	STARPU_ASSERT(replicate->refcnt >= 0);

	/* unblock tasks that were waiting for this data to be released */
	_starpu_notify_data_dependencies(handle);

	_starpu_spin_unlock(&handle->header_lock);
}
  279. inline void _starpu_set_data_requested_flag_if_needed(starpu_data_handle handle, uint32_t node)
  280. {
  281. // XXX : this is just a hint, so we don't take the lock ...
  282. // pthread_spin_lock(&handle->header_lock);
  283. struct starpu_data_replicate_s *replicate = &handle->per_node[node];
  284. if (replicate->state == STARPU_INVALID)
  285. replicate->requested = 1;
  286. // pthread_spin_unlock(&handle->header_lock);
  287. }
  288. int _starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
  289. {
  290. starpu_buffer_descr *descrs = task->buffers;
  291. unsigned nbuffers = task->cl->nbuffers;
  292. unsigned index;
  293. for (index = 0; index < nbuffers; index++)
  294. {
  295. starpu_data_handle handle = descrs[index].handle;
  296. starpu_access_mode mode = descrs[index].mode;
  297. if (mode & STARPU_SCRATCH)
  298. continue;
  299. prefetch_data_on_node(handle, mode, node);
  300. _starpu_set_data_requested_flag_if_needed(handle, node);
  301. }
  302. return 0;
  303. }
/* Make every buffer of "task" available on the local memory node before the
 * task runs: normal (R/W) buffers are fetched synchronously, scratch buffers
 * are allocated (or recycled from the memchunk cache). On success the
 * interface pointers are stored in task->interface[] and 0 is returned; on
 * allocation failure -1 is returned after attempting to release the buffers
 * already taken. */
int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
{
	STARPU_TRACE_START_FETCH_INPUT(NULL);

	uint32_t local_memory_node = _starpu_get_local_memory_node();

	starpu_buffer_descr *descrs = task->buffers;
	unsigned nbuffers = task->cl->nbuffers;

	/* TODO get that from the stack */
	starpu_job_t j = (struct starpu_job_s *)task->starpu_private;

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		int ret;
		starpu_data_handle handle = descrs[index].handle;
		starpu_access_mode mode = descrs[index].mode;
		void *interface;

		if (mode & STARPU_SCRATCH)
		{
			/* look for a previously-allocated chunk of the right
			 * shape before allocating a fresh one */
			starpu_mem_chunk_t mc;
			mc = _starpu_memchunk_cache_lookup(local_memory_node, handle);
			if (!mc)
			{
				/* Cache miss */

				/* This is a scratch memory, so we duplicate (any of)
				 * the interface which contains sufficient information
				 * to allocate the buffer. */
				size_t interface_size = handle->ops->interface_size;
				void *src_interface = starpu_data_get_interface_on_node(handle, local_memory_node);

				/* Pass the interface to StarPU so that the buffer can be allocated */
				_starpu_allocate_interface(handle, src_interface, local_memory_node);

				size_t size = _starpu_data_get_size(handle);
#warning TODO create a replicate struct here:
				mc = _starpu_memchunk_init(handle, size, src_interface, interface_size, 1);
			}

			interface = mc->interface;
			/* remembered so _starpu_push_task_output can return the
			 * chunk to the cache once the task is done */
			j->scratch_memchunks[index] = mc;
		}
		else {
			/* That's a "normal" buffer (R/W): synchronous fetch that
			 * pins the local replicate until the task releases it */
			ret = fetch_data(handle, mode);
			if (STARPU_UNLIKELY(ret))
				goto enomem;

			interface = starpu_data_get_interface_on_node(handle, local_memory_node);
		}

		task->interface[index] = interface;
	}

	STARPU_TRACE_END_FETCH_INPUT(NULL);

	return 0;

enomem:
	/* try to unreference all the input that were successfully taken */
	/* XXX broken ... */
	_STARPU_DISP("something went wrong with buffer %u\n", index);
	//push_codelet_output(task, index, mask);
	_starpu_push_task_output(task, mask);
	return -1;
}
  359. void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
  360. {
  361. STARPU_TRACE_START_PUSH_OUTPUT(NULL);
  362. starpu_buffer_descr *descrs = task->buffers;
  363. unsigned nbuffers = task->cl->nbuffers;
  364. starpu_job_t j = (struct starpu_job_s *)task->starpu_private;
  365. uint32_t local_node = _starpu_get_local_memory_node();
  366. unsigned index;
  367. for (index = 0; index < nbuffers; index++)
  368. {
  369. starpu_data_handle handle = descrs[index].handle;
  370. starpu_access_mode mode = descrs[index].mode;
  371. if (mode & STARPU_SCRATCH)
  372. {
  373. _starpu_memchunk_cache_insert(local_node, j->scratch_memchunks[index]);
  374. }
  375. else {
  376. _starpu_release_data_on_node(handle, mask, local_node);
  377. _starpu_release_data_enforce_sequential_consistency(task, handle);
  378. }
  379. }
  380. STARPU_TRACE_END_PUSH_OUTPUT(NULL);
  381. }
  382. /* NB : this value can only be an indication of the status of a data
  383. at some point, but there is no strong garantee ! */
  384. unsigned _starpu_is_data_present_or_requested(starpu_data_handle handle, uint32_t node)
  385. {
  386. unsigned ret = 0;
  387. // XXX : this is just a hint, so we don't take the lock ...
  388. // pthread_spin_lock(&handle->header_lock);
  389. if (handle->per_node[node].state != STARPU_INVALID
  390. || handle->per_node[node].requested || handle->per_node[node].request)
  391. ret = 1;
  392. // pthread_spin_unlock(&handle->header_lock);
  393. return ret;
  394. }