copy_driver.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <common/utils.h>
  19. #include <core/sched_policy.h>
  20. #include <datawizard/datastats.h>
  21. #include <common/fxt.h>
  22. #include "copy_driver.h"
  23. #include "memalloc.h"
  24. #include <starpu_opencl.h>
  25. #include <starpu_cuda.h>
  26. #include <profiling/profiling.h>
  27. void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
  28. {
  29. /* wake up all workers on that memory node */
  30. unsigned cond_id;
  31. starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
  32. PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
  33. unsigned nconds = descr->condition_count[nodeid];
  34. for (cond_id = 0; cond_id < nconds; cond_id++)
  35. {
  36. struct _cond_and_mutex *condition;
  37. condition = &descr->conditions_attached_to_node[nodeid][cond_id];
  38. /* wake anybody waiting on that condition */
  39. PTHREAD_MUTEX_LOCK(condition->mutex);
  40. PTHREAD_COND_BROADCAST(condition->cond);
  41. PTHREAD_MUTEX_UNLOCK(condition->mutex);
  42. }
  43. PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
  44. }
  45. void starpu_wake_all_blocked_workers(void)
  46. {
  47. /* workers may be blocked on the various queues' conditions */
  48. unsigned cond_id;
  49. starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
  50. PTHREAD_RWLOCK_RDLOCK(&descr->conditions_rwlock);
  51. unsigned nconds = descr->total_condition_count;
  52. for (cond_id = 0; cond_id < nconds; cond_id++)
  53. {
  54. struct _cond_and_mutex *condition;
  55. condition = &descr->conditions_all[cond_id];
  56. /* wake anybody waiting on that condition */
  57. PTHREAD_MUTEX_LOCK(condition->mutex);
  58. PTHREAD_COND_BROADCAST(condition->cond);
  59. PTHREAD_MUTEX_UNLOCK(condition->mutex);
  60. }
  61. PTHREAD_RWLOCK_UNLOCK(&descr->conditions_rwlock);
  62. }
  #ifdef STARPU_USE_FXT
  /*
   * Source of per-communication identifiers for the FxT trace: each traced
   * transfer takes the next value, so the beginning and the end of the same
   * communication can be matched in the trace. Objects with static storage
   * duration start at zero.
   */
  static unsigned communication_cnt;
  #endif
  69. static int copy_data_1_to_1_generic(starpu_data_handle handle, struct starpu_data_replicate_s *src_replicate, struct starpu_data_replicate_s *dst_replicate, struct starpu_data_request_s *req __attribute__((unused)))
  70. {
  71. int ret = 0;
  72. const struct starpu_data_copy_methods *copy_methods = handle->ops->copy_methods;
  73. unsigned src_node = src_replicate->memory_node;
  74. unsigned dst_node = dst_replicate->memory_node;
  75. starpu_node_kind src_kind = _starpu_get_node_kind(src_node);
  76. starpu_node_kind dst_kind = _starpu_get_node_kind(dst_node);
  77. STARPU_ASSERT(src_replicate->refcnt);
  78. STARPU_ASSERT(dst_replicate->refcnt);
  79. STARPU_ASSERT(src_replicate->allocated);
  80. STARPU_ASSERT(dst_replicate->allocated);
  81. #ifdef STARPU_USE_CUDA
  82. cudaError_t cures;
  83. cudaStream_t *stream;
  84. #endif
  85. void *src_interface = src_replicate->interface;
  86. void *dst_interface = dst_replicate->interface;
  87. switch (_STARPU_MEMORY_NODE_TUPLE(src_kind,dst_kind)) {
  88. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_CPU_RAM):
  89. /* STARPU_CPU_RAM -> STARPU_CPU_RAM */
  90. STARPU_ASSERT(copy_methods->ram_to_ram);
  91. copy_methods->ram_to_ram(src_interface, src_node, dst_interface, dst_node);
  92. break;
  93. #ifdef STARPU_USE_CUDA
  94. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CUDA_RAM,STARPU_CPU_RAM):
  95. /* CUBLAS_RAM -> STARPU_CPU_RAM */
  96. /* only the proper CUBLAS thread can initiate this ! */
  97. if (_starpu_get_local_memory_node() == src_node) {
  98. /* only the proper CUBLAS thread can initiate this directly ! */
  99. STARPU_ASSERT(copy_methods->cuda_to_ram);
  100. if (!req || !copy_methods->cuda_to_ram_async) {
  101. /* this is not associated to a request so it's synchronous */
  102. copy_methods->cuda_to_ram(src_interface, src_node, dst_interface, dst_node);
  103. }
  104. else {
  105. cures = cudaEventCreate(&req->async_channel.cuda_event);
  106. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  107. stream = starpu_cuda_get_local_stream();
  108. ret = copy_methods->cuda_to_ram_async(src_interface, src_node, dst_interface, dst_node, stream);
  109. cures = cudaEventRecord(req->async_channel.cuda_event, *stream);
  110. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  111. }
  112. }
  113. else {
  114. /* we should not have a blocking call ! */
  115. STARPU_ABORT();
  116. }
  117. break;
  118. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_CUDA_RAM):
  119. /* STARPU_CPU_RAM -> CUBLAS_RAM */
  120. /* only the proper CUBLAS thread can initiate this ! */
  121. STARPU_ASSERT(_starpu_get_local_memory_node() == dst_node);
  122. STARPU_ASSERT(copy_methods->ram_to_cuda);
  123. if (!req || !copy_methods->ram_to_cuda_async) {
  124. /* this is not associated to a request so it's synchronous */
  125. copy_methods->ram_to_cuda(src_interface, src_node, dst_interface, dst_node);
  126. }
  127. else {
  128. cures = cudaEventCreate(&req->async_channel.cuda_event);
  129. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  130. stream = starpu_cuda_get_local_stream();
  131. ret = copy_methods->ram_to_cuda_async(src_interface, src_node, dst_interface, dst_node, stream);
  132. cures = cudaEventRecord(req->async_channel.cuda_event, *stream);
  133. if (STARPU_UNLIKELY(cures != cudaSuccess)) STARPU_CUDA_REPORT_ERROR(cures);
  134. }
  135. break;
  136. #endif
  137. #ifdef STARPU_USE_OPENCL
  138. case _STARPU_MEMORY_NODE_TUPLE(STARPU_OPENCL_RAM,STARPU_CPU_RAM):
  139. /* OpenCL -> RAM */
  140. if (_starpu_get_local_memory_node() == src_node) {
  141. STARPU_ASSERT(copy_methods->opencl_to_ram);
  142. if (!req || !copy_methods->opencl_to_ram_async) {
  143. /* this is not associated to a request so it's synchronous */
  144. copy_methods->opencl_to_ram(src_interface, src_node, dst_interface, dst_node);
  145. }
  146. else {
  147. ret = copy_methods->opencl_to_ram_async(src_interface, src_node, dst_interface, dst_node, &(req->async_channel.opencl_event));
  148. }
  149. }
  150. else {
  151. /* we should not have a blocking call ! */
  152. STARPU_ABORT();
  153. }
  154. break;
  155. case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_OPENCL_RAM):
  156. /* STARPU_CPU_RAM -> STARPU_OPENCL_RAM */
  157. STARPU_ASSERT(_starpu_get_local_memory_node() == dst_node);
  158. STARPU_ASSERT(copy_methods->ram_to_opencl);
  159. if (!req || !copy_methods->ram_to_opencl_async) {
  160. /* this is not associated to a request so it's synchronous */
  161. copy_methods->ram_to_opencl(src_interface, src_node, dst_interface, dst_node);
  162. }
  163. else {
  164. ret = copy_methods->ram_to_opencl_async(src_interface, src_node, dst_interface, dst_node, &(req->async_channel.opencl_event));
  165. }
  166. break;
  167. #endif
  168. default:
  169. STARPU_ABORT();
  170. break;
  171. }
  172. return ret;
  173. }
  174. int __attribute__((warn_unused_result)) _starpu_driver_copy_data_1_to_1(starpu_data_handle handle,
  175. struct starpu_data_replicate_s *src_replicate,
  176. struct starpu_data_replicate_s *dst_replicate,
  177. unsigned donotread,
  178. struct starpu_data_request_s *req,
  179. unsigned may_alloc)
  180. {
  181. if (!donotread)
  182. {
  183. STARPU_ASSERT(src_replicate->allocated);
  184. STARPU_ASSERT(src_replicate->refcnt);
  185. }
  186. int ret_alloc, ret_copy;
  187. unsigned __attribute__((unused)) com_id = 0;
  188. unsigned src_node = src_replicate->memory_node;
  189. unsigned dst_node = dst_replicate->memory_node;
  190. /* first make sure the destination has an allocated buffer */
  191. if (!dst_replicate->allocated)
  192. {
  193. ret_alloc = _starpu_allocate_memory_on_node(handle, dst_replicate);
  194. if (ret_alloc)
  195. goto nomem;
  196. }
  197. STARPU_ASSERT(dst_replicate->allocated);
  198. STARPU_ASSERT(dst_replicate->refcnt);
  199. /* if there is no need to actually read the data,
  200. * we do not perform any transfer */
  201. if (!donotread) {
  202. STARPU_ASSERT(handle->ops);
  203. //STARPU_ASSERT(handle->ops->copy_data_1_to_1);
  204. size_t size = _starpu_data_get_size(handle);
  205. _starpu_bus_update_profiling_info((int)src_node, (int)dst_node, size);
  206. #ifdef STARPU_USE_FXT
  207. com_id = STARPU_ATOMIC_ADD(&communication_cnt, 1);
  208. if (req)
  209. req->com_id = com_id;
  210. #endif
  211. /* for now we set the size to 0 in the FxT trace XXX */
  212. STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, 0, com_id);
  213. ret_copy = copy_data_1_to_1_generic(handle, src_replicate, dst_replicate, req);
  214. #ifdef STARPU_USE_FXT
  215. if (ret_copy != -EAGAIN)
  216. {
  217. size_t size = _starpu_data_get_size(handle);
  218. STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, com_id);
  219. }
  220. #endif
  221. return ret_copy;
  222. }
  223. return 0;
  224. nomem:
  225. return -ENOMEM;
  226. }
  227. void _starpu_driver_wait_request_completion(starpu_async_channel *async_channel __attribute__ ((unused)),
  228. unsigned handling_node)
  229. {
  230. starpu_node_kind kind = _starpu_get_node_kind(handling_node);
  231. #ifdef STARPU_USE_CUDA
  232. cudaEvent_t event;
  233. cudaError_t cures;
  234. #endif
  235. switch (kind) {
  236. #ifdef STARPU_USE_CUDA
  237. case STARPU_CUDA_RAM:
  238. event = (*async_channel).cuda_event;
  239. cures = cudaEventSynchronize(event);
  240. if (STARPU_UNLIKELY(cures))
  241. STARPU_CUDA_REPORT_ERROR(cures);
  242. cures = cudaEventDestroy(event);
  243. if (STARPU_UNLIKELY(cures))
  244. STARPU_CUDA_REPORT_ERROR(cures);
  245. break;
  246. #endif
  247. #ifdef STARPU_USE_OPENCL
  248. case STARPU_OPENCL_RAM:
  249. {
  250. cl_event opencl_event = (*async_channel).opencl_event;
  251. if (opencl_event == NULL) STARPU_ABORT();
  252. cl_int err = clWaitForEvents(1, &opencl_event);
  253. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  254. clReleaseEvent(opencl_event);
  255. }
  256. break;
  257. #endif
  258. case STARPU_CPU_RAM:
  259. default:
  260. STARPU_ABORT();
  261. }
  262. }
  263. unsigned _starpu_driver_test_request_completion(starpu_async_channel *async_channel __attribute__ ((unused)),
  264. unsigned handling_node)
  265. {
  266. starpu_node_kind kind = _starpu_get_node_kind(handling_node);
  267. unsigned success;
  268. #ifdef STARPU_USE_CUDA
  269. cudaEvent_t event;
  270. #endif
  271. switch (kind) {
  272. #ifdef STARPU_USE_CUDA
  273. case STARPU_CUDA_RAM:
  274. event = (*async_channel).cuda_event;
  275. success = (cudaEventQuery(event) == cudaSuccess);
  276. if (success)
  277. cudaEventDestroy(event);
  278. break;
  279. #endif
  280. #ifdef STARPU_USE_OPENCL
  281. case STARPU_OPENCL_RAM:
  282. {
  283. cl_int event_status;
  284. cl_event opencl_event = (*async_channel).opencl_event;
  285. if (opencl_event == NULL) STARPU_ABORT();
  286. cl_int err = clGetEventInfo(opencl_event, CL_EVENT_COMMAND_EXECUTION_STATUS, sizeof(event_status), &event_status, NULL);
  287. if (err != CL_SUCCESS) STARPU_OPENCL_REPORT_ERROR(err);
  288. success = (event_status == CL_COMPLETE);
  289. break;
  290. }
  291. #endif
  292. case STARPU_CPU_RAM:
  293. default:
  294. STARPU_ABORT();
  295. success = 0;
  296. }
  297. return success;
  298. }