copy_driver.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2008-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <common/utils.h>
  19. #include <core/sched_policy.h>
  20. #include <datawizard/datastats.h>
  21. #include <datawizard/memory_nodes.h>
  22. #include <drivers/disk/driver_disk.h>
  23. #include <drivers/mpi/driver_mpi_sink.h>
  24. #include <drivers/mpi/driver_mpi_source.h>
  25. #include <drivers/mpi/driver_mpi_common.h>
  26. #include <drivers/mic/driver_mic_source.h>
  27. #include <common/fxt.h>
  28. #include <datawizard/copy_driver.h>
  29. #include <datawizard/memalloc.h>
  30. #include <starpu_opencl.h>
  31. #include <starpu_cuda.h>
  32. #include <profiling/profiling.h>
  33. #include <core/disk.h>
  34. #ifdef STARPU_SIMGRID
  35. #include <core/simgrid.h>
  36. #endif
  37. void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
  38. {
  39. /* wake up all workers on that memory node */
  40. struct _starpu_memory_node_descr * const descr = _starpu_memory_node_get_description();
  41. const int cur_workerid = starpu_worker_get_id();
  42. struct _starpu_worker *cur_worker = cur_workerid>=0?_starpu_get_worker_struct(cur_workerid):NULL;
  43. STARPU_PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
  44. unsigned nconds = descr->condition_count[nodeid];
  45. unsigned cond_id;
  46. for (cond_id = 0; cond_id < nconds; cond_id++)
  47. {
  48. struct _starpu_cond_and_worker *condition;
  49. condition = &descr->conditions_attached_to_node[nodeid][cond_id];
  50. if (condition->worker == cur_worker)
  51. {
  52. if (condition->cond == &condition->worker->sched_cond)
  53. {
  54. condition->worker->state_keep_awake = 1;
  55. }
  56. /* No need to wake myself, and I might be called from
  57. * the scheduler with mutex locked, through
  58. * starpu_prefetch_task_input_on_node */
  59. continue;
  60. }
  61. /* wake anybody waiting on that condition */
  62. STARPU_PTHREAD_MUTEX_LOCK_SCHED(&condition->worker->sched_mutex);
  63. if (condition->cond == &condition->worker->sched_cond)
  64. {
  65. condition->worker->state_keep_awake = 1;
  66. }
  67. STARPU_PTHREAD_COND_BROADCAST(condition->cond);
  68. STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&condition->worker->sched_mutex);
  69. }
  70. STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
  71. #ifdef STARPU_SIMGRID
  72. starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[nodeid]);
  73. #endif
  74. }
  75. void starpu_wake_all_blocked_workers(void)
  76. {
  77. /* workers may be blocked on the various queues' conditions */
  78. struct _starpu_memory_node_descr * const descr = _starpu_memory_node_get_description();
  79. const int cur_workerid = starpu_worker_get_id();
  80. struct _starpu_worker *cur_worker = cur_workerid>=0?_starpu_get_worker_struct(cur_workerid):NULL;
  81. STARPU_PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
  82. unsigned nconds = descr->total_condition_count;
  83. unsigned cond_id;
  84. for (cond_id = 0; cond_id < nconds; cond_id++)
  85. {
  86. struct _starpu_cond_and_worker *condition;
  87. condition = &descr->conditions_all[cond_id];
  88. if (condition->worker == cur_worker)
  89. {
  90. if (condition->cond == &condition->worker->sched_cond)
  91. {
  92. condition->worker->state_keep_awake = 1;
  93. }
  94. /* No need to wake myself, and I might be called from
  95. * the scheduler with mutex locked, through
  96. * starpu_prefetch_task_input_on_node */
  97. continue;
  98. }
  99. /* wake anybody waiting on that condition */
  100. STARPU_PTHREAD_MUTEX_LOCK_SCHED(&condition->worker->sched_mutex);
  101. if (condition->cond == &condition->worker->sched_cond)
  102. {
  103. condition->worker->state_keep_awake = 1;
  104. }
  105. STARPU_PTHREAD_COND_BROADCAST(condition->cond);
  106. STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&condition->worker->sched_mutex);
  107. }
  108. STARPU_PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
  109. #ifdef STARPU_SIMGRID
  110. unsigned workerid, nodeid;
  111. for (workerid = 0; workerid < starpu_worker_get_count(); workerid++)
  112. starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
  113. for (nodeid = 0; nodeid < starpu_memory_nodes_get_count(); nodeid++)
  114. starpu_pthread_queue_broadcast(&_starpu_simgrid_transfer_queue[nodeid]);
  115. #endif
  116. }
  117. #ifdef STARPU_USE_FXT
  118. /* we need to identify each communication so that we can match the beginning
  119. * and the end of a communication in the trace, so we use a unique identifier
  120. * per communication */
  121. static unsigned long communication_cnt = 0;
  122. #endif
  123. static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
  124. struct _starpu_data_replicate *src_replicate,
  125. struct _starpu_data_replicate *dst_replicate,
  126. struct _starpu_data_request *req)
  127. {
  128. unsigned src_node = (unsigned)src_replicate->memory_node;
  129. unsigned dst_node = (unsigned)dst_replicate->memory_node;
  130. STARPU_ASSERT(src_replicate->refcnt);
  131. STARPU_ASSERT(dst_replicate->refcnt);
  132. STARPU_ASSERT(src_replicate->allocated);
  133. STARPU_ASSERT(dst_replicate->allocated);
  134. #ifdef STARPU_SIMGRID
  135. if (src_node == STARPU_MAIN_RAM || dst_node == STARPU_MAIN_RAM)
  136. _starpu_simgrid_data_transfer(handle->ops->get_size(handle), src_node, dst_node);
  137. return _starpu_simgrid_transfer(handle->ops->get_size(handle), src_node, dst_node, req);
  138. #else /* !SIMGRID */
  139. enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
  140. void *src_interface = src_replicate->data_interface;
  141. void *dst_interface = dst_replicate->data_interface;
  142. #if defined(STARPU_USE_CUDA) && defined(STARPU_HAVE_CUDA_MEMCPY_PEER) && !defined(STARPU_SIMGRID)
  143. enum starpu_node_kind src_kind = starpu_node_get_kind(src_node);
  144. if ((src_kind == STARPU_CUDA_RAM) || (dst_kind == STARPU_CUDA_RAM))
  145. {
  146. unsigned devid;
  147. if ((src_kind == STARPU_CUDA_RAM) && (dst_kind == STARPU_CUDA_RAM))
  148. {
  149. /* GPU-GPU transfer, issue it from the destination */
  150. devid = starpu_memory_node_get_devid(dst_node);
  151. }
  152. else
  153. {
  154. unsigned node = (dst_kind == STARPU_CUDA_RAM)?dst_node:src_node;
  155. devid = starpu_memory_node_get_devid(node);
  156. }
  157. starpu_cuda_set_device(devid);
  158. }
  159. #endif
  160. struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
  161. if (node_ops && node_ops->copy_interface_to[dst_kind])
  162. {
  163. return node_ops->copy_interface_to[dst_kind](handle, src_interface, src_node, dst_interface, dst_node, req);
  164. }
  165. else
  166. {
  167. STARPU_ABORT_MSG("No copy_interface_to function defined from node %s to node %s\n", _starpu_node_get_prefix(starpu_node_get_kind(src_node)), _starpu_node_get_prefix(starpu_node_get_kind(dst_node)));
  168. }
  169. #endif /* !SIMGRID */
  170. }
  171. int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT _starpu_driver_copy_data_1_to_1(starpu_data_handle_t handle,
  172. struct _starpu_data_replicate *src_replicate,
  173. struct _starpu_data_replicate *dst_replicate,
  174. unsigned donotread,
  175. struct _starpu_data_request *req,
  176. unsigned may_alloc,
  177. enum _starpu_is_prefetch prefetch STARPU_ATTRIBUTE_UNUSED)
  178. {
  179. if (!donotread)
  180. {
  181. STARPU_ASSERT(src_replicate->allocated);
  182. STARPU_ASSERT(src_replicate->refcnt);
  183. }
  184. unsigned src_node = src_replicate->memory_node;
  185. unsigned dst_node = dst_replicate->memory_node;
  186. /* first make sure the destination has an allocated buffer */
  187. if (!dst_replicate->allocated)
  188. {
  189. if (!may_alloc || _starpu_is_reclaiming(dst_node))
  190. /* We're not supposed to allocate there at the moment */
  191. return -ENOMEM;
  192. int ret_alloc = _starpu_allocate_memory_on_node(handle, dst_replicate, req ? req->prefetch : STARPU_FETCH);
  193. if (ret_alloc)
  194. return -ENOMEM;
  195. }
  196. STARPU_ASSERT(dst_replicate->allocated);
  197. STARPU_ASSERT(dst_replicate->refcnt);
  198. /* if there is no need to actually read the data,
  199. * we do not perform any transfer */
  200. if (!donotread)
  201. {
  202. unsigned long STARPU_ATTRIBUTE_UNUSED com_id = 0;
  203. size_t size = _starpu_data_get_size(handle);
  204. _starpu_bus_update_profiling_info((int)src_node, (int)dst_node, size);
  205. #ifdef STARPU_USE_FXT
  206. com_id = STARPU_ATOMIC_ADDL(&communication_cnt, 1);
  207. if (req)
  208. req->com_id = com_id;
  209. #endif
  210. dst_replicate->initialized = 1;
  211. _STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, size, com_id, prefetch, handle);
  212. int ret_copy = copy_data_1_to_1_generic(handle, src_replicate, dst_replicate, req);
  213. if (!req)
  214. /* Synchronous, this is already finished */
  215. _STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, com_id, prefetch);
  216. return ret_copy;
  217. }
  218. return 0;
  219. }
  220. void starpu_interface_data_copy(unsigned src_node, unsigned dst_node, size_t size)
  221. {
  222. _STARPU_TRACE_DATA_COPY(src_node, dst_node, size);
  223. }
  224. void starpu_interface_start_driver_copy_async(unsigned src_node, unsigned dst_node, double *start)
  225. {
  226. *start = starpu_timing_now();
  227. _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
  228. }
  229. void starpu_interface_end_driver_copy_async(unsigned src_node, unsigned dst_node, double start)
  230. {
  231. double end = starpu_timing_now();
  232. double elapsed = end - start;
  233. if (elapsed > 300)
  234. {
  235. static int warned = 0;
  236. if (!warned)
  237. {
  238. char src_name[16], dst_name[16];
  239. warned = 1;
  240. starpu_memory_node_get_name(src_node, src_name, sizeof(src_name));
  241. starpu_memory_node_get_name(dst_node, dst_name, sizeof(dst_name));
  242. _STARPU_DISP("Warning: the submission of asynchronous transfer from %s to %s took a very long time (%f ms)\nFor proper asynchronous transfer overlapping, data registered to StarPU must be allocated with starpu_malloc() or pinned with starpu_memory_pin()\n", src_name, dst_name, elapsed / 1000.);
  243. }
  244. }
  245. _STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
  246. }
  247. /* This can be used by interfaces to easily transfer a piece of data without
  248. * caring about the particular transfer methods. */
  249. /* This should either return 0 if the transfer is complete, or -EAGAIN if the
  250. * transfer is still pending, and will have to be waited for by
  251. * _starpu_driver_test_request_completion/_starpu_driver_wait_request_completion
  252. */
  253. int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, uintptr_t dst, size_t dst_offset, unsigned dst_node, size_t size, void *async_data)
  254. {
  255. struct _starpu_async_channel *async_channel = async_data;
  256. enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
  257. struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
  258. if (node_ops && node_ops->copy_data_to[dst_kind])
  259. {
  260. return node_ops->copy_data_to[dst_kind](src, src_offset, src_node,
  261. dst, dst_offset, dst_node,
  262. size,
  263. async_channel);
  264. }
  265. else
  266. {
  267. STARPU_ABORT_MSG("No copy_data_to function defined from node %s to node %s\n", _starpu_node_get_prefix(starpu_node_get_kind(src_node)), _starpu_node_get_prefix(starpu_node_get_kind(dst_node)));
  268. return -1;
  269. }
  270. }
  271. int starpu_interface_copy2d(uintptr_t src, size_t src_offset, unsigned src_node,
  272. uintptr_t dst, size_t dst_offset, unsigned dst_node,
  273. size_t blocksize,
  274. size_t numblocks, size_t ld_src, size_t ld_dst,
  275. void *async_data)
  276. {
  277. int ret = 0;
  278. unsigned i;
  279. struct _starpu_async_channel *async_channel = async_data;
  280. enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
  281. struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
  282. STARPU_ASSERT_MSG(ld_src >= blocksize, "block size %lu is bigger than ld %lu in source", (unsigned long) blocksize, (unsigned long) ld_src);
  283. STARPU_ASSERT_MSG(ld_dst >= blocksize, "block size %lu is bigger than ld %lu in destination", (unsigned long) blocksize, (unsigned long) ld_dst);
  284. if (ld_src == blocksize && ld_dst == blocksize)
  285. /* Optimize contiguous case */
  286. return starpu_interface_copy(src, src_offset, src_node,
  287. dst, dst_offset, dst_node,
  288. blocksize * numblocks, async_data);
  289. if (node_ops && node_ops->copy2d_data_to[dst_kind])
  290. /* Hardware-optimized non-contiguous case */
  291. return node_ops->copy2d_data_to[dst_kind](src, src_offset, src_node,
  292. dst, dst_offset, dst_node,
  293. blocksize,
  294. numblocks, ld_src, ld_dst,
  295. async_channel);
  296. for (i = 0; i < numblocks; i++)
  297. {
  298. if (starpu_interface_copy(src, src_offset + i*ld_src, src_node,
  299. dst, dst_offset + i*ld_dst, dst_node,
  300. blocksize, async_data))
  301. ret = -EAGAIN;
  302. }
  303. return ret;
  304. }
  305. int starpu_interface_copy3d(uintptr_t src, size_t src_offset, unsigned src_node,
  306. uintptr_t dst, size_t dst_offset, unsigned dst_node,
  307. size_t blocksize,
  308. size_t numblocks_1, size_t ld1_src, size_t ld1_dst,
  309. size_t numblocks_2, size_t ld2_src, size_t ld2_dst,
  310. void *async_data)
  311. {
  312. int ret = 0;
  313. unsigned i;
  314. struct _starpu_async_channel *async_channel = async_data;
  315. enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
  316. struct _starpu_node_ops *node_ops = _starpu_memory_node_get_node_ops(src_node);
  317. STARPU_ASSERT_MSG(ld1_src >= blocksize, "block size %lu is bigger than ld %lu in source", (unsigned long) blocksize, (unsigned long) ld1_src);
  318. STARPU_ASSERT_MSG(ld1_dst >= blocksize, "block size %lu is bigger than ld %lu in destination", (unsigned long) blocksize, (unsigned long) ld1_dst);
  319. STARPU_ASSERT_MSG(ld2_src >= numblocks_1 * ld1_src, "block group size %lu is bigger than group ld %lu in source", (unsigned long) numblocks_1 * ld1_src, (unsigned long) ld2_src);
  320. STARPU_ASSERT_MSG(ld2_dst >= numblocks_1 * ld1_dst, "block group size %lu is bigger than group ld %lu in destination", (unsigned long) numblocks_1 * ld1_dst, (unsigned long) ld2_dst);
  321. if (ld1_src * ld2_src == blocksize * numblocks_1 &&
  322. ld1_dst * ld2_dst == blocksize * numblocks_1)
  323. /* Optimize contiguous case */
  324. return starpu_interface_copy(src, src_offset, src_node,
  325. dst, dst_offset, dst_node,
  326. blocksize * numblocks_1 * numblocks_2,
  327. async_data);
  328. if (node_ops && node_ops->copy3d_data_to[dst_kind])
  329. /* Hardware-optimized non-contiguous case */
  330. return node_ops->copy3d_data_to[dst_kind](src, src_offset, src_node,
  331. dst, dst_offset, dst_node,
  332. blocksize,
  333. numblocks_1, ld1_src, ld1_dst,
  334. numblocks_2, ld2_src, ld2_dst,
  335. async_channel);
  336. for (i = 0; i < numblocks_2; i++)
  337. {
  338. if (starpu_interface_copy2d(src, src_offset + i*ld2_src, src_node,
  339. dst, dst_offset + i*ld2_dst, dst_node,
  340. blocksize, numblocks_1, ld1_src, ld1_dst,
  341. async_data))
  342. ret = -EAGAIN;
  343. }
  344. return ret;
  345. }
  346. int starpu_interface_copy4d(uintptr_t src, size_t src_offset, unsigned src_node,
  347. uintptr_t dst, size_t dst_offset, unsigned dst_node,
  348. size_t blocksize,
  349. size_t numblocks_1, size_t ld1_src, size_t ld1_dst,
  350. size_t numblocks_2, size_t ld2_src, size_t ld2_dst,
  351. size_t numblocks_3, size_t ld3_src, size_t ld3_dst,
  352. void *async_data)
  353. {
  354. int ret = 0;
  355. unsigned i;
  356. STARPU_ASSERT_MSG(ld1_src >= blocksize, "block size %lu is bigger than ld %lu in source", (unsigned long) blocksize, (unsigned long) ld1_src);
  357. STARPU_ASSERT_MSG(ld1_dst >= blocksize, "block size %lu is bigger than ld %lu in destination", (unsigned long) blocksize, (unsigned long) ld1_dst);
  358. STARPU_ASSERT_MSG(ld2_src >= numblocks_1 * ld1_src, "block group size %lu is bigger than group ld %lu in source", (unsigned long) numblocks_1 * ld1_src, (unsigned long) ld2_src);
  359. STARPU_ASSERT_MSG(ld2_dst >= numblocks_1 * ld1_dst, "block group size %lu is bigger than group ld %lu in destination", (unsigned long) numblocks_1 * ld1_dst, (unsigned long) ld2_dst);
  360. STARPU_ASSERT_MSG(ld3_src >= numblocks_2 * ld2_src, "block group group size %lu is bigger than group group ld %lu in source", (unsigned long) numblocks_2 * ld2_src, (unsigned long) ld3_src);
  361. STARPU_ASSERT_MSG(ld3_dst >= numblocks_2 * ld2_dst, "block group group size %lu is bigger than group group ld %lu in destination", (unsigned long) numblocks_2 * ld2_dst, (unsigned long) ld3_dst);
  362. if (ld1_src * ld2_src * ld3_src == blocksize * numblocks_1 * numblocks_2 &&
  363. ld1_dst * ld2_dst * ld3_dst == blocksize * numblocks_1 * numblocks_2)
  364. /* Optimize contiguous case */
  365. return starpu_interface_copy(src, src_offset, src_node,
  366. dst, dst_offset, dst_node,
  367. blocksize * numblocks_1 * numblocks_2 * numblocks_3,
  368. async_data);
  369. /* Probably won't ever have a 4D interface in drivers :) */
  370. for (i = 0; i < numblocks_3; i++)
  371. {
  372. if (starpu_interface_copy3d(src, src_offset + i*ld3_src, src_node,
  373. dst, dst_offset + i*ld3_dst, dst_node,
  374. blocksize,
  375. numblocks_1, ld1_src, ld1_dst,
  376. numblocks_2, ld2_src, ld2_dst,
  377. async_data))
  378. ret = -EAGAIN;
  379. }
  380. return ret;
  381. }
  382. void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_channel)
  383. {
  384. #ifdef STARPU_SIMGRID
  385. _starpu_simgrid_wait_transfer_event(&async_channel->event);
  386. #else /* !SIMGRID */
  387. struct _starpu_node_ops *node_ops = async_channel->node_ops;
  388. if (node_ops && node_ops->wait_request_completion != NULL)
  389. {
  390. node_ops->wait_request_completion(async_channel);
  391. }
  392. else
  393. {
  394. STARPU_ABORT_MSG("No wait_request_completion function defined for node %s\n", node_ops?node_ops->name:"unknown");
  395. }
  396. #endif /* !SIMGRID */
  397. }
  398. unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *async_channel)
  399. {
  400. #ifdef STARPU_SIMGRID
  401. return _starpu_simgrid_test_transfer_event(&async_channel->event);
  402. #else /* !SIMGRID */
  403. struct _starpu_node_ops *node_ops = async_channel->node_ops;
  404. if (node_ops && node_ops->test_request_completion != NULL)
  405. {
  406. return node_ops->test_request_completion(async_channel);
  407. }
  408. else
  409. {
  410. STARPU_ABORT_MSG("No test_request_completion function defined for node %s\n", node_ops?node_ops->name:"unknown");
  411. }
  412. #endif /* !SIMGRID */
  413. }