data_interface.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009, 2010, 2011  Université de Bordeaux 1
 * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <stdint.h>

#include <datawizard/datawizard.h>
#include <core/dependencies/data_concurrency.h>
#include <common/uthash.h>
#include <common/starpu_spinlock.h>

/* Entry in the `registered_handles' hash table. */
struct handle_entry
{
	UT_hash_handle hh;
	void *pointer;
	starpu_data_handle handle;
};

/* Hash table mapping host pointers to data handles. */
static struct handle_entry *registered_handles;
static starpu_spinlock_t registered_handles_lock;
void _starpu_data_interface_init()
{
	_starpu_spin_init(&registered_handles_lock);
}

void _starpu_data_interface_shutdown()
{
	struct handle_entry *entry, *tmp;

	_starpu_spin_destroy(&registered_handles_lock);

	HASH_ITER(hh, registered_handles, entry, tmp) {
		HASH_DEL(registered_handles, entry);
		free(entry);
	}

	registered_handles = NULL;
}
/* Register the mapping from PTR to HANDLE.  If PTR is already mapped to
 * some handle, the new mapping shadows the previous one. */
void _starpu_data_register_ram_pointer(starpu_data_handle handle, void *ptr)
{
	struct handle_entry *entry;

	entry = (struct handle_entry *) malloc(sizeof(*entry));
	STARPU_ASSERT(entry != NULL);

	entry->pointer = ptr;
	entry->handle = handle;

	_starpu_spin_lock(&registered_handles_lock);
	HASH_ADD_PTR(registered_handles, pointer, entry);
	_starpu_spin_unlock(&registered_handles_lock);
}
starpu_data_handle starpu_data_lookup(const void *ptr)
{
	starpu_data_handle result;

	_starpu_spin_lock(&registered_handles_lock);
	{
		struct handle_entry *entry;

		HASH_FIND_PTR(registered_handles, &ptr, entry);
		if (STARPU_UNLIKELY(entry == NULL))
			result = NULL;
		else
			result = entry->handle;
	}
	_starpu_spin_unlock(&registered_handles_lock);

	return result;
}
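
/* Illustrative usage sketch (not part of this file's logic): a host pointer
 * that has been registered can be mapped back to its handle with
 * starpu_data_lookup().  The snippet assumes the public vector interface
 * (starpu_vector_data_register); the function and variable names below are
 * hypothetical.
 *
 *	static void lookup_example(void)
 *	{
 *		static float buf[1024];
 *		starpu_data_handle handle;
 *
 *		starpu_vector_data_register(&handle, 0, (uintptr_t) buf,
 *					    1024, sizeof(buf[0]));
 *
 *		STARPU_ASSERT(starpu_data_lookup(buf) == handle);
 *		STARPU_ASSERT(starpu_data_lookup(&handle) == NULL);
 *
 *		starpu_data_unregister(handle);
 *	}
 */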
/*
 * Start monitoring a piece of data
 */

static void _starpu_register_new_data(starpu_data_handle handle,
					uint32_t home_node, uint32_t wt_mask)
{
	void *ptr;

	STARPU_ASSERT(handle);

	/* initialize the new lock */
	handle->req_list = starpu_data_requester_list_new();
	handle->refcnt = 0;
	_starpu_spin_init(&handle->header_lock);

	/* first take care to properly lock the data */
	_starpu_spin_lock(&handle->header_lock);

	/* there is no hierarchy yet */
	handle->nchildren = 0;
	handle->root_handle = handle;
	handle->father_handle = NULL;
	handle->sibling_index = 0; /* could be anything for the root */
	handle->depth = 1; /* the tree has only one node so far */

	handle->rank = -1; /* invalid until set */
	handle->tag = -1; /* invalid until set */

	handle->is_not_important = 0;

	handle->sequential_consistency =
		starpu_data_get_default_sequential_consistency_flag();

	PTHREAD_MUTEX_INIT(&handle->sequential_consistency_mutex, NULL);
	handle->last_submitted_mode = STARPU_R;
	handle->last_submitted_writer = NULL;
	handle->last_submitted_readers = NULL;
	handle->post_sync_tasks = NULL;
	handle->post_sync_tasks_cnt = 0;

	/* By default, there are no methods available to perform a reduction */
	handle->redux_cl = NULL;
	handle->init_cl = NULL;

	handle->reduction_refcnt = 0;
	handle->reduction_req_list = starpu_data_requester_list_new();

#ifdef STARPU_USE_FXT
	handle->last_submitted_ghost_writer_id_is_valid = 0;
	handle->last_submitted_ghost_writer_id = 0;
	handle->last_submitted_ghost_readers_id = NULL;
#endif

	handle->wt_mask = wt_mask;

	/* Store some values directly in the handle so that they do not have
	 * to be recomputed all the time. */
	handle->data_size = handle->ops->get_size(handle);
	handle->footprint = _starpu_compute_data_footprint(handle);

	handle->home_node = home_node;
	/* the new data is invalid from every node's perspective, except for
	 * the home node */
	unsigned node;
	for (node = 0; node < STARPU_MAXNODES; node++)
	{
		struct starpu_data_replicate_s *replicate;
		replicate = &handle->per_node[node];

		replicate->memory_node = node;
		replicate->relaxed_coherency = 0;
		replicate->refcnt = 0;

		if (node == home_node) {
			/* this is the home node with the only valid copy */
			replicate->state = STARPU_OWNER;
			replicate->allocated = 1;
			replicate->automatically_allocated = 0;
		}
		else {
			/* the value is not available here yet */
			replicate->state = STARPU_INVALID;
			replicate->allocated = 0;
		}
	}

	unsigned worker;
	unsigned nworkers = starpu_worker_get_count();
	for (worker = 0; worker < nworkers; worker++)
	{
		struct starpu_data_replicate_s *replicate;
		replicate = &handle->per_worker[worker];

		replicate->allocated = 0;
		replicate->automatically_allocated = 0;
		replicate->state = STARPU_INVALID;
		replicate->refcnt = 0;
		replicate->handle = handle;

		for (node = 0; node < STARPU_MAXNODES; node++)
		{
			replicate->requested[node] = 0;
			replicate->request[node] = NULL;
		}

		replicate->relaxed_coherency = 1;
		replicate->initialized = 0;
		replicate->memory_node = starpu_worker_get_memory_node(worker);

		/* duplicate the content of the interface on node 0 */
		memcpy(replicate->data_interface, handle->per_node[0].data_interface, handle->ops->interface_size);
	}

	/* now the data is available! */
	_starpu_spin_unlock(&handle->header_lock);

	ptr = starpu_handle_to_pointer(handle, 0);
	if (ptr != NULL)
	{
		_starpu_data_register_ram_pointer(handle, ptr);
	}
}
static starpu_data_handle _starpu_data_handle_allocate(struct starpu_data_interface_ops_t *interface_ops)
{
	starpu_data_handle handle = (starpu_data_handle)
		calloc(1, sizeof(struct starpu_data_state_t));
	STARPU_ASSERT(handle);

	handle->ops = interface_ops;

	size_t interfacesize = interface_ops->interface_size;

	unsigned node;
	for (node = 0; node < STARPU_MAXNODES; node++)
	{
		struct starpu_data_replicate_s *replicate;
		replicate = &handle->per_node[node];
		/* relaxed_coherency = 0 */

		replicate->handle = handle;

		replicate->data_interface = calloc(1, interfacesize);
		STARPU_ASSERT(replicate->data_interface);
	}

	unsigned worker;
	unsigned nworkers = starpu_worker_get_count();
	for (worker = 0; worker < nworkers; worker++)
	{
		struct starpu_data_replicate_s *replicate;
		replicate = &handle->per_worker[worker];

		replicate->handle = handle;

		replicate->data_interface = calloc(1, interfacesize);
		STARPU_ASSERT(replicate->data_interface);
	}

	return handle;
}
void starpu_data_register(starpu_data_handle *handleptr, uint32_t home_node,
			  void *data_interface,
			  struct starpu_data_interface_ops_t *ops)
{
	starpu_data_handle handle = _starpu_data_handle_allocate(ops);

	STARPU_ASSERT(handleptr);
	*handleptr = handle;

	/* fill the interface fields with the appropriate method */
	ops->register_data_handle(handle, home_node, data_interface);

	_starpu_register_new_data(handle, home_node, 0);
}
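
/* Illustrative sketch (not part of this file's logic): starpu_data_register()
 * is the common entry point behind the registration helpers of the built-in
 * interfaces.  From the application side, the usual pattern looks like the
 * snippet below, which assumes the public variable interface
 * (starpu_variable_data_register); the variable names are hypothetical.
 *
 *	double factor = 3.14;
 *	starpu_data_handle factor_handle;
 *
 *	// home node 0: the only valid copy initially lives in main memory
 *	starpu_variable_data_register(&factor_handle, 0,
 *				      (uintptr_t) &factor, sizeof(factor));
 *
 *	... submit tasks that access factor_handle ...
 *
 *	starpu_data_unregister(factor_handle);
 */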
void *starpu_handle_to_pointer(starpu_data_handle handle, uint32_t node)
{
	/* Check whether the operation is supported and whether the data is
	 * actually allocated on that node. */
	if (handle->ops->handle_to_pointer
	    && starpu_data_test_if_allocated_on_node(handle, node))
	{
		return handle->ops->handle_to_pointer(handle, node);
	}

	return NULL;
}

void *starpu_handle_get_local_ptr(starpu_data_handle handle)
{
	return starpu_handle_to_pointer(handle, _starpu_get_local_memory_node());
}
int starpu_data_get_rank(starpu_data_handle handle)
{
	return handle->rank;
}

int starpu_data_set_rank(starpu_data_handle handle, int rank)
{
	handle->rank = rank;
	return 0;
}

int starpu_data_get_tag(starpu_data_handle handle)
{
	return handle->tag;
}

int starpu_data_set_tag(starpu_data_handle handle, int tag)
{
	handle->tag = tag;
	return 0;
}
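
/* Illustrative sketch (not part of this file's logic): the rank and tag are
 * meant for distributed executions, where each piece of data is owned by one
 * process and identified by a tag.  A hypothetical setup, right after
 * registration, could be:
 *
 *	starpu_data_set_rank(vector_handle, owner_rank);
 *	starpu_data_set_tag(vector_handle, 42);
 *
 * so that the distribution layer knows which process holds the reference copy
 * of the data and which tag to use for the corresponding transfers.
 */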
/*
 * Stop monitoring a piece of data
 */

void _starpu_data_free_interfaces(starpu_data_handle handle)
{
	const void *ram_ptr;
	unsigned node;
	unsigned worker;
	unsigned nworkers = starpu_worker_get_count();

	ram_ptr = starpu_handle_to_pointer(handle, 0);

	for (node = 0; node < STARPU_MAXNODES; node++)
		free(handle->per_node[node].data_interface);

	for (worker = 0; worker < nworkers; worker++)
		free(handle->per_worker[worker].data_interface);

	if (ram_ptr != NULL)
	{
		/* Remove the PTR -> HANDLE mapping.  If a mapping from PTR
		 * to another handle existed before (e.g., when using
		 * filters), it becomes visible again. */
		struct handle_entry *entry;

		_starpu_spin_lock(&registered_handles_lock);
		HASH_FIND_PTR(registered_handles, &ram_ptr, entry);
		STARPU_ASSERT(entry != NULL);
		HASH_DEL(registered_handles, entry);
		free(entry);
		_starpu_spin_unlock(&registered_handles_lock);
	}
}
struct unregister_callback_arg {
	unsigned memory_node;
	starpu_data_handle handle;
	unsigned terminated;
	pthread_mutex_t mutex;
	pthread_cond_t cond;
};

static void _starpu_data_unregister_fetch_data_callback(void *_arg)
{
	int ret;
	struct unregister_callback_arg *arg = (struct unregister_callback_arg *) _arg;

	starpu_data_handle handle = arg->handle;

	STARPU_ASSERT(handle);

	struct starpu_data_replicate_s *replicate = &handle->per_node[arg->memory_node];

	ret = _starpu_fetch_data_on_node(handle, replicate, STARPU_R, 0, NULL, NULL);
	STARPU_ASSERT(!ret);

	/* unlock the caller */
	PTHREAD_MUTEX_LOCK(&arg->mutex);
	arg->terminated = 1;
	PTHREAD_COND_SIGNAL(&arg->cond);
	PTHREAD_MUTEX_UNLOCK(&arg->mutex);
}
/* Unregister the data handle.  When `coherent' is 0, there is no need to
 * bring the home node up to date first. */
static void _starpu_data_unregister(starpu_data_handle handle, unsigned coherent)
{
	STARPU_ASSERT(handle);

	if (coherent)
	{
		/* If sequential consistency is enabled, wait until the data is available */
		_starpu_data_wait_until_available(handle, STARPU_RW);

		/* Fetch the data back to its home node, so that a valid copy
		 * remains where the data was registered */
		int home_node = handle->home_node;
		if (home_node >= 0)
		{
			struct unregister_callback_arg arg;
			arg.handle = handle;
			arg.memory_node = (unsigned) home_node;
			arg.terminated = 0;
			PTHREAD_MUTEX_INIT(&arg.mutex, NULL);
			PTHREAD_COND_INIT(&arg.cond, NULL);

			if (!_starpu_attempt_to_submit_data_request_from_apps(handle, STARPU_R,
					_starpu_data_unregister_fetch_data_callback, &arg))
			{
				/* no one has locked this data yet, so we proceed immediately */
				struct starpu_data_replicate_s *home_replicate = &handle->per_node[home_node];
				int ret = _starpu_fetch_data_on_node(handle, home_replicate, STARPU_R, 0, NULL, NULL);
				STARPU_ASSERT(!ret);
			}
			else {
				PTHREAD_MUTEX_LOCK(&arg.mutex);
				while (!arg.terminated)
					PTHREAD_COND_WAIT(&arg.cond, &arg.mutex);
				PTHREAD_MUTEX_UNLOCK(&arg.mutex);
			}
		}
	}
	else {
		/* Should we postpone the unregister operation? */
		if ((handle->refcnt > 0) && handle->lazy_unregister)
			return;
	}

	_starpu_data_free_interfaces(handle);

	/* Destroy the data now */
	unsigned node;
	for (node = 0; node < STARPU_MAXNODES; node++)
	{
		struct starpu_data_replicate_s *local = &handle->per_node[node];
		if (local->allocated && local->automatically_allocated)
		{
			/* free the data copy in a lazy fashion */
			_starpu_request_mem_chunk_removal(handle, node);
		}
	}

	starpu_data_requester_list_delete(handle->req_list);
	starpu_data_requester_list_delete(handle->reduction_req_list);

	free(handle);
}
void starpu_data_unregister(starpu_data_handle handle)
{
	_starpu_data_unregister(handle, 1);
}

void starpu_data_unregister_no_coherency(starpu_data_handle handle)
{
	_starpu_data_unregister(handle, 0);
}
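
/* Illustrative sketch (not part of this file's logic): which unregistration
 * flavour to use depends on whether the last value of the data still matters;
 * the handle names below are hypothetical.
 *
 *	// result buffer: bring the final value back to its home node first
 *	starpu_data_unregister(result_handle);
 *
 *	// scratch buffer whose content is irrelevant: skip the write-back
 *	starpu_data_unregister_no_coherency(scratch_handle);
 */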
void starpu_data_invalidate(starpu_data_handle handle)
{
	STARPU_ASSERT(handle);

	starpu_data_acquire(handle, STARPU_W);

	_starpu_spin_lock(&handle->header_lock);

	unsigned node;
	for (node = 0; node < STARPU_MAXNODES; node++)
	{
		struct starpu_data_replicate_s *local = &handle->per_node[node];

		if (local->allocated && local->automatically_allocated)
		{
			/* free the data copy in a lazy fashion */
			_starpu_request_mem_chunk_removal(handle, node);
		}

		local->state = STARPU_INVALID;
	}

	_starpu_spin_unlock(&handle->header_lock);

	starpu_data_release(handle);
}
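
/* Illustrative sketch (not part of this file's logic): invalidation is useful
 * for temporary data whose current content will not be read again, typically
 * between two iterations of a loop; the handle name is hypothetical.
 *
 *	unsigned iter;
 *	for (iter = 0; iter < niter; iter++)
 *	{
 *		... tasks that write and then read tmp_handle ...
 *
 *		// the content of tmp_handle is not carried over to the next
 *		// iteration, so every cached copy can be dropped
 *		starpu_data_invalidate(tmp_handle);
 *	}
 */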
unsigned starpu_get_handle_interface_id(starpu_data_handle handle)
{
	return handle->ops->interfaceid;
}

void *starpu_data_get_interface_on_node(starpu_data_handle handle, unsigned memory_node)
{
	return handle->per_node[memory_node].data_interface;
}
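
/* Illustrative sketch (not part of this file's logic): data interface
 * implementations use starpu_data_get_interface_on_node() to reach the
 * per-node description of a piece of data, typically inside their copy
 * methods; the structure and field names below belong to a hypothetical
 * interface.
 *
 *	struct my_interface *src = starpu_data_get_interface_on_node(handle, src_node);
 *	struct my_interface *dst = starpu_data_get_interface_on_node(handle, dst_node);
 *
 *	... copy src->nx elements from src->ptr to dst->ptr ...
 */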