/* implicit_data_deps.c */
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2012 Université de Bordeaux 1
 * Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
  17. #include <starpu.h>
  18. #include <common/config.h>
  19. #include <core/task.h>
  20. #include <datawizard/datawizard.h>
  21. #include <profiling/bound.h>
  22. #if 0
  23. # define _STARPU_DEP_DEBUG(fmt, args ...) fprintf(stderr, fmt, ##args);
  24. #else
  25. # define _STARPU_DEP_DEBUG(fmt, args ...)
  26. #endif
  27. /* Read after Write (RAW) or Read after Read (RAR) */
  28. static void _starpu_add_reader_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
  29. {
  30. /* Add this task to the list of readers */
  31. struct _starpu_task_wrapper_list *link = (struct _starpu_task_wrapper_list *) malloc(sizeof(struct _starpu_task_wrapper_list));
  32. link->task = post_sync_task;
  33. link->next = handle->last_submitted_readers;
  34. handle->last_submitted_readers = link;
  35. /* This task depends on the previous writer if any */
  36. if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
  37. {
  38. _STARPU_DEP_DEBUG("RAW %p\n", handle);
  39. struct starpu_task *task_array[1] = {handle->last_submitted_writer};
  40. _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
  41. _starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
  42. }
  43. else
  44. {
  45. _STARPU_DEP_DEBUG("No dep\n");
  46. }
  47. /* There was perhaps no last submitted writer but a
  48. * ghost one, we should report that here, and keep the
  49. * ghost writer valid */
  50. if (
  51. #ifndef STARPU_USE_FXT
  52. _starpu_bound_recording &&
  53. #endif
  54. handle->last_submitted_ghost_writer_id_is_valid)
  55. {
  56. struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  57. _STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
  58. _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
  59. _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
  60. }
  61. if (!pre_sync_task->cl)
  62. _starpu_get_job_associated_to_task(pre_sync_task)->implicit_dep_handle = handle;
  63. }
  64. /* Write after Read (WAR) */
  65. static void _starpu_add_writer_after_readers(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
  66. {
  67. /* Count the readers */
  68. unsigned nreaders = 0;
  69. struct _starpu_task_wrapper_list *l;
  70. l = handle->last_submitted_readers;
  71. while (l)
  72. {
  73. if (l->task != post_sync_task)
  74. nreaders++;
  75. l = l->next;
  76. }
  77. _STARPU_DEP_DEBUG("%d readers\n", nreaders);
  78. if (nreaders > 0)
  79. {
  80. /* Put all tasks in the list into task_array */
  81. struct starpu_task *task_array[nreaders];
  82. unsigned i = 0;
  83. l = handle->last_submitted_readers;
  84. while (l)
  85. {
  86. STARPU_ASSERT(l->task);
  87. if (l->task != post_sync_task) {
  88. task_array[i++] = l->task;
  89. _STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
  90. }
  91. struct _starpu_task_wrapper_list *prev = l;
  92. l = l->next;
  93. free(prev);
  94. }
  95. _starpu_task_declare_deps_array(pre_sync_task, nreaders, task_array, 0);
  96. }
  97. #ifndef STARPU_USE_FXT
  98. if (_starpu_bound_recording)
  99. #endif
  100. {
  101. /* Declare all dependencies with ghost readers */
  102. struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  103. struct _starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id;
  104. while (ghost_readers_id)
  105. {
  106. unsigned long id = ghost_readers_id->id;
  107. _STARPU_TRACE_GHOST_TASK_DEPS(id, pre_sync_job->job_id);
  108. _starpu_bound_job_id_dep(pre_sync_job, id);
  109. _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task);
  110. struct _starpu_jobid_list *prev = ghost_readers_id;
  111. ghost_readers_id = ghost_readers_id->next;
  112. free(prev);
  113. }
  114. handle->last_submitted_ghost_readers_id = NULL;
  115. }
  116. handle->last_submitted_readers = NULL;
  117. handle->last_submitted_writer = post_sync_task;
  118. if (!post_sync_task->cl)
  119. _starpu_get_job_associated_to_task(post_sync_task)->implicit_dep_handle = handle;
  120. }
  121. /* Write after Write (WAW) */
  122. static void _starpu_add_writer_after_writer(starpu_data_handle_t handle, struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task)
  123. {
  124. /* (Read) Write */
  125. /* This task depends on the previous writer */
  126. if (handle->last_submitted_writer && handle->last_submitted_writer != post_sync_task)
  127. {
  128. struct starpu_task *task_array[1] = {handle->last_submitted_writer};
  129. _starpu_task_declare_deps_array(pre_sync_task, 1, task_array, 0);
  130. _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
  131. }
  132. else
  133. {
  134. _STARPU_DEP_DEBUG("No dep\n");
  135. }
  136. /* If there is a ghost writer instead, we
  137. * should declare a ghost dependency here, and
  138. * invalidate the ghost value. */
  139. #ifndef STARPU_USE_FXT
  140. if (_starpu_bound_recording)
  141. #endif
  142. {
  143. if (handle->last_submitted_ghost_writer_id_is_valid)
  144. {
  145. struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  146. _STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
  147. _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
  148. _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
  149. handle->last_submitted_ghost_writer_id_is_valid = 0;
  150. }
  151. else
  152. {
  153. _STARPU_DEP_DEBUG("No dep ID\n");
  154. }
  155. }
  156. handle->last_submitted_writer = post_sync_task;
  157. if (!post_sync_task->cl)
  158. _starpu_get_job_associated_to_task(post_sync_task)->implicit_dep_handle = handle;
  159. }
  160. /* This function adds the implicit task dependencies introduced by data
  161. * sequential consistency. Two tasks are provided: pre_sync and post_sync which
  162. * respectively indicates which task is going to depend on the previous deps
  163. * and on which task future deps should wait. In the case of a dependency
  164. * introduced by a task submission, both tasks are just the submitted task, but
  165. * in the case of user interactions with the DSM, these may be different tasks.
  166. * */
  167. /* NB : handle->sequential_consistency_mutex must be hold by the caller;
  168. * returns a task, to be submitted after releasing that mutex. */
  169. struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
  170. starpu_data_handle_t handle, enum starpu_access_mode mode)
  171. {
  172. struct starpu_task *task = NULL;
  173. STARPU_ASSERT(!(mode & STARPU_SCRATCH));
  174. _STARPU_LOG_IN();
  175. if (handle->sequential_consistency)
  176. {
  177. struct _starpu_job *pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  178. struct _starpu_job *post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
  179. /* Skip tasks that are associated to a reduction phase so that
  180. * they do not interfere with the application. */
  181. if (pre_sync_job->reduction_task || post_sync_job->reduction_task)
  182. return NULL;
  183. _STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task);
  184. /* In case we are generating the DAG, we add an implicit
  185. * dependency between the pre and the post sync tasks in case
  186. * they are not the same. */
  187. if (pre_sync_task != post_sync_task
  188. #ifndef STARPU_USE_FXT
  189. && _starpu_bound_recording
  190. #endif
  191. )
  192. {
  193. _STARPU_TRACE_GHOST_TASK_DEPS(pre_sync_job->job_id, post_sync_job->job_id);
  194. _starpu_bound_task_dep(post_sync_job, pre_sync_job);
  195. }
  196. enum starpu_access_mode previous_mode = handle->last_submitted_mode;
  197. if (mode & STARPU_W)
  198. {
  199. _STARPU_DEP_DEBUG("W %p\n", handle);
  200. if (previous_mode & STARPU_W)
  201. {
  202. _STARPU_DEP_DEBUG("WAW %p\n", handle);
  203. _starpu_add_writer_after_writer(handle, pre_sync_task, post_sync_task);
  204. }
  205. else
  206. {
  207. /* The task submitted previously were in read-only
  208. * mode: this task must depend on all those read-only
  209. * tasks and we get rid of the list of readers */
  210. _STARPU_DEP_DEBUG("WAR %p\n", handle);
  211. _starpu_add_writer_after_readers(handle, pre_sync_task, post_sync_task);
  212. }
  213. }
  214. else
  215. {
  216. _STARPU_DEP_DEBUG("R %p %d -> %d\n", handle, previous_mode, mode);
  217. /* Add a reader, after a writer or a reader. */
  218. STARPU_ASSERT(pre_sync_task);
  219. STARPU_ASSERT(post_sync_task);
  220. STARPU_ASSERT(mode & (STARPU_R|STARPU_REDUX));
  221. if (!(previous_mode & STARPU_W) && (mode != previous_mode))
  222. {
  223. /* Read after Redux or Redux after Read: we
  224. * insert a dummy synchronization task so that
  225. * we don't need to have a gigantic number of
  226. * dependencies between all readers and all
  227. * redux tasks. */
  228. /* Create an empty task */
  229. struct starpu_task *new_sync_task;
  230. new_sync_task = starpu_task_create();
  231. STARPU_ASSERT(new_sync_task);
  232. new_sync_task->cl = NULL;
  233. #ifdef STARPU_USE_FXT
  234. _starpu_get_job_associated_to_task(new_sync_task)->model_name = "sync_task_redux";
  235. #endif
  236. _starpu_add_writer_after_readers(handle, new_sync_task, new_sync_task);
  237. task = new_sync_task;
  238. }
  239. _starpu_add_reader_after_writer(handle, pre_sync_task, post_sync_task);
  240. }
  241. handle->last_submitted_mode = mode;
  242. }
  243. _STARPU_LOG_OUT();
  244. return task;
  245. }
  246. /* Create the implicit dependencies for a newly submitted task */
  247. void _starpu_detect_implicit_data_deps(struct starpu_task *task)
  248. {
  249. STARPU_ASSERT(task->cl);
  250. _STARPU_LOG_IN();
  251. /* We don't want to enforce a sequential consistency for tasks that are
  252. * not visible to the application. */
  253. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  254. if (j->reduction_task)
  255. return;
  256. unsigned nbuffers = task->cl->nbuffers;
  257. unsigned buffer;
  258. for (buffer = 0; buffer < nbuffers; buffer++)
  259. {
  260. starpu_data_handle_t handle = task->handles[buffer];
  261. enum starpu_access_mode mode = task->cl->modes[buffer];
  262. struct starpu_task *new_task;
  263. /* Scratch memory does not introduce any deps */
  264. if (mode & STARPU_SCRATCH)
  265. continue;
  266. _STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  267. new_task = _starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
  268. _STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  269. if (new_task) {
  270. int ret = starpu_task_submit_internal(new_task);
  271. STARPU_ASSERT(!ret);
  272. }
  273. }
  274. _STARPU_LOG_OUT();
  275. }
  276. /* This function is called when a task has been executed so that we don't
  277. * create dependencies to task that do not exist anymore. */
  278. /* NB: We maintain a list of "ghost deps" in case FXT is enabled. Ghost
  279. * dependencies are the dependencies that are implicitely enforced by StarPU
  280. * even if they do not imply a real dependency. For instance in the following
  281. * sequence, f(Ar) g(Ar) h(Aw), we expect to have h depend on both f and g, but
  282. * if h is submitted after the termination of f or g, StarPU will not create a
  283. * dependency as this is not needed anymore. */
  284. /* the sequential_consistency_mutex of the handle has to be already held */
  285. void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle_t handle)
  286. {
  287. _STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  288. if (handle->sequential_consistency)
  289. {
  290. /* If this is the last writer, there is no point in adding
  291. * extra deps to that tasks that does not exists anymore */
  292. if (task == handle->last_submitted_writer)
  293. {
  294. handle->last_submitted_writer = NULL;
  295. #ifndef STARPU_USE_FXT
  296. if (_starpu_bound_recording)
  297. #endif
  298. {
  299. /* Save the previous writer as the ghost last writer */
  300. handle->last_submitted_ghost_writer_id_is_valid = 1;
  301. struct _starpu_job *ghost_job = _starpu_get_job_associated_to_task(task);
  302. handle->last_submitted_ghost_writer_id = ghost_job->job_id;
  303. }
  304. }
  305. /* XXX can a task be both the last writer associated to a data
  306. * and be in its list of readers ? If not, we should not go
  307. * through the entire list once we have detected it was the
  308. * last writer. */
  309. /* Same if this is one of the readers: we go through the list
  310. * of readers and remove the task if it is found. */
  311. struct _starpu_task_wrapper_list *l;
  312. l = handle->last_submitted_readers;
  313. struct _starpu_task_wrapper_list *prev = NULL;
  314. #ifdef STARPU_DEVEL
  315. #warning TODO: use double-linked list to make finding ourself fast
  316. #endif
  317. while (l)
  318. {
  319. struct _starpu_task_wrapper_list *next = l->next;
  320. if (l->task == task)
  321. {
  322. /* If we found the task in the reader list */
  323. free(l);
  324. #ifndef STARPU_USE_FXT
  325. if (_starpu_bound_recording)
  326. #endif
  327. {
  328. /* Save the job id of the reader task in the ghost reader linked list list */
  329. struct _starpu_job *ghost_reader_job = _starpu_get_job_associated_to_task(task);
  330. struct _starpu_jobid_list *link = (struct _starpu_jobid_list *) malloc(sizeof(struct _starpu_jobid_list));
  331. STARPU_ASSERT(link);
  332. link->next = handle->last_submitted_ghost_readers_id;
  333. link->id = ghost_reader_job->job_id;
  334. handle->last_submitted_ghost_readers_id = link;
  335. }
  336. if (prev)
  337. {
  338. prev->next = next;
  339. }
  340. else
  341. {
  342. /* This is the first element of the list */
  343. handle->last_submitted_readers = next;
  344. }
  345. /* XXX can we really find the same task again
  346. * once we have found it ? Otherwise, we should
  347. * avoid going through the entire list and stop
  348. * as soon as we find the task. TODO: check how
  349. * duplicate dependencies are treated. */
  350. }
  351. else
  352. {
  353. prev = l;
  354. }
  355. l = next;
  356. }
  357. }
  358. _STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  359. }
  360. void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle_t handle)
  361. {
  362. _STARPU_LOG_IN();
  363. _STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  364. if (handle->sequential_consistency)
  365. {
  366. handle->post_sync_tasks_cnt++;
  367. struct _starpu_task_wrapper_list *link = (struct _starpu_task_wrapper_list *) malloc(sizeof(struct _starpu_task_wrapper_list));
  368. link->task = post_sync_task;
  369. link->next = handle->post_sync_tasks;
  370. handle->post_sync_tasks = link;
  371. }
  372. _STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  373. _STARPU_LOG_OUT();
  374. }
  375. void _starpu_unlock_post_sync_tasks(starpu_data_handle_t handle)
  376. {
  377. struct _starpu_task_wrapper_list *post_sync_tasks = NULL;
  378. unsigned do_submit_tasks = 0;
  379. _STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  380. if (handle->sequential_consistency)
  381. {
  382. STARPU_ASSERT(handle->post_sync_tasks_cnt > 0);
  383. if (--handle->post_sync_tasks_cnt == 0)
  384. {
  385. /* unlock all tasks : we need not hold the lock while unlocking all these tasks */
  386. do_submit_tasks = 1;
  387. post_sync_tasks = handle->post_sync_tasks;
  388. handle->post_sync_tasks = NULL;
  389. }
  390. }
  391. _STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  392. if (do_submit_tasks)
  393. {
  394. struct _starpu_task_wrapper_list *link = post_sync_tasks;
  395. while (link)
  396. {
  397. /* There is no need to depend on that task now, since it was already unlocked */
  398. _starpu_release_data_enforce_sequential_consistency(link->task, handle);
  399. int ret = starpu_task_submit_internal(link->task);
  400. STARPU_ASSERT(!ret);
  401. struct _starpu_task_wrapper_list *tmp = link;
  402. link = link->next;
  403. free(tmp);
  404. }
  405. }
  406. }
  407. /* If sequential consistency mode is enabled, this function blocks until the
  408. * handle is available in the requested access mode. */
  409. int _starpu_data_wait_until_available(starpu_data_handle_t handle, enum starpu_access_mode mode)
  410. {
  411. /* If sequential consistency is enabled, wait until data is available */
  412. _STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  413. int sequential_consistency = handle->sequential_consistency;
  414. if (sequential_consistency)
  415. {
  416. struct starpu_task *sync_task, *new_task;
  417. sync_task = starpu_task_create();
  418. sync_task->detach = 0;
  419. sync_task->destroy = 1;
  420. #ifdef STARPU_USE_FXT
  421. _starpu_get_job_associated_to_task(sync_task)->model_name = "sync_task";
  422. #endif
  423. /* It is not really a RW access, but we want to make sure that
  424. * all previous accesses are done */
  425. new_task = _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode);
  426. _STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  427. if (new_task) {
  428. int ret = starpu_task_submit_internal(new_task);
  429. STARPU_ASSERT(!ret);
  430. }
  431. /* TODO detect if this is superflous */
  432. int ret = starpu_task_submit_internal(sync_task);
  433. STARPU_ASSERT(!ret);
  434. starpu_task_wait(sync_task);
  435. }
  436. else
  437. {
  438. _STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  439. }
  440. return 0;
  441. }