coherency.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009, 2010 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <common/config.h>
  18. #include <datawizard/coherency.h>
  19. #include <datawizard/copy_driver.h>
  20. #include <datawizard/write_back.h>
  21. #include <core/dependencies/data_concurrency.h>
  22. #include <profiling/profiling.h>
  23. uint32_t _starpu_select_node_to_handle_request(uint32_t src_node, uint32_t dst_node)
  24. {
  25. /* in case one of the node is a GPU, it needs to perform the transfer,
  26. * if both of them are GPU, it's a bit more complicated */
  27. unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
  28. unsigned dst_is_a_gpu = (_starpu_get_node_kind(dst_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(dst_node) == STARPU_OPENCL_RAM);
  29. /* we do not handle GPU->GPU transfers yet ! */
  30. STARPU_ASSERT( !(src_is_a_gpu && dst_is_a_gpu) );
  31. if (src_is_a_gpu)
  32. return src_node;
  33. if (dst_is_a_gpu)
  34. return dst_node;
  35. /* otherwise perform it locally, since we should be on a "sane" arch
  36. * where anyone can do the transfer. NB: in StarPU this should actually never
  37. * happen */
  38. return _starpu_get_local_memory_node();
  39. }
  40. uint32_t _starpu_select_src_node(starpu_data_handle handle)
  41. {
  42. unsigned src_node = 0;
  43. unsigned i;
  44. unsigned nnodes = _starpu_get_memory_nodes_count();
  45. /* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
  46. uint32_t node;
  47. uint32_t src_node_mask = 0;
  48. for (node = 0; node < nnodes; node++)
  49. {
  50. if (handle->per_node[node].state != STARPU_INVALID) {
  51. /* we found a copy ! */
  52. src_node_mask |= (1<<node);
  53. }
  54. }
  55. /* we should have found at least one copy ! */
  56. STARPU_ASSERT(src_node_mask != 0);
  57. /* find the node that will be the actual source */
  58. for (i = 0; i < nnodes; i++)
  59. {
  60. if (src_node_mask & (1<<i))
  61. {
  62. /* this is a potential candidate */
  63. src_node = i;
  64. /* however GPU are expensive sources, really !
  65. * other should be ok */
  66. if (_starpu_get_node_kind(i) != STARPU_CUDA_RAM)
  67. break;
  68. if (_starpu_get_node_kind(i) != STARPU_OPENCL_RAM)
  69. break;
  70. /* XXX do a better algorithm to distribute the memory copies */
  71. /* TODO : use the "requesting_node" as an argument to do so */
  72. }
  73. }
  74. return src_node;
  75. }
  76. /* this may be called once the data is fetched with header and STARPU_RW-lock hold */
  77. void _starpu_update_data_state(starpu_data_handle handle,
  78. struct starpu_data_replicate_s *requesting_replicate,
  79. starpu_access_mode mode)
  80. {
  81. /* There is nothing to do for relaxed coherency modes (scratch or
  82. * reductions) */
  83. if (!(mode & STARPU_RW))
  84. return;
  85. unsigned nnodes = _starpu_get_memory_nodes_count();
  86. /* the data is present now */
  87. requesting_replicate->requested = 0;
  88. if (mode & STARPU_W) {
  89. /* the requesting node now has the only valid copy */
  90. uint32_t node;
  91. for (node = 0; node < nnodes; node++)
  92. handle->per_node[node].state = STARPU_INVALID;
  93. requesting_replicate->state = STARPU_OWNER;
  94. }
  95. else { /* read only */
  96. if (requesting_replicate->state != STARPU_OWNER)
  97. {
  98. /* there was at least another copy of the data */
  99. uint32_t node;
  100. for (node = 0; node < nnodes; node++)
  101. {
  102. struct starpu_data_replicate_s *replicate = &handle->per_node[node];
  103. if (replicate->state != STARPU_INVALID)
  104. replicate->state = STARPU_SHARED;
  105. }
  106. requesting_replicate->state = STARPU_SHARED;
  107. }
  108. }
  109. }
