copy_driver.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <common/utils.h>
  19. #include <core/policies/sched_policy.h>
  20. #include <datawizard/datastats.h>
  21. #include <common/fxt.h>
  22. #include "copy_driver.h"
  23. #include "memalloc.h"
  24. #include "starpu_opencl.h"
  25. #include <profiling/profiling.h>
  26. void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
  27. {
  28. /* wake up all queues on that node */
  29. unsigned q_id;
  30. starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
  31. PTHREAD_RWLOCK_RDLOCK(&descr->attached_queues_rwlock);
  32. unsigned nqueues = descr->queues_count[nodeid];
  33. for (q_id = 0; q_id < nqueues; q_id++)
  34. {
  35. struct starpu_jobq_s *q;
  36. q = descr->attached_queues_per_node[nodeid][q_id];
  37. /* wake anybody waiting on that queue */
  38. PTHREAD_MUTEX_LOCK(&q->activity_mutex);
  39. PTHREAD_COND_BROADCAST(&q->activity_cond);
  40. PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
  41. }
  42. PTHREAD_RWLOCK_UNLOCK(&descr->attached_queues_rwlock);
  43. }
  44. void starpu_wake_all_blocked_workers(void)
  45. {
  46. /* workers may be blocked on the policy's global condition */
  47. struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
  48. pthread_cond_t *sched_cond = &sched->sched_activity_cond;
  49. pthread_mutex_t *sched_mutex = &sched->sched_activity_mutex;
  50. PTHREAD_MUTEX_LOCK(sched_mutex);
  51. PTHREAD_COND_BROADCAST(sched_cond);
  52. PTHREAD_MUTEX_UNLOCK(sched_mutex);
  53. /* workers may be blocked on the various queues' conditions */
  54. unsigned node;
  55. unsigned nnodes = _starpu_get_memory_nodes_count();
  56. for (node = 0; node < nnodes; node++)
  57. {
  58. _starpu_wake_all_blocked_workers_on_node(node);
  59. }
  60. }
  #ifdef STARPU_USE_FXT
  /*
   * Counter (incremented atomically for each transfer) that gives every
   * communication a unique id.  Tagging both the start and the end trace
   * events of a transfer with that id lets them be paired in an FxT trace.
   * Objects with static storage duration start at zero.
   */
  static unsigned communication_cnt;
  #endif
  67. static int copy_data_1_to_1_generic(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, struct starpu_data_request_s *req __attribute__((unused)))
  68. {
  69. int ret = 0;
  70. const struct starpu_data_copy_methods *copy_methods = handle->ops->copy_methods;
  71. starpu_node_kind src_kind = _starpu_get_node_kind(src_node);
  72. starpu_node_kind dst_kind = _starpu_get_node_kind(dst_node);
  73. STARPU_ASSERT(handle->per_node[src_node].refcnt);
  74. STARPU_ASSERT(handle->per_node[dst_node].refcnt);
  75. STARPU_ASSERT(handle->per_node[src_node].allocated);
  76. STARPU_ASSERT(handle->per_node[dst_node].allocated);
  77. #ifdef STARPU_USE_CUDA
  78. cudaError_t cures;
  79. cudaStream_t *stream;
  80. #endif
  81. void *src_interface = starpu_data_get_interface_on_node(handle, src_node);
  82. void *dst_interface = starpu_data_get_interface_on_node(handle, dst_node);
  83. switch (_STARPU_MEMORY_NODE_TUPLE(src_kind,dst_kind)) {
  84. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_CPU_RAM):
  85. /* STARPU_CPU_RAM -> STARPU_CPU_RAM */
  86. STARPU_ASSERT(copy_methods->ram_to_ram);
  87. copy_methods->ram_to_ram(src_interface, src_node, dst_interface, dst_node);
  88. break;
  89. #ifdef STARPU_USE_CUDA
  90. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CUDA_RAM,STARPU_CPU_RAM):
  91. /* CUBLAS_RAM -> STARPU_CPU_RAM */
  92. /* only the proper CUBLAS thread can initiate this ! */
  93. if (_starpu_get_local_memory_node() == src_node) {
  94. /* only the proper CUBLAS thread can initiate this directly ! */
  95. STARPU_ASSERT(copy_methods->cuda_to_ram);
  96. if (!req || !copy_methods->cuda_to_ram_async) {
  97. /* this is not associated to a request so it's synchronous */
  98. copy_methods->cuda_to_ram(src_interface, src_node, dst_interface, dst_node);
  99. }
  100. else {
  101. cures = cudaEventCreate(&req->async_channel.cuda_event);
  102. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  103. stream = starpu_cuda_get_local_stream();
  104. ret = copy_methods->cuda_to_ram_async(src_interface, src_node, dst_interface, dst_node, stream);
  105. cures = cudaEventRecord(req->async_channel.cuda_event, *stream);
  106. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  107. }
  108. }
  109. else {
  110. /* we should not have a blocking call ! */
  111. STARPU_ABORT();
  112. }
  113. break;
  114. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_CUDA_RAM):
  115. /* STARPU_CPU_RAM -> CUBLAS_RAM */
  116. /* only the proper CUBLAS thread can initiate this ! */
  117. STARPU_ASSERT(_starpu_get_local_memory_node() == dst_node);
  118. STARPU_ASSERT(copy_methods->ram_to_cuda);
  119. if (!req || !copy_methods->ram_to_cuda_async) {
  120. /* this is not associated to a request so it's synchronous */
  121. copy_methods->ram_to_cuda(src_interface, src_node, dst_interface, dst_node);
  122. }
  123. else {
  124. cures = cudaEventCreate(&req->async_channel.cuda_event);
  125. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  126. stream = starpu_cuda_get_local_stream();
  127. ret = copy_methods->ram_to_cuda_async(src_interface, src_node, dst_interface, dst_node, stream);
  128. cures = cudaEventRecord(req->async_channel.cuda_event, *stream);
  129. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  130. }
  131. break;
  132. #endif
  133. #ifdef STARPU_USE_OPENCL
  134. case _STARPU_MEMORY_NODE_TUPLE(STARPU_OPENCL_RAM,STARPU_CPU_RAM):
  135. /* OpenCL -> RAM */
  136. if (_starpu_get_local_memory_node() == src_node) {
  137. STARPU_ASSERT(copy_methods->opencl_to_ram);
  138. if (!req || !copy_methods->opencl_to_ram_async) {
  139. /* this is not associated to a request so it's synchronous */
  140. copy_methods->opencl_to_ram(src_interface, src_node, dst_interface, dst_node);
  141. }
  142. else {
  143. ret = copy_methods->opencl_to_ram_async(src_interface, src_node, dst_interface, dst_node, &(req->async_channel.opencl_event));
  144. }
  145. }
  146. else {
  147. /* we should not have a blocking call ! */
  148. STARPU_ABORT();
  149. }
  150. break;
  151. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_OPENCL_RAM):
  152. /* STARPU_CPU_RAM -> STARPU_OPENCL_RAM */
  153. STARPU_ASSERT(_starpu_get_local_memory_node() == dst_node);
  154. STARPU_ASSERT(copy_methods->ram_to_opencl);
  155. if (!req || !copy_methods->ram_to_opencl_async) {
  156. /* this is not associated to a request so it's synchronous */
  157. copy_methods->ram_to_opencl(src_interface, src_node, dst_interface, dst_node);
  158. }
  159. else {
  160. ret = copy_methods->ram_to_opencl_async(src_interface, src_node, dst_interface, dst_node, &(req->async_channel.opencl_event));
  161. }
  162. break;
  163. #endif
  164. default:
  165. STARPU_ABORT();
  166. break;
  167. }
  168. return ret;
  169. }
  170. int __attribute__((warn_unused_result)) _starpu_driver_copy_data_1_to_1(starpu_data_handle handle, uint32_t src_node,
  171. uint32_t dst_node, unsigned donotread, struct starpu_data_request_s *req, unsigned may_alloc)
  172. {
  173. if (!donotread)
  174. {
  175. STARPU_ASSERT(handle->per_node[src_node].allocated);
  176. STARPU_ASSERT(handle->per_node[src_node].refcnt);
  177. }
  178. int ret_alloc, ret_copy;
  179. unsigned __attribute__((unused)) com_id = 0;
  180. /* first make sure the destination has an allocated buffer */
  181. ret_alloc = _starpu_allocate_memory_on_node(handle, dst_node, may_alloc);
  182. if (ret_alloc)
  183. goto nomem;
  184. STARPU_ASSERT(handle->per_node[dst_node].allocated);
  185. STARPU_ASSERT(handle->per_node[dst_node].refcnt);
  186. /* if there is no need to actually read the data,
  187. * we do not perform any transfer */
  188. if (!donotread) {
  189. STARPU_ASSERT(handle->ops);
  190. //STARPU_ASSERT(handle->ops->copy_data_1_to_1);
  191. size_t size = _starpu_data_get_size(handle);
  192. _starpu_bus_update_profiling_info((int)src_node, (int)dst_node, size);
  193. #ifdef STARPU_USE_FXT
  194. com_id = STARPU_ATOMIC_ADD(&communication_cnt, 1);
  195. if (req)
  196. req->com_id = com_id;
  197. #endif
  198. /* for now we set the size to 0 in the FxT trace XXX */
  199. STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, 0, com_id);
  200. ret_copy = copy_data_1_to_1_generic(handle, src_node, dst_node, req);
  201. #ifdef STARPU_USE_FXT
  202. if (ret_copy != EAGAIN)
  203. {
  204. size_t size = _starpu_data_get_size(handle);
  205. STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, com_id);
  206. }
  207. #endif
  208. return ret_copy;
  209. }
  210. return 0;
  211. nomem:
  212. return ENOMEM;
  213. }
  214. void _starpu_driver_wait_request_completion(starpu_async_channel *async_channel __attribute__ ((unused)),
  215. unsigned handling_node)
  216. {
  217. starpu_node_kind kind = _starpu_get_node_kind(handling_node);
  218. #ifdef STARPU_USE_CUDA
  219. cudaEvent_t event;
  220. cudaError_t cures;
  221. #endif
  222. switch (kind) {
  223. #ifdef STARPU_USE_CUDA
  224. case STARPU_CUDA_RAM:
  225. event = (*async_channel).cuda_event;
  226. cures = cudaEventSynchronize(event);
  227. if (STARPU_UNLIKELY(cures))
  228. STARPU_CUDA_REPORT_ERROR(cures);
  229. cures = cudaEventDestroy(event);
  230. if (STARPU_UNLIKELY(cures))
  231. STARPU_CUDA_REPORT_ERROR(cures);
  232. break;
  233. #endif
  234. #ifdef STARPU_USE_OPENCL
  235. case STARPU_OPENCL_RAM:
  236. {
  237. cl_event opencl_event = (*async_channel).opencl_event;
  238. if (opencl_event == NULL) STARPU_ABORT();
  239. cl_int err = clWaitForEvents(1, &opencl_event);
  240. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  241. clReleaseEvent(opencl_event);
  242. }
  243. break;
  244. #endif
  245. case STARPU_CPU_RAM:
  246. default:
  247. STARPU_ABORT();
  248. }
  249. }
  250. unsigned _starpu_driver_test_request_completion(starpu_async_channel *async_channel __attribute__ ((unused)),
  251. unsigned handling_node)
  252. {
  253. starpu_node_kind kind = _starpu_get_node_kind(handling_node);
  254. unsigned success;
  255. #ifdef STARPU_USE_CUDA
  256. cudaEvent_t event;
  257. #endif
  258. switch (kind) {
  259. #ifdef STARPU_USE_CUDA
  260. case STARPU_CUDA_RAM:
  261. event = (*async_channel).cuda_event;
  262. success = (cudaEventQuery(event) == cudaSuccess);
  263. if (success)
  264. cudaEventDestroy(event);
  265. break;
  266. #endif
  267. #ifdef STARPU_USE_OPENCL
  268. case STARPU_OPENCL_RAM:
  269. {
  270. cl_int event_status;
  271. cl_event opencl_event = (*async_channel).opencl_event;
  272. if (opencl_event == NULL) STARPU_ABORT();
  273. cl_int err = clGetEventInfo(opencl_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(event_status), &event_status, NULL);
  274. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  275. success = (event_status == CL_COMPLETE);
  276. break;
  277. }
  278. #endif
  279. case STARPU_CPU_RAM:
  280. default:
  281. STARPU_ABORT();
  282. success = 0;
  283. }
  284. return success;
  285. }