copy_driver.c 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  #include <pthread.h>
  #include <stdio.h>
  #include <common/config.h>
  #include <common/utils.h>
  #include <core/policies/sched_policy.h>
  #include <datawizard/datastats.h>
  #include <common/fxt.h>
  #include "copy_driver.h"
  #include "memalloc.h"
  24. void _starpu_wake_all_blocked_workers_on_node(unsigned nodeid)
  25. {
  26. /* wake up all queues on that node */
  27. unsigned q_id;
  28. starpu_mem_node_descr * const descr = _starpu_get_memory_node_description();
  29. pthread_rwlock_rdlock(&descr->attached_queues_rwlock);
  30. unsigned nqueues = descr->queues_count[nodeid];
  31. for (q_id = 0; q_id < nqueues; q_id++)
  32. {
  33. struct starpu_jobq_s *q;
  34. q = descr->attached_queues_per_node[nodeid][q_id];
  35. /* wake anybody waiting on that queue */
  36. PTHREAD_MUTEX_LOCK(&q->activity_mutex);
  37. PTHREAD_COND_BROADCAST(&q->activity_cond);
  38. PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
  39. }
  40. pthread_rwlock_unlock(&descr->attached_queues_rwlock);
  41. }
  42. void starpu_wake_all_blocked_workers(void)
  43. {
  44. /* workers may be blocked on the policy's global condition */
  45. struct starpu_sched_policy_s *sched = _starpu_get_sched_policy();
  46. pthread_cond_t *sched_cond = &sched->sched_activity_cond;
  47. pthread_mutex_t *sched_mutex = &sched->sched_activity_mutex;
  48. PTHREAD_MUTEX_LOCK(sched_mutex);
  49. PTHREAD_COND_BROADCAST(sched_cond);
  50. PTHREAD_MUTEX_UNLOCK(sched_mutex);
  51. /* workers may be blocked on the various queues' conditions */
  52. unsigned node;
  53. unsigned nnodes = _starpu_get_memory_nodes_count();
  54. for (node = 0; node < nnodes; node++)
  55. {
  56. _starpu_wake_all_blocked_workers_on_node(node);
  57. }
  58. }
  #ifdef STARPU_USE_FXT
  /* Counter handing out a distinct identifier to each communication, so that
   * the beginning and the end of one transfer can be matched in the trace. */
  static unsigned communication_cnt = 0;
  #endif
  65. static int copy_data_1_to_1_generic(starpu_data_handle handle, uint32_t src_node, uint32_t dst_node, struct starpu_data_request_s *req __attribute__((unused)))
  66. {
  67. int ret = 0;
  68. //ret = handle->ops->copy_data_1_to_1(handle, src_node, dst_node);
  69. const struct starpu_copy_data_methods_s *copy_methods = handle->ops->copy_methods;
  70. starpu_node_kind src_kind = _starpu_get_node_kind(src_node);
  71. starpu_node_kind dst_kind = _starpu_get_node_kind(dst_node);
  72. STARPU_ASSERT(handle->per_node[src_node].refcnt);
  73. STARPU_ASSERT(handle->per_node[dst_node].refcnt);
  74. STARPU_ASSERT(handle->per_node[src_node].allocated);
  75. STARPU_ASSERT(handle->per_node[dst_node].allocated);
  76. #ifdef STARPU_USE_CUDA
  77. cudaError_t cures;
  78. cudaStream_t *stream;
  79. #endif
  80. switch (dst_kind) {
  81. case STARPU_RAM:
  82. switch (src_kind) {
  83. case STARPU_RAM:
  84. /* STARPU_RAM -> STARPU_RAM */
  85. STARPU_ASSERT(copy_methods->ram_to_ram);
  86. copy_methods->ram_to_ram(handle, src_node, dst_node);
  87. break;
  88. #ifdef STARPU_USE_CUDA
  89. case STARPU_CUDA_RAM:
  90. /* CUBLAS_RAM -> STARPU_RAM */
  91. /* only the proper CUBLAS thread can initiate this ! */
  92. if (_starpu_get_local_memory_node() == src_node)
  93. {
  94. /* only the proper CUBLAS thread can initiate this directly ! */
  95. STARPU_ASSERT(copy_methods->cuda_to_ram);
  96. if (!req || !copy_methods->cuda_to_ram_async)
  97. {
  98. /* this is not associated to a request so it's synchronous */
  99. copy_methods->cuda_to_ram(handle, src_node, dst_node);
  100. }
  101. else {
  102. cures = cudaEventCreate(&req->async_channel.cuda_event);
  103. STARPU_ASSERT(cures == cudaSuccess);
  104. stream = starpu_get_local_cuda_stream();
  105. ret = copy_methods->cuda_to_ram_async(handle, src_node, dst_node, stream);
  106. cures = cudaEventRecord(req->async_channel.cuda_event, *stream);
  107. STARPU_ASSERT(cures == cudaSuccess);
  108. }
  109. }
  110. else
  111. {
  112. /* we should not have a blocking call ! */
  113. STARPU_ABORT();
  114. }
  115. break;
  116. #endif
  117. case STARPU_SPU_LS:
  118. STARPU_ABORT(); // TODO
  119. break;
  120. case STARPU_UNUSED:
  121. printf("error node %u STARPU_UNUSED\n", src_node);
  122. default:
  123. assert(0);
  124. break;
  125. }
  126. break;
  127. #ifdef STARPU_USE_CUDA
  128. case STARPU_CUDA_RAM:
  129. switch (src_kind) {
  130. case STARPU_RAM:
  131. /* STARPU_RAM -> CUBLAS_RAM */
  132. /* only the proper CUBLAS thread can initiate this ! */
  133. STARPU_ASSERT(_starpu_get_local_memory_node() == dst_node);
  134. STARPU_ASSERT(copy_methods->ram_to_cuda);
  135. if (!req || !copy_methods->ram_to_cuda_async)
  136. {
  137. /* this is not associated to a request so it's synchronous */
  138. copy_methods->ram_to_cuda(handle, src_node, dst_node);
  139. }
  140. else {
  141. cures = cudaEventCreate(&req->async_channel.cuda_event);
  142. STARPU_ASSERT(cures == cudaSuccess);
  143. stream = starpu_get_local_cuda_stream();
  144. ret = copy_methods->ram_to_cuda_async(handle, src_node, dst_node, stream);
  145. cures = cudaEventRecord(req->async_channel.cuda_event, *stream);
  146. STARPU_ASSERT(cures == cudaSuccess);
  147. }
  148. break;
  149. case STARPU_CUDA_RAM:
  150. case STARPU_SPU_LS:
  151. STARPU_ABORT(); // TODO
  152. break;
  153. case STARPU_UNUSED:
  154. default:
  155. STARPU_ABORT();
  156. break;
  157. }
  158. break;
  159. #endif
  160. case STARPU_SPU_LS:
  161. STARPU_ABORT(); // TODO
  162. break;
  163. case STARPU_UNUSED:
  164. default:
  165. assert(0);
  166. break;
  167. }
  168. return ret;
  169. }
  170. int __attribute__((warn_unused_result)) _starpu_driver_copy_data_1_to_1(starpu_data_handle handle, uint32_t src_node,
  171. uint32_t dst_node, unsigned donotread, struct starpu_data_request_s *req, unsigned may_alloc)
  172. {
  173. if (!donotread)
  174. {
  175. STARPU_ASSERT(handle->per_node[src_node].allocated);
  176. STARPU_ASSERT(handle->per_node[src_node].refcnt);
  177. }
  178. int ret_alloc, ret_copy;
  179. unsigned __attribute__((unused)) com_id = 0;
  180. /* first make sure the destination has an allocated buffer */
  181. ret_alloc = _starpu_allocate_memory_on_node(handle, dst_node, may_alloc);
  182. if (ret_alloc)
  183. goto nomem;
  184. STARPU_ASSERT(handle->per_node[dst_node].allocated);
  185. STARPU_ASSERT(handle->per_node[dst_node].refcnt);
  186. /* if there is no need to actually read the data,
  187. * we do not perform any transfer */
  188. if (!donotread) {
  189. STARPU_ASSERT(handle->ops);
  190. //STARPU_ASSERT(handle->ops->copy_data_1_to_1);
  191. #ifdef STARPU_DATA_STATS
  192. size_t size = handle->ops->get_size(handle);
  193. _starpu_update_comm_amount(src_node, dst_node, size);
  194. #endif
  195. #ifdef STARPU_USE_FXT
  196. com_id = STARPU_ATOMIC_ADD(&communication_cnt, 1);
  197. if (req)
  198. req->com_id = com_id;
  199. #endif
  200. /* for now we set the size to 0 in the FxT trace XXX */
  201. STARPU_TRACE_START_DRIVER_COPY(src_node, dst_node, 0, com_id);
  202. ret_copy = copy_data_1_to_1_generic(handle, src_node, dst_node, req);
  203. #ifdef STARPU_USE_FXT
  204. if (ret_copy != EAGAIN)
  205. {
  206. size_t size = handle->ops->get_size(handle);
  207. STARPU_TRACE_END_DRIVER_COPY(src_node, dst_node, size, com_id);
  208. }
  209. #endif
  210. return ret_copy;
  211. }
  212. return 0;
  213. nomem:
  214. return ENOMEM;
  215. }
  216. void _starpu_driver_wait_request_completion(starpu_async_channel *async_channel __attribute__ ((unused)),
  217. unsigned handling_node)
  218. {
  219. starpu_node_kind kind = _starpu_get_node_kind(handling_node);
  220. #ifdef STARPU_USE_CUDA
  221. cudaEvent_t event;
  222. cudaError_t cures;
  223. #endif
  224. switch (kind) {
  225. #ifdef STARPU_USE_CUDA
  226. case STARPU_CUDA_RAM:
  227. event = (*async_channel).cuda_event;
  228. cures = cudaEventSynchronize(event);
  229. if (STARPU_UNLIKELY(cures))
  230. STARPU_CUDA_REPORT_ERROR(cures);
  231. cures = cudaEventDestroy(event);
  232. if (STARPU_UNLIKELY(cures))
  233. STARPU_CUDA_REPORT_ERROR(cures);
  234. break;
  235. #endif
  236. case STARPU_RAM:
  237. default:
  238. STARPU_ABORT();
  239. }
  240. }
  241. unsigned _starpu_driver_test_request_completion(starpu_async_channel *async_channel __attribute__ ((unused)),
  242. unsigned handling_node)
  243. {
  244. starpu_node_kind kind = _starpu_get_node_kind(handling_node);
  245. unsigned success;
  246. #ifdef STARPU_USE_CUDA
  247. cudaEvent_t event;
  248. #endif
  249. switch (kind) {
  250. #ifdef STARPU_USE_CUDA
  251. case STARPU_CUDA_RAM:
  252. event = (*async_channel).cuda_event;
  253. success = (cudaEventQuery(event) == cudaSuccess);
  254. if (success)
  255. cudaEventDestroy(event);
  256. break;
  257. #endif
  258. case STARPU_RAM:
  259. default:
  260. STARPU_ABORT();
  261. success = 0;
  262. }
  263. return success;
  264. }