user_interactions.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <common/config.h>
  17. #include <common/utils.h>
  18. #include <core/task.h>
  19. #include <datawizard/coherency.h>
  20. #include <datawizard/copy_driver.h>
  21. #include <datawizard/write_back.h>
  22. #include <core/dependencies/data_concurrency.h>
  23. #include <core/sched_policy.h>
  24. #include <datawizard/memory_nodes.h>
  25. static void _starpu_data_check_initialized(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
  26. {
  27. if (!(mode & STARPU_R))
  28. return;
  29. if (((handle->nplans && !handle->nchildren) || handle->siblings)
  30. && handle->partition_automatic_disabled == 0)
  31. {
  32. _starpu_data_partition_access_submit(handle, (mode & STARPU_W) != 0);
  33. }
  34. if (!handle->initialized && handle->init_cl)
  35. {
  36. int ret = starpu_task_insert(handle->init_cl, STARPU_W, handle, 0);
  37. STARPU_ASSERT(ret == 0);
  38. }
  39. STARPU_ASSERT_MSG(handle->initialized, "handle %p is not initialized while trying to read it\n", handle);
  40. }
  41. /* Explicitly ask StarPU to allocate room for a piece of data on the specified
  42. * memory node. */
  43. int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node)
  44. {
  45. struct _starpu_data_request *r;
  46. STARPU_ASSERT(handle);
  47. _starpu_spin_lock(&handle->header_lock);
  48. r = _starpu_create_data_request(handle, NULL, &handle->per_node[node], node, STARPU_NONE, 0, NULL, STARPU_PREFETCH, 0, 0, "starpu_data_request_allocation");
  49. /* we do not increase the refcnt associated to the request since we are
  50. * not waiting for its termination */
  51. _starpu_post_data_request(r);
  52. _starpu_spin_unlock(&handle->header_lock);
  53. return 0;
  54. }
/* State shared between an application-level acquire/prefetch request and the
 * callbacks which complete it. */