/*
 * This function is called when the data is needed on the local node; it
 * returns a pointer to the local copy.
 *
 *              R      STARPU_W   STARPU_RW
 *  Owner       OK     OK         OK
 *  Shared      OK     1          1
 *  Invalid     2      3          4
 *
 * case 1: shared + (read)write:
 *      no data copy, but shared -> Invalid/Owner
 * case 2: invalid + read:
 *      data copy + invalid -> shared + owner -> shared
 *      (STARPU_ASSERT(there is a valid copy))
 * case 3: invalid + write:
 *      no data copy + invalid -> owner + (owner,shared) -> invalid
 * case 4: invalid + R/STARPU_W:
 *      data copy + if (STARPU_W) (invalid -> owner + owner -> invalid)
 *      else (invalid,owner -> shared)
 */
/* This function is called with handle's header lock taken.
 * It builds and posts the data request(s) needed to bring a valid copy of
 * the data onto dst_replicate, and returns the request the caller may wait
 * on.  NB: the header lock is RELEASED before the request is posted. */
static starpu_data_request_t create_new_request_to_fetch_data(starpu_data_handle handle,
				struct starpu_data_replicate_s *dst_replicate,
				starpu_access_mode mode, unsigned is_prefetch,
				void (*callback_func)(void *), void *callback_arg)
{
	starpu_data_request_t r;
	unsigned requesting_node = dst_replicate->memory_node;

	/* find someone who already has the data */
	uint32_t src_node = 0;

	/* if the data is in write only mode, there is no need for a source */
	if (mode & STARPU_R)
	{
		src_node = _starpu_select_src_node(handle);
		STARPU_ASSERT(src_node != requesting_node);
	}

	unsigned src_is_a_gpu = (_starpu_get_node_kind(src_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(src_node) == STARPU_OPENCL_RAM);
	unsigned dst_is_a_gpu = (_starpu_get_node_kind(requesting_node) == STARPU_CUDA_RAM || _starpu_get_node_kind(requesting_node) == STARPU_OPENCL_RAM);

	struct starpu_data_replicate_s *src_replicate = &handle->per_node[src_node];

	/* we have to perform 2 successive requests for GPU->GPU transfers:
	 * device -> RAM, then RAM -> device, since direct device-to-device
	 * copies are not handled */
	if ((mode & STARPU_R) && (src_is_a_gpu && dst_is_a_gpu)) {
		unsigned reuse_r_src_to_ram;
		starpu_data_request_t r_src_to_ram;
		starpu_data_request_t r_ram_to_dst;

		/* XXX we hardcode 0 as the RAM node ... */
		struct starpu_data_replicate_s *ram_replicate = &handle->per_node[0];

		/* We put a 1 in the number of dependencies because this
		 * depends on the r_src_to_ram request. */
		r_ram_to_dst = _starpu_create_data_request(handle, ram_replicate,
				dst_replicate, requesting_node, mode, 1, is_prefetch);

		/* a non-prefetch fetch will be waited upon by the caller:
		 * keep the request alive until that wait completes */
		if (!is_prefetch)
			r_ram_to_dst->refcnt++;

		/* maybe somebody is already bringing the data into RAM */
		r_src_to_ram = _starpu_search_existing_data_request(ram_replicate, mode);
		reuse_r_src_to_ram = r_src_to_ram?1:0;

		if (!r_src_to_ram)
		{
			r_src_to_ram = _starpu_create_data_request(handle, src_replicate,
					ram_replicate, src_node, mode, 0, is_prefetch);
		}

		/* we chain both requests */
		r_src_to_ram->next_req[r_src_to_ram->next_req_count++]= r_ram_to_dst;

		_starpu_data_request_append_callback(r_ram_to_dst, callback_func, callback_arg);

		/* _starpu_search_existing_data_request returned the request
		 * with its lock taken: release it now that it is chained */
		if (reuse_r_src_to_ram)
			_starpu_spin_unlock(&r_src_to_ram->lock);

		_starpu_spin_unlock(&handle->header_lock);

		/* we only submit the first request, the remaining will be
		 * automatically submitted afterward */
		if (!reuse_r_src_to_ram)
			_starpu_post_data_request(r_src_to_ram, src_node);

		/* the application only waits for the termination of the last request */
		r = r_ram_to_dst;
	}
	else {
		/* who will perform that request ? */
		uint32_t handling_node =
			_starpu_select_node_to_handle_request(src_node, requesting_node);

		r = _starpu_create_data_request(handle, src_replicate,
				dst_replicate, handling_node, mode, 0, is_prefetch);

		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		/* a non-prefetch fetch will be waited upon: take a reference */
		if (!is_prefetch)
			r->refcnt++;

		_starpu_spin_unlock(&handle->header_lock);

		_starpu_post_data_request(r, handling_node);
	}

	return r;
}
/* Bring a valid copy of the data onto dst_replicate for the given access
 * mode.  When is_prefetch is set the call never blocks; otherwise it waits
 * for the transfer to complete.  callback_func (if any) is invoked once
 * the data is available.  Returns 0 on success. */
int _starpu_fetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *dst_replicate,
				starpu_access_mode mode, unsigned is_prefetch,
				void (*callback_func)(void *), void *callback_arg)
{
	uint32_t local_node = _starpu_get_local_memory_node();
	_STARPU_LOG_IN();

	unsigned requesting_node = dst_replicate->memory_node;

	/* acquire the header lock, but keep driving the DataWizard progress
	 * engine meanwhile so pending transfers can still advance */
	while (_starpu_spin_trylock(&handle->header_lock))
		_starpu_datawizard_progress(local_node, 1);

	/* a real (non-prefetch) fetch pins the replicate; the matching
	 * decrement is in _starpu_release_data_on_node */
	if (!is_prefetch)
		dst_replicate->refcnt++;

	if (dst_replicate->state != STARPU_INVALID)
	{
		/* the data is already available so we can stop */
		_starpu_update_data_state(handle, dst_replicate, mode);
		_starpu_msi_cache_hit(requesting_node);
		_starpu_spin_unlock(&handle->header_lock);

		if (callback_func)
			callback_func(callback_arg);

		_STARPU_LOG_OUT_TAG("data available");
		return 0;
	}

	/* the only remaining situation is that the local copy was invalid */
	STARPU_ASSERT(dst_replicate->state == STARPU_INVALID);

	_starpu_msi_cache_miss(requesting_node);

	starpu_data_request_t r;

	/* is there already a pending request ? */
	r = _starpu_search_existing_data_request(dst_replicate, mode);
	/* at the exit of _starpu_search_existing_data_request the lock is taken if the request existed ! */

	if (!r) {
		/* NB: create_new_request_to_fetch_data releases the header lock */
		r = create_new_request_to_fetch_data(handle, dst_replicate, mode, is_prefetch, callback_func, callback_arg);
	}
	else {
		/* the lock was taken by _starpu_search_existing_data_request */
		_starpu_data_request_append_callback(r, callback_func, callback_arg);

		/* there is already a similar request: a prefetch can simply
		 * piggy-back on it and return */
		if (is_prefetch)
		{
			_starpu_spin_unlock(&r->lock);
			_starpu_spin_unlock(&handle->header_lock);

			_STARPU_LOG_OUT_TAG("similar request");
			return 0;
		}

		/* we will wait on this request below: keep it alive */
		r->refcnt++;

		//_starpu_spin_lock(&r->lock);
		if (r->is_a_prefetch_request)
		{
			/* transform that prefetch request into a "normal" request */
			r->is_a_prefetch_request = 0;

			/* transform that request into the proper access mode (prefetch could be read only) */
			r->mode |= mode;
		}

		_starpu_spin_unlock(&r->lock);
		_starpu_spin_unlock(&handle->header_lock);
	}

	/* a prefetch returns immediately; a real fetch blocks until the
	 * request completes (while making progress on other transfers) */
	int ret = is_prefetch?0:_starpu_wait_data_request_completion(r, 1);
	_STARPU_LOG_OUT();
	return ret;
}
  253. static int prefetch_data_on_node(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, starpu_access_mode mode)
  254. {
  255. return _starpu_fetch_data_on_node(handle, replicate, mode, 1, NULL, NULL);
  256. }
  257. static int fetch_data(starpu_data_handle handle, struct starpu_data_replicate_s *replicate, starpu_access_mode mode)
  258. {
  259. return _starpu_fetch_data_on_node(handle, replicate, mode, 0, NULL, NULL);
  260. }
  261. inline uint32_t _starpu_get_data_refcnt(starpu_data_handle handle, uint32_t node)
  262. {
  263. return handle->per_node[node].refcnt;
  264. }
  265. size_t _starpu_data_get_size(starpu_data_handle handle)
  266. {
  267. return handle->data_size;
  268. }
  269. uint32_t _starpu_data_get_footprint(starpu_data_handle handle)
  270. {
  271. return handle->footprint;
  272. }
