/* copy-driver.c */
/*
 * StarPU
 * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
  16. #include <pthread.h>
  17. #include <core/policies/sched_policy.h>
  18. #include <datawizard/datastats.h>
  19. #include <common/fxt.h>
  20. #include "copy-driver.h"
  21. #include "memalloc.h"
  22. mem_node_descr descr;
  23. static pthread_key_t memory_node_key;
  24. unsigned register_memory_node(node_kind kind)
  25. {
  26. unsigned nnodes;
  27. /* ATOMIC_ADD returns the new value ... */
  28. nnodes = STARPU_ATOMIC_ADD(&descr.nnodes, 1);
  29. descr.nodes[nnodes-1] = kind;
  30. TRACE_NEW_MEM_NODE(nnodes-1);
  31. /* for now, there is no queue related to that newly created node */
  32. descr.queues_count[nnodes-1] = 0;
  33. return (nnodes-1);
  34. }
  35. /* TODO move in a more appropriate file !! */
  36. /* attach a queue to a memory node (if it's not already attached) */
  37. void memory_node_attach_queue(struct jobq_s *q, unsigned nodeid)
  38. {
  39. unsigned queue;
  40. unsigned nqueues_total, nqueues;
  41. take_mutex(&descr.attached_queues_mutex);
  42. /* we only insert the queue if it's not already in the list */
  43. nqueues = descr.queues_count[nodeid];
  44. for (queue = 0; queue < nqueues; queue++)
  45. {
  46. if (descr.attached_queues_per_node[nodeid][queue] == q)
  47. {
  48. /* the queue is already in the list */
  49. release_mutex(&descr.attached_queues_mutex);
  50. return;
  51. }
  52. }
  53. /* it was not found locally */
  54. descr.attached_queues_per_node[nodeid][nqueues] = q;
  55. descr.queues_count[nodeid]++;
  56. /* do we have to add it in the global list as well ? */
  57. nqueues_total = descr.total_queues_count;
  58. for (queue = 0; queue < nqueues_total; queue++)
  59. {
  60. if (descr.attached_queues_all[queue] == q)
  61. {
  62. /* the queue is already in the global list */
  63. release_mutex(&descr.attached_queues_mutex);
  64. return;
  65. }
  66. }
  67. /* it was not in the global queue either */
  68. descr.attached_queues_all[nqueues_total] = q;
  69. descr.total_queues_count++;
  70. release_mutex(&descr.attached_queues_mutex);
  71. }
  72. void wake_all_blocked_workers_on_node(unsigned nodeid)
  73. {
  74. /* wake up all queues on that node */
  75. unsigned q_id;
  76. take_mutex(&descr.attached_queues_mutex);
  77. unsigned nqueues = descr.queues_count[nodeid];
  78. for (q_id = 0; q_id < nqueues; q_id++)
  79. {
  80. struct jobq_s *q;
  81. q = descr.attached_queues_per_node[nodeid][q_id];
  82. /* wake anybody waiting on that queue */
  83. pthread_mutex_lock(&q->activity_mutex);
  84. pthread_cond_broadcast(&q->activity_cond);
  85. pthread_mutex_unlock(&q->activity_mutex);
  86. }
  87. release_mutex(&descr.attached_queues_mutex);
  88. }
  89. void wake_all_blocked_workers(void)
  90. {
  91. /* workers may be blocked on the policy's global condition */
  92. struct sched_policy_s *sched = get_sched_policy();
  93. pthread_cond_t *sched_cond = &sched->sched_activity_cond;
  94. pthread_mutex_t *sched_mutex = &sched->sched_activity_mutex;
  95. pthread_mutex_lock(sched_mutex);
  96. pthread_cond_broadcast(sched_cond);
  97. pthread_mutex_unlock(sched_mutex);
  98. /* workers may be blocked on the various queues' conditions */
  99. unsigned node;
  100. for (node = 0; node < descr.nnodes; node++)
  101. {
  102. wake_all_blocked_workers_on_node(node);
  103. }
  104. }
  105. void init_memory_nodes()
  106. {
  107. /* there is no node yet, subsequent nodes will be
  108. * added using register_memory_node */
  109. descr.nnodes = 0;
  110. pthread_key_create(&memory_node_key, NULL);
  111. unsigned i;
  112. for (i = 0; i < MAXNODES; i++)
  113. {
  114. descr.nodes[i] = UNUSED;
  115. }
  116. init_mem_chunk_lists();
  117. init_data_request_lists();
  118. }
  119. void set_local_memory_node_key(unsigned *node)
  120. {
  121. pthread_setspecific(memory_node_key, node);
  122. }
  123. unsigned get_local_memory_node(void)
  124. {
  125. unsigned *memory_node;
  126. memory_node = pthread_getspecific(memory_node_key);
  127. /* in case this is called by the programmer, we assume the RAM node
  128. is the appropriate memory node ... so we return 0 XXX */
  129. if (STARPU_UNLIKELY(!memory_node))
  130. return 0;
  131. return *memory_node;
  132. }
  133. inline node_kind get_node_kind(uint32_t node)
  134. {
  135. return descr.nodes[node];
  136. }
  137. int allocate_per_node_buffer(data_state *state, uint32_t node)
  138. {
  139. int ret;
  140. if (!state->per_node[node].allocated) {
  141. /* there is no room available for the data yet */
  142. ret = allocate_memory_on_node(state, node);
  143. if (STARPU_UNLIKELY(ret == -ENOMEM))
  144. goto nomem;
  145. }
  146. return 0;
  147. nomem:
  148. /* there was not enough memory to allocate the buffer */
  149. return -ENOMEM;
  150. }
  151. #ifdef USE_FXT
  152. /* we need to identify each communication so that we can match the beginning
  153. * and the end of a communication in the trace, so we use a unique identifier
  154. * per communication */
  155. static unsigned communication_cnt = 0;
  156. #endif
  157. int __attribute__((warn_unused_result)) driver_copy_data_1_to_1(data_state *state, uint32_t src_node,
  158. uint32_t dst_node, unsigned donotread)
  159. {
  160. int ret_alloc, ret_copy;
  161. unsigned __attribute__((unused)) com_id = 0;
  162. /* first make sure the destination has an allocated buffer */
  163. ret_alloc = allocate_per_node_buffer(state, dst_node);
  164. if (ret_alloc)
  165. goto nomem;
  166. /* if there is no need to actually read the data,
  167. * we do not perform any transfer */
  168. if (!donotread) {
  169. STARPU_ASSERT(state->ops);
  170. STARPU_ASSERT(state->ops->copy_data_1_to_1);
  171. #ifdef DATA_STATS
  172. size_t size = state->ops->get_size(state);
  173. update_comm_ammount(src_node, dst_node, size);
  174. #endif
  175. #ifdef USE_FXT
  176. com_id = STARPU_ATOMIC_ADD(&communication_cnt, 1);
  177. #endif
  178. /* for now we set the size to 0 in the FxT trace XXX */
  179. TRACE_START_DRIVER_COPY(src_node, dst_node, 0, com_id);
  180. ret_copy = state->ops->copy_data_1_to_1(state, src_node, dst_node);
  181. TRACE_END_DRIVER_COPY(src_node, dst_node, 0, com_id);
  182. return ret_copy;
  183. }
  184. return 0;
  185. nomem:
  186. return -ENOMEM;
  187. }
  188. static uint32_t choose_src_node(uint32_t src_node_mask)
  189. {
  190. unsigned src_node = 0;
  191. unsigned i;
  192. /* first find the node that will be the actual source */
  193. for (i = 0; i < MAXNODES; i++)
  194. {
  195. if (src_node_mask & (1<<i))
  196. {
  197. /* this is a potential candidate */
  198. src_node = i;
  199. /* however GPU are expensive sources, really !
  200. * other should be ok */
  201. if (descr.nodes[i] != CUDA_RAM)
  202. break;
  203. /* XXX do a better algorithm to distribute the memory copies */
  204. }
  205. }
  206. return src_node;
  207. }
  208. __attribute__((warn_unused_result))
  209. int driver_copy_data(data_state *state, uint32_t src_node_mask,
  210. uint32_t dst_node, unsigned donotread)
  211. {
  212. int ret;
  213. uint32_t src_node = choose_src_node(src_node_mask);
  214. /* possibly returns -1 if there was no memory left */
  215. ret = driver_copy_data_1_to_1(state, src_node, dst_node, donotread);
  216. return ret;
  217. }