struct user_interaction_wrapper
{
	/* Data handle being acquired or prefetched */
	starpu_data_handle_t handle;
	/* Access mode passed to the fetch */
	enum starpu_data_access_mode mode;
	/* Target memory node; a negative value means no replicate is passed to the fetch */
	int node;
	/* Condition and mutex protecting `finished', used by blocking requests */
	starpu_pthread_cond_t cond;
	starpu_pthread_mutex_t lock;
	/* Set to 1 by _starpu_data_acquire_wrapper_finished() */
	unsigned finished;
	/* Forwarded as-is to _starpu_fetch_data_on_node() */
	unsigned detached;
	/* Kind of fetch, forwarded as-is to _starpu_fetch_data_on_node() */
	enum starpu_is_prefetch prefetch;
	/* Non-zero when nobody waits for the request to complete */
	unsigned async;
	/* Priority forwarded to the fetch */
	int prio;
	/* Optional hook called once the data is acquired, before the fetch; it may change the node */
	void (*callback_acquired)(void *, int *node, enum starpu_data_access_mode mode);
	/* Callback called once the fetch is done (non-blocking requests) */
	void (*callback)(void *);
	/* Argument passed to both callbacks */
	void *callback_arg;
	/* Synchronization tasks created when sequential consistency is enabled */
	struct starpu_task *pre_sync_task;
	struct starpu_task *post_sync_task;
};
  73. static inline void _starpu_data_acquire_wrapper_init(struct user_interaction_wrapper *wrapper, starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
  74. {
  75. memset(wrapper, 0, sizeof(*wrapper));
  76. wrapper->handle = handle;
  77. wrapper->node = node;
  78. wrapper->mode = mode;
  79. //wrapper->finished = 0;
  80. STARPU_PTHREAD_COND_INIT0(&wrapper->cond, NULL);
  81. STARPU_PTHREAD_MUTEX_INIT0(&wrapper->lock, NULL);
  82. }
  83. /* Called to signal completion of asynchronous data acquisition */
  84. static inline void _starpu_data_acquire_wrapper_finished(struct user_interaction_wrapper *wrapper)
  85. {
  86. STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
  87. wrapper->finished = 1;
  88. STARPU_PTHREAD_COND_SIGNAL(&wrapper->cond);
  89. STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
  90. }
  91. /* Called to wait for completion of asynchronous data acquisition */
  92. static inline void _starpu_data_acquire_wrapper_wait(struct user_interaction_wrapper *wrapper)
  93. {
  94. STARPU_PTHREAD_MUTEX_LOCK(&wrapper->lock);
  95. while (!wrapper->finished)
  96. STARPU_PTHREAD_COND_WAIT(&wrapper->cond, &wrapper->lock);
  97. STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper->lock);
  98. }
  99. static inline void _starpu_data_acquire_wrapper_fini(struct user_interaction_wrapper *wrapper)
  100. {
  101. STARPU_PTHREAD_COND_DESTROY(&wrapper->cond);
  102. STARPU_PTHREAD_MUTEX_DESTROY(&wrapper->lock);
  103. }
  104. /* Called when the data acquisition is done, to launch the fetch into target memory */
  105. static inline void _starpu_data_acquire_launch_fetch(struct user_interaction_wrapper *wrapper, int async, void (*callback)(void *), void *callback_arg)
  106. {
  107. int node = wrapper->node;
  108. starpu_data_handle_t handle = wrapper->handle;
  109. struct _starpu_data_replicate *replicate = node >= 0 ? &handle->per_node[node] : NULL;
  110. int ret = _starpu_fetch_data_on_node(handle, node, replicate, wrapper->mode, wrapper->detached, NULL, wrapper->prefetch, async, callback, callback_arg, wrapper->prio, "_starpu_data_acquire_launch_fetch");
  111. STARPU_ASSERT(!ret);
  112. }
  113. /*
  114. * Non Blocking data request from application
  115. */
  116. /* Called when fetch is done, call the callback */
  117. static void _starpu_data_acquire_fetch_data_callback(void *arg)
  118. {
  119. struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
  120. starpu_data_handle_t handle = wrapper->handle;
  121. /* At that moment, the caller holds a reference to the piece of data.
  122. * We enqueue the "post" sync task in the list associated to the handle
  123. * so that it is submitted by the starpu_data_release
  124. * function. */
  125. if (wrapper->post_sync_task)
  126. _starpu_add_post_sync_tasks(wrapper->post_sync_task, handle);
  127. wrapper->callback(wrapper->callback_arg);
  128. _starpu_data_acquire_wrapper_fini(wrapper);
  129. free(wrapper);
  130. }
  131. /* Called when the data acquisition is done, launch the fetch into target memory */
  132. static void _starpu_data_acquire_continuation_non_blocking(void *arg)
  133. {
  134. struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
  135. if (wrapper->callback_acquired)
  136. /* This can change the node at will according to the current data situation */
  137. wrapper->callback_acquired(wrapper->callback_arg, &wrapper->node, wrapper->mode);
  138. _starpu_data_acquire_launch_fetch(arg, 1, _starpu_data_acquire_fetch_data_callback, arg);
  139. }
  140. /* Called when the implicit data dependencies are done, launch the data acquisition */
  141. static void starpu_data_acquire_cb_pre_sync_callback(void *arg)
  142. {
  143. struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
  144. /* we try to get the data, if we do not succeed immediately, we set a
  145. * callback function that will be executed automatically when the data is
  146. * available again, otherwise we fetch the data directly */
  147. if (!_starpu_attempt_to_submit_data_request_from_apps(wrapper->handle, wrapper->mode,
  148. _starpu_data_acquire_continuation_non_blocking, wrapper))
  149. {
  150. /* no one has locked this data yet, so we proceed immediately */
  151. _starpu_data_acquire_continuation_non_blocking(wrapper);
  152. }
  153. }
/* Non-blocking acquisition of `handle' on memory node `node' with access mode
 * `mode'. The data must be released by calling starpu_data_release later on.
 *
 * - callback_acquired: optional hook called once the data is acquired, before
 *   the fetch is launched; it may change the target node.
 * - callback / arg: called once the data has been fetched.
 * - sequential_consistency: when zero, or when the handle itself has
 *   sequential consistency disabled, no synchronization task is created.
 * - quick: when non-zero, the pre-sync job is linked to the post-sync job
 *   through its quick_next field.
 * - pre_sync_jobid / post_sync_jobid: optional outputs receiving the job ids
 *   of the synchronization tasks, or -1 when none is created.
 * - prio: priority given to the synchronization tasks and to the fetch.
 *
 * Always returns 0. */