/* in case the data was accessed on a write mode, do not forget to
 * make it accessible again once it is possible !  Drops the reference
 * taken by the fetch, applies write-through, and wakes up tasks waiting
 * on this data. */
void _starpu_release_data_on_node(starpu_data_handle handle, uint32_t default_wt_mask, struct starpu_data_replicate_s *replicate)
{
	uint32_t wt_mask;
	wt_mask = default_wt_mask | handle->wt_mask;

	/* Note that it is possible that there is no valid copy of the data (if
	 * starpu_data_invalidate was called for instance). In that case, we do
	 * not enforce any write-through mechanism. */

	unsigned memory_node = replicate->memory_node;

	if (replicate->state != STARPU_INVALID)
	if ((wt_mask & ~(1<<memory_node)))
		/* push the data to the write-through nodes other than the one
		 * that already holds a valid copy */
		_starpu_write_through_data(handle, memory_node, wt_mask);

	uint32_t local_node = _starpu_get_local_memory_node();

	/* grab the header lock while keeping the progress engine running */
	while (_starpu_spin_trylock(&handle->header_lock))
		_starpu_datawizard_progress(local_node, 1);

	/* release the reference taken at fetch time */
	replicate->refcnt--;

	STARPU_ASSERT(replicate->refcnt >= 0);

	/* In case there was a temporary handle (eg. used for reduction), this
	 * handle may have requested to be destroyed when the data is released
	 * */
	unsigned handle_was_destroyed = handle->lazy_unregister;

	/* this may free the handle when lazy_unregister was set, hence the
	 * flag copied just above */
	_starpu_notify_data_dependencies(handle);

	if (!handle_was_destroyed)
		_starpu_spin_unlock(&handle->header_lock);
}
  299. static void _starpu_set_data_requested_flag_if_needed(struct starpu_data_replicate_s *replicate)
  300. {
  301. // XXX : this is just a hint, so we don't take the lock ...
  302. // pthread_spin_lock(&handle->header_lock);
  303. if (replicate->state == STARPU_INVALID)
  304. replicate->requested = 1;
  305. // pthread_spin_unlock(&handle->header_lock);
  306. }
  307. int starpu_prefetch_task_input_on_node(struct starpu_task *task, uint32_t node)
  308. {
  309. starpu_buffer_descr *descrs = task->buffers;
  310. unsigned nbuffers = task->cl->nbuffers;
  311. unsigned index;
  312. for (index = 0; index < nbuffers; index++)
  313. {
  314. starpu_data_handle handle = descrs[index].handle;
  315. starpu_access_mode mode = descrs[index].mode;
  316. if (mode & (STARPU_SCRATCH|STARPU_REDUX))
  317. continue;
  318. struct starpu_data_replicate_s *replicate = &handle->per_node[node];
  319. prefetch_data_on_node(handle, replicate, mode);
  320. _starpu_set_data_requested_flag_if_needed(replicate);
  321. }
  322. return 0;
  323. }
