implicit_data_deps.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu.h>
  17. #include <common/config.h>
  18. #include <core/task.h>
  19. #include <datawizard/datawizard.h>
  20. #include <profiling/bound.h>
  /* Debug tracing for the implicit dependency detection: turn the "#if 0"
   * below into "#if 1" to print the detected dependencies on stderr.
   * Both variants expand to exactly one statement (do/while(0) idiom) so that
   * the macro can safely be used as the body of an unbraced if/else. */
  #if 0
  # define _STARPU_DEP_DEBUG(fmt, args ...) do { fprintf(stderr, fmt, ##args); } while (0)
  #else
  # define _STARPU_DEP_DEBUG(fmt, args ...) do { } while (0)
  #endif
  26. /* This function adds the implicit task dependencies introduced by data
  27. * sequential consistency. Two tasks are provided: pre_sync and post_sync which
  28. * respectively indicates which task is going to depend on the previous deps
  29. * and on which task future deps should wait. In the case of a dependency
  30. * introduced by a task submission, both tasks are just the submitted task, but
  31. * in the case of user interactions with the DSM, these may be different tasks.
  32. * */
  33. /* NB : handle->sequential_consistency_mutex must be hold by the caller */
  34. void _starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task,
  35. starpu_data_handle handle, starpu_access_mode mode)
  36. {
  37. STARPU_ASSERT(!(mode & STARPU_SCRATCH));
  38. if (handle->sequential_consistency)
  39. {
  40. _STARPU_DEP_DEBUG("Tasks %p %p\n", pre_sync_task, post_sync_task);
  41. /* In case we are generating the DAG, we add an implicit
  42. * dependency between the pre and the post sync tasks in case
  43. * they are not the same. */
  44. if (pre_sync_task != post_sync_task
  45. #ifndef STARPU_USE_FXT
  46. && _starpu_bound_recording
  47. #endif
  48. )
  49. {
  50. starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  51. starpu_job_t post_sync_job = _starpu_get_job_associated_to_task(post_sync_task);
  52. STARPU_TRACE_GHOST_TASK_DEPS(pre_sync_job->job_id, post_sync_job->job_id);
  53. _starpu_bound_task_dep(post_sync_job, pre_sync_job);
  54. }
  55. starpu_access_mode previous_mode = handle->last_submitted_mode;
  56. if (mode & STARPU_W)
  57. {
  58. _STARPU_DEP_DEBUG("W %p\n", handle);
  59. if (previous_mode & STARPU_W)
  60. {
  61. _STARPU_DEP_DEBUG("WAW %p\n", handle);
  62. /* (Read) Write */
  63. /* This task depends on the previous writer */
  64. if (handle->last_submitted_writer)
  65. {
  66. struct starpu_task *task_array[1] = {handle->last_submitted_writer};
  67. starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
  68. _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
  69. } else
  70. _STARPU_DEP_DEBUG("No dep\n");
  71. /* If there is a ghost writer instead, we
  72. * should declare a ghost dependency here, and
  73. * invalidate the ghost value. */
  74. #ifndef STARPU_USE_FXT
  75. if (_starpu_bound_recording)
  76. #endif
  77. {
  78. if (handle->last_submitted_ghost_writer_id_is_valid)
  79. {
  80. starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  81. STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
  82. _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
  83. _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
  84. handle->last_submitted_ghost_writer_id_is_valid = 0;
  85. } else
  86. _STARPU_DEP_DEBUG("No dep ID\n");
  87. }
  88. handle->last_submitted_writer = post_sync_task;
  89. }
  90. else {
  91. /* The task submitted previously were in read-only
  92. * mode: this task must depend on all those read-only
  93. * tasks and we get rid of the list of readers */
  94. _STARPU_DEP_DEBUG("WAR %p\n", handle);
  95. /* Count the readers */
  96. unsigned nreaders = 0;
  97. struct starpu_task_wrapper_list *l;
  98. l = handle->last_submitted_readers;
  99. while (l)
  100. {
  101. nreaders++;
  102. l = l->next;
  103. }
  104. _STARPU_DEP_DEBUG("%d readers\n", nreaders);
  105. struct starpu_task *task_array[nreaders];
  106. unsigned i = 0;
  107. l = handle->last_submitted_readers;
  108. while (l)
  109. {
  110. STARPU_ASSERT(l->task);
  111. task_array[i++] = l->task;
  112. _STARPU_DEP_DEBUG("dep %p -> %p\n", l->task, pre_sync_task);
  113. struct starpu_task_wrapper_list *prev = l;
  114. l = l->next;
  115. free(prev);
  116. }
  117. #ifndef STARPU_USE_FXT
  118. if (_starpu_bound_recording)
  119. #endif
  120. {
  121. /* Declare all dependencies with ghost readers */
  122. starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  123. struct starpu_jobid_list *ghost_readers_id = handle->last_submitted_ghost_readers_id;
  124. while (ghost_readers_id)
  125. {
  126. unsigned long id = ghost_readers_id->id;
  127. STARPU_TRACE_GHOST_TASK_DEPS(id, pre_sync_job->job_id);
  128. _starpu_bound_job_id_dep(pre_sync_job, id);
  129. _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", id, pre_sync_task);
  130. struct starpu_jobid_list *prev = ghost_readers_id;
  131. ghost_readers_id = ghost_readers_id->next;
  132. free(prev);
  133. }
  134. handle->last_submitted_ghost_readers_id = NULL;
  135. }
  136. handle->last_submitted_readers = NULL;
  137. handle->last_submitted_writer = post_sync_task;
  138. starpu_task_declare_deps_array(pre_sync_task, nreaders, task_array);
  139. }
  140. }
  141. else {
  142. _STARPU_DEP_DEBUG("R %p\n", handle);
  143. /* Add a reader */
  144. STARPU_ASSERT(pre_sync_task);
  145. STARPU_ASSERT(post_sync_task);
  146. /* Add this task to the list of readers */
  147. struct starpu_task_wrapper_list *link = malloc(sizeof(struct starpu_task_wrapper_list));
  148. link->task = post_sync_task;
  149. link->next = handle->last_submitted_readers;
  150. handle->last_submitted_readers = link;
  151. /* This task depends on the previous writer if any */
  152. if (handle->last_submitted_writer)
  153. {
  154. _STARPU_DEP_DEBUG("RAW %p\n", handle);
  155. struct starpu_task *task_array[1] = {handle->last_submitted_writer};
  156. _STARPU_DEP_DEBUG("dep %p -> %p\n", handle->last_submitted_writer, pre_sync_task);
  157. starpu_task_declare_deps_array(pre_sync_task, 1, task_array);
  158. } else
  159. _STARPU_DEP_DEBUG("No dep\n");
  160. /* There was perhaps no last submitted writer but a
  161. * ghost one, we should report that here, and keep the
  162. * ghost writer valid */
  163. if (
  164. #ifndef STARPU_USE_FXT
  165. _starpu_bound_recording &&
  166. #endif
  167. handle->last_submitted_ghost_writer_id_is_valid)
  168. {
  169. starpu_job_t pre_sync_job = _starpu_get_job_associated_to_task(pre_sync_task);
  170. STARPU_TRACE_GHOST_TASK_DEPS(handle->last_submitted_ghost_writer_id, pre_sync_job->job_id);
  171. _starpu_bound_job_id_dep(pre_sync_job, handle->last_submitted_ghost_writer_id);
  172. _STARPU_DEP_DEBUG("dep ID%lu -> %p\n", handle->last_submitted_ghost_writer_id, pre_sync_task);
  173. }
  174. }
  175. handle->last_submitted_mode = mode;
  176. }
  177. }
  178. /* Create the implicit dependencies for a newly submitted task */
  179. void _starpu_detect_implicit_data_deps(struct starpu_task *task)
  180. {
  181. STARPU_ASSERT(task->cl);
  182. unsigned nbuffers = task->cl->nbuffers;
  183. unsigned buffer;
  184. for (buffer = 0; buffer < nbuffers; buffer++)
  185. {
  186. starpu_data_handle handle = task->buffers[buffer].handle;
  187. starpu_access_mode mode = task->buffers[buffer].mode;
  188. /* Scratch memory does not introduce any deps */
  189. if (mode & STARPU_SCRATCH)
  190. continue;
  191. PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  192. _starpu_detect_implicit_data_deps_with_handle(task, task, handle, mode);
  193. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  194. }
  195. }
  196. /* This function is called when a task has been executed so that we don't
  197. * create dependencies to task that do not exist anymore. */
  198. /* NB: We maintain a list of "ghost deps" in case FXT is enabled. Ghost
  199. * dependencies are the dependencies that are implicitely enforced by StarPU
  200. * even if they do not imply a real dependency. For instance in the following
  201. * sequence, f(Ar) g(Ar) h(Aw), we expect to have h depend on both f and g, but
  202. * if h is submitted after the termination of f or g, StarPU will not create a
  203. * dependency as this is not needed anymore. */
  204. void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, starpu_data_handle handle)
  205. {
  206. PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  207. if (handle->sequential_consistency)
  208. {
  209. /* If this is the last writer, there is no point in adding
  210. * extra deps to that tasks that does not exists anymore */
  211. if (task == handle->last_submitted_writer)
  212. {
  213. handle->last_submitted_writer = NULL;
  214. #ifndef STARPU_USE_FXT
  215. if (_starpu_bound_recording)
  216. #endif
  217. {
  218. /* Save the previous writer as the ghost last writer */
  219. handle->last_submitted_ghost_writer_id_is_valid = 1;
  220. starpu_job_t ghost_job = _starpu_get_job_associated_to_task(task);
  221. handle->last_submitted_ghost_writer_id = ghost_job->job_id;
  222. }
  223. }
  224. /* XXX can a task be both the last writer associated to a data
  225. * and be in its list of readers ? If not, we should not go
  226. * through the entire list once we have detected it was the
  227. * last writer. */
  228. /* Same if this is one of the readers: we go through the list
  229. * of readers and remove the task if it is found. */
  230. struct starpu_task_wrapper_list *l;
  231. l = handle->last_submitted_readers;
  232. struct starpu_task_wrapper_list *prev = NULL;
  233. while (l)
  234. {
  235. struct starpu_task_wrapper_list *next = l->next;
  236. if (l->task == task)
  237. {
  238. /* If we found the task in the reader list */
  239. free(l);
  240. #ifndef STARPU_USE_FXT
  241. if (_starpu_bound_recording)
  242. #endif
  243. {
  244. /* Save the job id of the reader task in the ghost reader linked list list */
  245. starpu_job_t ghost_reader_job = _starpu_get_job_associated_to_task(task);
  246. struct starpu_jobid_list *link = malloc(sizeof(struct starpu_jobid_list));
  247. STARPU_ASSERT(link);
  248. link->next = handle->last_submitted_ghost_readers_id;
  249. link->id = ghost_reader_job->job_id;
  250. handle->last_submitted_ghost_readers_id = link;
  251. }
  252. if (prev)
  253. {
  254. prev->next = next;
  255. }
  256. else {
  257. /* This is the first element of the list */
  258. handle->last_submitted_readers = next;
  259. }
  260. /* XXX can we really find the same task again
  261. * once we have found it ? Otherwise, we should
  262. * avoid going through the entire list and stop
  263. * as soon as we find the task. TODO: check how
  264. * duplicate dependencies are treated. */
  265. }
  266. else {
  267. prev = l;
  268. }
  269. l = next;
  270. }
  271. }
  272. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  273. }
  274. void _starpu_add_post_sync_tasks(struct starpu_task *post_sync_task, starpu_data_handle handle)
  275. {
  276. PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  277. if (handle->sequential_consistency)
  278. {
  279. handle->post_sync_tasks_cnt++;
  280. struct starpu_task_wrapper_list *link = malloc(sizeof(struct starpu_task_wrapper_list));
  281. link->task = post_sync_task;
  282. link->next = handle->post_sync_tasks;
  283. handle->post_sync_tasks = link;
  284. }
  285. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  286. }
  287. void _starpu_unlock_post_sync_tasks(starpu_data_handle handle)
  288. {
  289. struct starpu_task_wrapper_list *post_sync_tasks = NULL;
  290. unsigned do_submit_tasks = 0;
  291. PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  292. if (handle->sequential_consistency)
  293. {
  294. STARPU_ASSERT(handle->post_sync_tasks_cnt > 0);
  295. if (--handle->post_sync_tasks_cnt == 0)
  296. {
  297. /* unlock all tasks : we need not hold the lock while unlocking all these tasks */
  298. do_submit_tasks = 1;
  299. post_sync_tasks = handle->post_sync_tasks;
  300. handle->post_sync_tasks = NULL;
  301. }
  302. }
  303. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  304. if (do_submit_tasks)
  305. {
  306. struct starpu_task_wrapper_list *link = post_sync_tasks;
  307. while (link) {
  308. /* There is no need to depend on that task now, since it was already unlocked */
  309. _starpu_release_data_enforce_sequential_consistency(link->task, handle);
  310. int ret = starpu_task_submit(link->task);
  311. STARPU_ASSERT(!ret);
  312. link = link->next;
  313. }
  314. }
  315. }
  316. /* If sequential consistency mode is enabled, this function blocks until the
  317. * handle is available in the requested access mode. */
  318. int _starpu_data_wait_until_available(starpu_data_handle handle, starpu_access_mode mode)
  319. {
  320. /* If sequential consistency is enabled, wait until data is available */
  321. PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
  322. int sequential_consistency = handle->sequential_consistency;
  323. if (sequential_consistency)
  324. {
  325. struct starpu_task *sync_task;
  326. sync_task = starpu_task_create();
  327. sync_task->detach = 0;
  328. sync_task->destroy = 1;
  329. /* It is not really a RW access, but we want to make sure that
  330. * all previous accesses are done */
  331. _starpu_detect_implicit_data_deps_with_handle(sync_task, sync_task, handle, mode);
  332. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  333. /* TODO detect if this is superflous */
  334. int ret = starpu_task_submit(sync_task);
  335. STARPU_ASSERT(!ret);
  336. starpu_task_wait(sync_task);
  337. }
  338. else {
  339. PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
  340. }
  341. return 0;
  342. }