int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_data_handle_t handle, int node,
								      enum starpu_data_access_mode mode,
								      void (*callback_acquired)(void *arg, int *node, enum starpu_data_access_mode mode),
								      void (*callback)(void *arg),
								      void *arg,
								      int sequential_consistency, int quick,
								      long *pre_sync_jobid, long *post_sync_jobid, int prio)
{
	STARPU_ASSERT(handle);
	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data (%p) is not possible", handle);
	_STARPU_LOG_IN();
	/* Check that previous tasks have set a value if needed */
	_starpu_data_check_initialized(handle, mode);
	/* The wrapper is heap-allocated: it is freed by
	 * _starpu_data_acquire_fetch_data_callback once the fetch is done */
	struct user_interaction_wrapper *wrapper;
	_STARPU_MALLOC(wrapper, sizeof(struct user_interaction_wrapper));
	_starpu_data_acquire_wrapper_init(wrapper, handle, node, mode);
	wrapper->async = 1;
	wrapper->callback_acquired = callback_acquired;
	wrapper->callback = callback;
	wrapper->callback_arg = arg;
	wrapper->pre_sync_task = NULL;
	wrapper->post_sync_task = NULL;
	wrapper->prio = prio;
	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
	int handle_sequential_consistency = handle->sequential_consistency;
	if (handle_sequential_consistency && sequential_consistency)
	{
		struct starpu_task *new_task;
		struct _starpu_job *pre_sync_job, *post_sync_job;
		int submit_pre_sync = 0;
		/* The "pre" sync task launches the acquisition from its callback */
		wrapper->pre_sync_task = starpu_task_create();
		wrapper->pre_sync_task->name = "_starpu_data_acquire_cb_pre";
		wrapper->pre_sync_task->detach = 1;
		wrapper->pre_sync_task->callback_func = starpu_data_acquire_cb_pre_sync_callback;
		wrapper->pre_sync_task->callback_arg = wrapper;
		wrapper->pre_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
		wrapper->pre_sync_task->priority = prio;
		pre_sync_job = _starpu_get_job_associated_to_task(wrapper->pre_sync_task);
		if (pre_sync_jobid)
			*pre_sync_jobid = pre_sync_job->job_id;
		/* The "post" sync task is queued on the handle once the fetch is done */
		wrapper->post_sync_task = starpu_task_create();
		wrapper->post_sync_task->name = "_starpu_data_acquire_cb_release";
		wrapper->post_sync_task->detach = 1;
		wrapper->post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
		wrapper->post_sync_task->priority = prio;
		post_sync_job = _starpu_get_job_associated_to_task(wrapper->post_sync_task);
		if (post_sync_jobid)
			*post_sync_jobid = post_sync_job->job_id;
		if (quick)
			pre_sync_job->quick_next = post_sync_job;
		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper->pre_sync_task, &submit_pre_sync, wrapper->post_sync_task, &_starpu_get_job_associated_to_task(wrapper->post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
		/* Tasks are only submitted once the mutex is released */
		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
		if (STARPU_UNLIKELY(new_task))
		{
			int ret = _starpu_task_submit_internally(new_task);
			STARPU_ASSERT(!ret);
		}
		if (submit_pre_sync)
		{
			int ret = _starpu_task_submit_internally(wrapper->pre_sync_task);
			STARPU_ASSERT(!ret);
		}
		else
		{
			/* The pre-sync task does not need to be submitted: drop it
			 * and run its callback by hand */
			wrapper->pre_sync_task->detach = 0;
			starpu_task_destroy(wrapper->pre_sync_task);
			starpu_data_acquire_cb_pre_sync_callback(wrapper);
		}
	}
	else
	{
		/* No synchronization task is created: report it and launch the
		 * acquisition right away */
		if (pre_sync_jobid)
			*pre_sync_jobid = -1;
		if (post_sync_jobid)
			*post_sync_jobid = -1;
		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
		starpu_data_acquire_cb_pre_sync_callback(wrapper);
	}
	_STARPU_LOG_OUT();
	return 0;
}
  236. int starpu_data_acquire_on_node_cb_sequential_consistency_quick(starpu_data_handle_t handle, int node,
  237. enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
  238. int sequential_consistency, int quick)
  239. {
  240. return starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(handle, node, mode, NULL, callback, arg, sequential_consistency, quick, NULL, NULL, STARPU_DEFAULT_PRIO);
  241. }
  242. int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node,
  243. enum starpu_data_access_mode mode, void (*callback)(void *), void *arg,
  244. int sequential_consistency)
  245. {
  246. return starpu_data_acquire_on_node_cb_sequential_consistency_quick(handle, node, mode, callback, arg, sequential_consistency, 0);
  247. }
  248. int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node,
  249. enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
  250. {
  251. return starpu_data_acquire_on_node_cb_sequential_consistency(handle, node, mode, callback, arg, 1);
  252. }
  253. int starpu_data_acquire_cb(starpu_data_handle_t handle,
  254. enum starpu_data_access_mode mode, void (*callback)(void *), void *arg)
  255. {
  256. int home_node = handle->home_node;
  257. if (home_node < 0)
  258. home_node = STARPU_MAIN_RAM;
  259. return starpu_data_acquire_on_node_cb(handle, home_node, mode, callback, arg);
  260. }
  261. int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle,
  262. enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency)
  263. {
  264. int home_node = handle->home_node;
  265. if (home_node < 0)
  266. home_node = STARPU_MAIN_RAM;
  267. return starpu_data_acquire_on_node_cb_sequential_consistency(handle, home_node, mode, callback, arg, sequential_consistency);
  268. }
  269. /*
  270. * Blocking data request from application
  271. */
  272. static inline void _starpu_data_acquire_continuation(void *arg)
  273. {
  274. struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
  275. starpu_data_handle_t handle = wrapper->handle;
  276. STARPU_ASSERT(handle);
  277. _starpu_data_acquire_launch_fetch(wrapper, 0, NULL, NULL);
  278. _starpu_data_acquire_wrapper_finished(wrapper);
  279. }