/* Bring every buffer of 'task' where the worker expects it before
 * execution (per-node replicate for R/W buffers, per-worker replicate for
 * scratch/redux).  Returns 0 on success, -1 on fetch failure, in which
 * case the already-acquired buffers are released through
 * _starpu_push_task_output with 'mask'. */
int _starpu_fetch_task_input(struct starpu_task *task, uint32_t mask)
{
	STARPU_TRACE_START_FETCH_INPUT(NULL);

	int profiling = starpu_profiling_status_get();
	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->acquire_data_start_time);

	starpu_buffer_descr *descrs = task->buffers;
	unsigned nbuffers = task->cl->nbuffers;

	unsigned local_memory_node = _starpu_get_local_memory_node();

	int workerid = starpu_worker_get_id();

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		int ret;
		starpu_data_handle handle = descrs[index].handle;
		starpu_access_mode mode = descrs[index].mode;

		struct starpu_data_replicate_s *local_replicate;

		if (mode & (STARPU_SCRATCH|STARPU_REDUX))
		{
			/* relaxed coherency: each worker has its own replicate */
			local_replicate = &handle->per_worker[workerid];
		}
		else {
			/* That's a "normal" buffer (R/W) */
			local_replicate = &handle->per_node[local_memory_node];
		}

		/* blocking fetch of a valid copy */
		ret = fetch_data(handle, local_replicate, mode);
		if (STARPU_UNLIKELY(ret))
			goto enomem;

		/* expose the data interface pointer to the codelet */
		task->interfaces[index] = local_replicate->data_interface;

		if (mode & STARPU_REDUX)
		{
			/* If the replicate was not initialized yet, we have to do it now */
			if (!local_replicate->initialized)
				_starpu_redux_init_data_replicate(handle, local_replicate, workerid);
		}
	}

	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->acquire_data_end_time);

	STARPU_TRACE_END_FETCH_INPUT(NULL);

	return 0;