/* Blocking acquisition of `handle' on memory node `node' with access mode
 * `mode'. The data must be released by calling starpu_data_release later on.
 *
 * This asserts that the calling context may perform blocking calls, i.e. it
 * must not be called from a codelet or a task callback. Always returns 0. */
int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
{
	STARPU_ASSERT(handle);
	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data is not possible");
	_STARPU_LOG_IN();
	/* unless asynchronous, it is forbidden to call this function from a callback or a codelet */
	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
	/* Check that previous tasks have set a value if needed */
	_starpu_data_check_initialized(handle, mode);
	/* Multiformat handles may first need a synchronous conversion task */
	if (node >= 0 && _starpu_data_is_multiformat_handle(handle) &&
	    _starpu_handle_needs_conversion_task(handle, node))
	{
		struct starpu_task *task = _starpu_create_conversion_task(handle, node);
		int ret;
		_starpu_spin_lock(&handle->header_lock);
		/* NOTE(review): presumably this drops references accounted for by
		 * the conversion task -- confirm against _starpu_create_conversion_task */
		handle->refcnt--;
		handle->busy_count--;
		handle->mf_node = node;
		_starpu_spin_unlock(&handle->header_lock);
		task->synchronous = 1;
		ret = _starpu_task_submit_internally(task);
		STARPU_ASSERT(!ret);
	}
	/* The wrapper can live on the stack since this function blocks until
	 * the acquisition is complete */
	struct user_interaction_wrapper wrapper;
	_starpu_data_acquire_wrapper_init(&wrapper, handle, node, mode);
	// _STARPU_DEBUG("TAKE sequential_consistency_mutex starpu_data_acquire\n");
	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
	int sequential_consistency = handle->sequential_consistency;
	if (sequential_consistency)
	{
		struct starpu_task *new_task;
		int submit_pre_sync = 0;
		/* The "pre" sync task is not detached, and is submitted
		 * synchronously below when it has to be submitted */
		wrapper.pre_sync_task = starpu_task_create();
		wrapper.pre_sync_task->name = "_starpu_data_acquire_pre";
		wrapper.pre_sync_task->detach = 0;
		wrapper.pre_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
		wrapper.post_sync_task = starpu_task_create();
		wrapper.post_sync_task->name = "_starpu_data_acquire_post";
		wrapper.post_sync_task->detach = 1;
		wrapper.post_sync_task->type = STARPU_TASK_TYPE_DATA_ACQUIRE;
		new_task = _starpu_detect_implicit_data_deps_with_handle(wrapper.pre_sync_task, &submit_pre_sync, wrapper.post_sync_task, &_starpu_get_job_associated_to_task(wrapper.post_sync_task)->implicit_dep_slot, handle, mode, sequential_consistency);
		/* Tasks are only submitted once the mutex is released */
		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
		if (STARPU_UNLIKELY(new_task))
		{
			int ret = _starpu_task_submit_internally(new_task);
			STARPU_ASSERT(!ret);
		}
		if (submit_pre_sync)
		{
			wrapper.pre_sync_task->synchronous = 1;
			int ret = _starpu_task_submit_internally(wrapper.pre_sync_task);
			STARPU_ASSERT(!ret);
		}
		else
		{
			/* The pre-sync task does not need to be submitted: drop it */
			wrapper.pre_sync_task->detach = 0;
			starpu_task_destroy(wrapper.pre_sync_task);
		}
	}
	else
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
	}
	/* we try to get the data, if we do not succeed immediately, we set a
	 * callback function that will be executed automatically when the data is
	 * available again, otherwise we fetch the data directly */
	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
	{
		/* no one has locked this data yet, so we proceed immediately */
		_starpu_data_acquire_launch_fetch(&wrapper, 0, NULL, NULL);
	}
	else
	{
		/* _starpu_data_acquire_continuation will wake us up */
		_starpu_data_acquire_wrapper_wait(&wrapper);
	}
	_starpu_data_acquire_wrapper_fini(&wrapper);
	/* At that moment, the caller holds a reference to the piece of data.
	 * We enqueue the "post" sync task in the list associated to the handle
	 * so that it is submitted by the starpu_data_release
	 * function. */
	if (sequential_consistency)
		_starpu_add_post_sync_tasks(wrapper.post_sync_task, handle);
	_STARPU_LOG_OUT();
	return 0;
}
  366. int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
  367. {
  368. int home_node = handle->home_node;
  369. if (home_node < 0)
  370. home_node = STARPU_MAIN_RAM;
  371. return starpu_data_acquire_on_node(handle, home_node, mode);
  372. }
  373. int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
  374. {
  375. STARPU_ASSERT(handle);
  376. STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data is not possible");
  377. /* it is forbidden to call this function from a callback or a codelet */
  378. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
  379. /* Check that previous tasks have set a value if needed */
  380. _starpu_data_check_initialized(handle, mode);
  381. int ret;
  382. STARPU_ASSERT_MSG(!_starpu_data_is_multiformat_handle(handle), "not supported yet");
  383. STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  384. ret = _starpu_test_implicit_data_deps_with_handle(handle, mode);
  385. STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  386. if (ret)
  387. return ret;
  388. struct user_interaction_wrapper wrapper;
  389. _starpu_data_acquire_wrapper_init(&wrapper, handle, node, mode);
  390. /* we try to get the data, if we do not succeed immediately, we set a
  391. * callback function that will be executed automatically when the data is
  392. * available again, otherwise we fetch the data directly */
  393. if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
  394. {
  395. /* no one has locked this data yet, so we proceed immediately */
  396. _starpu_data_acquire_launch_fetch(&wrapper, 0, NULL, NULL);
  397. }
  398. else
  399. {
  400. _starpu_data_acquire_wrapper_wait(&wrapper);
  401. }
  402. _starpu_data_acquire_wrapper_fini(&wrapper);
  403. return 0;
  404. }
  405. int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
  406. {
  407. return starpu_data_acquire_on_node_try(handle, STARPU_MAIN_RAM, mode);
  408. }
  409. /* This function must be called after starpu_data_acquire so that the
  410. * application release the data */
  411. void starpu_data_release_to_on_node(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int node)
  412. {
  413. STARPU_ASSERT(handle);
  414. if (mode == STARPU_RW)
  415. /* They are equivalent here, and current_mode is never STARPU_RW */
  416. mode = STARPU_W;
  417. STARPU_ASSERT_MSG(mode == STARPU_NONE ||
  418. mode == handle->current_mode ||
  419. (mode == STARPU_R &&
  420. handle->current_mode == STARPU_W),
  421. "We only support releasing from W to R");
  422. /* In case there are some implicit dependencies, unlock the "post sync" tasks */
  423. _starpu_unlock_post_sync_tasks(handle, mode);
  424. /* The application can now release the rw-lock */
  425. if (node >= 0)
  426. _starpu_release_data_on_node(handle, 0, mode, &handle->per_node[node]);
  427. else
  428. {
  429. _starpu_spin_lock(&handle->header_lock);
  430. if (node == STARPU_ACQUIRE_NO_NODE_LOCK_ALL)
  431. {
  432. int i;
  433. for (i = 0; i < STARPU_MAXNODES; i++)
  434. handle->per_node[i].refcnt--;
  435. }
  436. handle->busy_count--;
  437. if (!_starpu_notify_data_dependencies(handle, mode))
  438. _starpu_spin_unlock(&handle->header_lock);
  439. }
  440. }
  441. void starpu_data_release_on_node(starpu_data_handle_t handle, int node)
  442. {
  443. starpu_data_release_to_on_node(handle, STARPU_NONE, node);
  444. }
  445. void starpu_data_release_to(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
  446. {
  447. int home_node = handle->home_node;
  448. if (home_node < 0)
  449. home_node = STARPU_MAIN_RAM;
  450. starpu_data_release_to_on_node(handle, mode, home_node);
  451. }
  452. void starpu_data_release(starpu_data_handle_t handle)
  453. {
  454. starpu_data_release_to(handle, STARPU_NONE);
  455. }
  456. static void _prefetch_data_on_node(void *arg)
  457. {
  458. struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
  459. starpu_data_handle_t handle = wrapper->handle;
  460. _starpu_data_acquire_launch_fetch(wrapper, wrapper->async, NULL, NULL);
  461. if (wrapper->async)
  462. free(wrapper);
  463. else
  464. _starpu_data_acquire_wrapper_finished(wrapper);
  465. _starpu_spin_lock(&handle->header_lock);
  466. if (!_starpu_notify_data_dependencies(handle, STARPU_NONE))
  467. _starpu_spin_unlock(&handle->header_lock);
  468. }
/* Common implementation of the fetch/prefetch/idle-prefetch entry points.
 *
 * - node: memory node to bring the data to.
 * - async: when zero, block until the data has been fetched; when non-zero,
 *   only launch the fetch.
 * - mode: access mode used for the initialization check and for the data
 *   request; the fetch itself is always performed with STARPU_R.
 * - prefetch: kind of fetch, forwarded to the fetch.
 * - prio: priority forwarded to the fetch.
 *
 * Always returns 0. */
static
int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigned node, unsigned async, enum starpu_data_access_mode mode, enum starpu_is_prefetch prefetch, int prio)
{
	STARPU_ASSERT(handle);
	/* it is forbidden to call this function from a callback or a codelet */
	STARPU_ASSERT_MSG(async || _starpu_worker_may_perform_blocking_calls(), "Synchronous prefetch is not possible from a task or a callback");
	/* Check that previous tasks have set a value if needed */
	_starpu_data_check_initialized(handle, mode);
	struct user_interaction_wrapper *wrapper;
	_STARPU_MALLOC(wrapper, sizeof(*wrapper));
	_starpu_data_acquire_wrapper_init(wrapper, handle, node, STARPU_R);
	wrapper->detached = async;
	wrapper->prefetch = prefetch;
	wrapper->async = async;
	wrapper->prio = prio;
	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _prefetch_data_on_node, wrapper))
	{
		/* we can immediately proceed */
		struct _starpu_data_replicate *replicate = &handle->per_node[node];
		_starpu_data_acquire_launch_fetch(wrapper, async, NULL, NULL);
		/* The continuation was not registered, so the wrapper is no
		 * longer needed */
		_starpu_data_acquire_wrapper_fini(wrapper);
		free(wrapper);
		/* remove the "lock"/reference */
		_starpu_spin_lock(&handle->header_lock);
		if (!async)
		{
			/* Release our refcnt, like _starpu_release_data_on_node would do */
			replicate->refcnt--;
			STARPU_ASSERT(replicate->refcnt >= 0);
			STARPU_ASSERT(handle->busy_count > 0);
			handle->busy_count--;
		}
		/* In case there was a temporary handle (eg. used for reduction), this
		 * handle may have requested to be destroyed when the data is released
		 * */
		if (!_starpu_notify_data_dependencies(handle, STARPU_NONE))
			_starpu_spin_unlock(&handle->header_lock);
	}
	else if (!async)
	{
		/* _prefetch_data_on_node will wake us up once the fetch is done;
		 * the wrapper is then ours to release */
		_starpu_data_acquire_wrapper_wait(wrapper);
		_starpu_data_acquire_wrapper_fini(wrapper);
		free(wrapper);
	}
	/* In the remaining case (request queued, asynchronous), the wrapper is
	 * released by _prefetch_data_on_node */
	return 0;
}
  515. int starpu_data_fetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
  516. {
  517. return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, STARPU_FETCH, STARPU_DEFAULT_PRIO);
  518. }
  519. int starpu_data_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio)
  520. {
  521. return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, STARPU_PREFETCH, prio);
  522. }
  523. int starpu_data_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
  524. {
  525. return starpu_data_prefetch_on_node_prio(handle, node, async, STARPU_DEFAULT_PRIO);
  526. }
  527. int starpu_data_idle_prefetch_on_node_prio(starpu_data_handle_t handle, unsigned node, unsigned async, int prio)
  528. {
  529. return _starpu_prefetch_data_on_node_with_mode(handle, node, async, STARPU_R, STARPU_IDLEFETCH, prio);
  530. }
  531. int starpu_data_idle_prefetch_on_node(starpu_data_handle_t handle, unsigned node, unsigned async)
  532. {
  533. return starpu_data_idle_prefetch_on_node_prio(handle, node, async, STARPU_DEFAULT_PRIO);
  534. }
  535. static void _starpu_data_wont_use(void *data)
  536. {
  537. unsigned node;
  538. starpu_data_handle_t handle = data;
  539. _STARPU_TRACE_DATA_DOING_WONT_USE(handle);
  540. _starpu_spin_lock(&handle->header_lock);
  541. for (node = 0; node < STARPU_MAXNODES; node++)
  542. {
  543. struct _starpu_data_replicate *local = &handle->per_node[node];
  544. if (local->allocated && local->automatically_allocated)
  545. _starpu_memchunk_wont_use(local->mc, node);
  546. }
  547. if (handle->per_worker)
  548. {
  549. unsigned nworkers = starpu_worker_get_count();
  550. unsigned worker;
  551. for (worker = 0; worker < nworkers; worker++)
  552. {
  553. struct _starpu_data_replicate *local = &handle->per_worker[worker];
  554. if (local->allocated && local->automatically_allocated)
  555. _starpu_memchunk_wont_use(local->mc, starpu_worker_get_memory_node(worker));
  556. }
  557. }
  558. _starpu_spin_unlock(&handle->header_lock);
  559. starpu_data_release_on_node(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL);
  560. if (handle->home_node != -1)
  561. starpu_data_idle_prefetch_on_node(handle, handle->home_node, 1);
  562. else
  563. {
  564. if (handle->ooc)
  565. {
  566. /* Try to push it to some disk */
  567. unsigned i;
  568. unsigned nnodes = starpu_memory_nodes_get_count();
  569. for (i = 0; i < nnodes; i++)
  570. {
  571. if (starpu_node_get_kind(i) == STARPU_DISK_RAM)
  572. starpu_data_idle_prefetch_on_node(handle, i, 1);
  573. }
  574. }
  575. }
  576. }
  577. void starpu_data_wont_use(starpu_data_handle_t handle)
  578. {
  579. if (!handle->initialized)
  580. /* No value atm actually */
  581. return;
  582. if (starpu_data_get_nb_children(handle) != 0)
  583. {
  584. int i;
  585. for(i=0 ; i<starpu_data_get_nb_children(handle) ; i++)
  586. starpu_data_wont_use(starpu_data_get_child(handle, i));
  587. return;
  588. }
  589. if (handle->partitioned != 0)
  590. {
  591. unsigned i;
  592. for(i=0 ; i<handle->partitioned; i++)
  593. {
  594. unsigned j;
  595. for(j=0 ; j<handle->active_readonly_nchildren[i] ; j++)
  596. starpu_data_wont_use(handle->active_readonly_children[i][j]);
  597. }
  598. }
  599. if (handle->active_nchildren != 0)
  600. {
  601. unsigned j;
  602. for(j=0 ; j<handle->active_nchildren ; j++)
  603. starpu_data_wont_use(handle->active_children[j]);
  604. return;
  605. }
  606. _STARPU_TRACE_DATA_WONT_USE(handle);
  607. starpu_data_acquire_on_node_cb_sequential_consistency_quick(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL, STARPU_R, _starpu_data_wont_use, handle, 1, 1);
  608. }
  609. /*
  610. * It is possible to specify that a piece of data can be discarded without
  611. * impacting the application.
  612. */
  613. int _starpu_has_not_important_data;
  614. void starpu_data_advise_as_important(starpu_data_handle_t handle, unsigned is_important)
  615. {
  616. if (!is_important)
  617. _starpu_has_not_important_data = 1;
  618. _starpu_spin_lock(&handle->header_lock);
  619. /* first take all the children lock (in order !) */
  620. unsigned child;
  621. for (child = 0; child < handle->nchildren; child++)
  622. {
  623. /* make sure the intermediate children is advised as well */
  624. starpu_data_handle_t child_handle = starpu_data_get_child(handle, child);
  625. if (child_handle->nchildren > 0)
  626. starpu_data_advise_as_important(child_handle, is_important);
  627. }
  628. handle->is_not_important = !is_important;
  629. /* now the parent may be used again so we release the lock */
  630. _starpu_spin_unlock(&handle->header_lock);
  631. }
  632. void starpu_data_set_sequential_consistency_flag(starpu_data_handle_t handle, unsigned flag)
  633. {
  634. _starpu_spin_lock(&handle->header_lock);
  635. unsigned child;
  636. for (child = 0; child < handle->nchildren; child++)
  637. {
  638. /* make sure that the flags are applied to the children as well */
  639. starpu_data_handle_t child_handle = starpu_data_get_child(handle, child);
  640. if (child_handle->nchildren > 0)
  641. starpu_data_set_sequential_consistency_flag(child_handle, flag);
  642. }
  643. STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  644. handle->sequential_consistency = flag;
  645. STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  646. _starpu_spin_unlock(&handle->header_lock);
  647. }
  648. unsigned starpu_data_get_sequential_consistency_flag(starpu_data_handle_t handle)
  649. {
  650. return handle->sequential_consistency;
  651. }
  652. void starpu_data_set_ooc_flag(starpu_data_handle_t handle, unsigned flag)
  653. {
  654. handle->ooc = flag;
  655. }
  656. unsigned starpu_data_get_ooc_flag(starpu_data_handle_t handle)
  657. {
  658. return handle->ooc;
  659. }
  660. /* By default, sequential consistency is enabled */
  661. static unsigned default_sequential_consistency_flag = 1;
  662. unsigned starpu_data_get_default_sequential_consistency_flag(void)
  663. {
  664. return default_sequential_consistency_flag;
  665. }
  666. void starpu_data_set_default_sequential_consistency_flag(unsigned flag)
  667. {
  668. default_sequential_consistency_flag = flag;
  669. }
  670. /* Query the status of the handle on the specified memory node. */
  671. void starpu_data_query_status2(starpu_data_handle_t handle, int memory_node, int *is_allocated, int *is_valid, int *is_loading, int *is_requested)
  672. {
  673. // XXX : this is just a hint, so we don't take the lock ...
  674. // _starpu_spin_lock(&handle->header_lock);
  675. if (is_allocated)
  676. *is_allocated = handle->per_node[memory_node].allocated;
  677. if (is_valid)
  678. *is_valid = (handle->per_node[memory_node].state != STARPU_INVALID);
  679. if (is_loading)
  680. *is_loading = handle->per_node[memory_node].load_request != NULL;
  681. if (is_requested)
  682. {
  683. int requested = 0;
  684. unsigned node;
  685. for (node = 0; node < STARPU_MAXNODES; node++)
  686. {
  687. if (handle->per_node[memory_node].request[node])
  688. {
  689. requested = 1;
  690. break;
  691. }
  692. }
  693. *is_requested = requested;
  694. }
  695. // _starpu_spin_unlock(&handle->header_lock);
  696. }
  697. void starpu_data_query_status(starpu_data_handle_t handle, int memory_node, int *is_allocated, int *is_valid, int *is_requested)
  698. {
  699. return starpu_data_query_status2(handle, memory_node, is_allocated, is_valid, NULL, is_requested);
  700. }