enomem:
	/* try to unreference all the input that were successfully taken */
	/* XXX broken ... */
	_STARPU_DISP("something went wrong with buffer %u\n", index);

	//push_codelet_output(task, index, mask);
	_starpu_push_task_output(task, mask);
	return -1;
}
/* Release every buffer acquired by _starpu_fetch_task_input once the task
 * has executed: apply the write-through mask, drop the fetch references,
 * and wake up tasks waiting on sequential consistency. */
void _starpu_push_task_output(struct starpu_task *task, uint32_t mask)
{
	STARPU_TRACE_START_PUSH_OUTPUT(NULL);

	int profiling = starpu_profiling_status_get();
	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->release_data_start_time);

	starpu_buffer_descr *descrs = task->buffers;
	unsigned nbuffers = task->cl->nbuffers;

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle handle = descrs[index].handle;
		starpu_access_mode mode = descrs[index].mode;

		struct starpu_data_replicate_s *replicate;

		if (mode & STARPU_RW)
		{
			/* regular buffer: the per-node replicate was used */
			unsigned local_node = _starpu_get_local_memory_node();
			replicate = &handle->per_node[local_node];
		}
		else
		{
			/* relaxed coherency (scratch/redux): per-worker replicate */
			int workerid = starpu_worker_get_id();
			replicate = &handle->per_worker[workerid];
		}

		/* In case there was a temporary handle (eg. used for
		 * reduction), this handle may have requested to be destroyed
		 * when the data is released
		 * */
		unsigned handle_was_destroyed = handle->lazy_unregister;

		/* may destroy the handle when lazy_unregister was set, hence
		 * the flag copied just above */
		_starpu_release_data_on_node(handle, mask, replicate);

		if (!handle_was_destroyed)
			_starpu_release_data_enforce_sequential_consistency(task, handle);
	}

	if (profiling && task->profiling_info)
		starpu_clock_gettime(&task->profiling_info->release_data_end_time);

	STARPU_TRACE_END_PUSH_OUTPUT(NULL);
}
  409. /* NB : this value can only be an indication of the status of a data
  410. at some point, but there is no strong garantee ! */
  411. unsigned _starpu_is_data_present_or_requested(starpu_data_handle handle, uint32_t node)
  412. {
  413. unsigned ret = 0;
  414. // XXX : this is just a hint, so we don't take the lock ...
  415. // pthread_spin_lock(&handle->header_lock);
  416. if (handle->per_node[node].state != STARPU_INVALID
  417. || handle->per_node[node].requested || handle->per_node[node].request)
  418. ret = 1;
  419. // pthread_spin_unlock(&handle->header_lock);
  420. return ret;
  421. }