/* jobs.c */
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. * Copyright (C) 2011 INRIA
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <starpu.h>
  20. #include <core/jobs.h>
  21. #include <core/task.h>
  22. #include <core/workers.h>
  23. #include <core/dependencies/data_concurrency.h>
  24. #include <common/config.h>
  25. #include <common/utils.h>
  26. #include <profiling/profiling.h>
  27. #include <profiling/bound.h>
  28. #include <starpu_top.h>
  29. #include <top/starpu_top_core.h>
  30. #include <core/debug.h>
/* we need to identify each task to generate the DAG. */
/* Global job counter, bumped atomically (STARPU_ATOMIC_ADD in
 * _starpu_job_create) so every traced/recorded job gets a unique id. */
static unsigned job_cnt = 0;
  33. void _starpu_exclude_task_from_dag(struct starpu_task *task)
  34. {
  35. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  36. j->exclude_from_dag = 1;
  37. }
/* create an internal struct _starpu_job structure to encapsulate the task */
struct _starpu_job* __attribute__((malloc)) _starpu_job_create(struct starpu_task *task)
{
	struct _starpu_job *job;
	_STARPU_LOG_IN();

	job = _starpu_job_new();

	/* As most of the fields must be initialized at NULL, let's put 0
	 * everywhere */
	memset(job, 0, sizeof(*job));

	job->task = task;

#ifndef STARPU_USE_FXT
	/* Without FxT tracing, only allocate a job id when somebody actually
	 * consumes it (bound recording, StarPU-Top, or Ayudame).  With FxT the
	 * condition below is compiled out and the block always runs, since the
	 * trace always needs the id. */
	if (_starpu_bound_recording || _starpu_top_status_get()
#ifdef HAVE_AYUDAME_H
		|| AYU_event
#endif
			)
#endif
	{
		job->job_id = STARPU_ATOMIC_ADD(&job_cnt, 1);
#ifdef HAVE_AYUDAME_H
		if (AYU_event)
		{
			/* Declare task to Ayudame */
			int64_t AYU_data[2] = {_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
			AYU_event(AYU_ADDTASK, job->job_id, AYU_data);
		}
#endif
	}

	_starpu_cg_list_init(&job->job_successors);
	_STARPU_PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
	_STARPU_PTHREAD_COND_INIT(&job->sync_cond, NULL);

	/* By default we have sequential tasks */
	job->task_size = 1;

	if (task->use_tag)
		_starpu_tag_declare(task->tag_id, job);

	_STARPU_LOG_OUT();
	return job;
}
/* Release every resource attached to job J, then J itself.  Must only be
 * called once nobody can reach the job any more. */
void _starpu_job_destroy(struct _starpu_job *j)
{
	/* Wait for any code that was still working on the job (and was
	 * probably our waker) */
	/* The empty lock/unlock pair ensures that thread has dropped
	 * sync_mutex before we destroy it below. */
	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	_STARPU_PTHREAD_COND_DESTROY(&j->sync_cond);
	_STARPU_PTHREAD_MUTEX_DESTROY(&j->sync_mutex);

	/* Parallel tasks (task_size > 1) also set up the two work barriers,
	 * which must be torn down here. */
	if (j->task_size > 1)
	{
		_STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
		_STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
	}

	_starpu_cg_list_deinit(&j->job_successors);

	_starpu_job_delete(j);
}
/* Block until job J has fully terminated (codelet AND callback done).
 * Only valid for non-detached tasks, since detached tasks may be freed
 * behind our back. */
void _starpu_wait_job(struct _starpu_job *j)
{
	STARPU_ASSERT(j->task);
	STARPU_ASSERT(!j->task->detach);
	_STARPU_LOG_IN();

	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);

	/* We wait for the flag to have a value of 2 which means that both the
	 * codelet's implementation and its callback have been executed. That
	 * way, _starpu_wait_job won't return until the entire task was really
	 * executed (so that we cannot destroy the task while it is still being
	 * manipulated by the driver). */
	while (j->terminated != 2)
		_STARPU_PTHREAD_COND_WAIT(&j->sync_cond, &j->sync_mutex);

	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	_STARPU_LOG_OUT();
}
  108. void _starpu_handle_job_termination(struct _starpu_job *j)
  109. {
  110. struct starpu_task *task = j->task;
  111. unsigned sched_ctx = task->sched_ctx;
  112. _STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  113. task->status = STARPU_TASK_FINISHED;
  114. /* We must have set the j->terminated flag early, so that it is
  115. * possible to express task dependencies within the callback
  116. * function. A value of 1 means that the codelet was executed but that
  117. * the callback is not done yet. */
  118. j->terminated = 1;
  119. _STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  120. #ifdef STARPU_USE_SC_HYPERVISOR
  121. int workerid = starpu_worker_get_id();
  122. int i;
  123. size_t data_size = 0;
  124. for(i = 0; i < STARPU_NMAXBUFS; i++)
  125. if(task->handles[i] != NULL)
  126. data_size += _starpu_data_get_size(task->handles[i]);
  127. #endif //STARPU_USE_SC_HYPERVISOR
  128. /* We release handle reference count */
  129. if (task->cl)
  130. {
  131. unsigned i;
  132. for (i=0; i<task->cl->nbuffers; i++)
  133. {
  134. starpu_data_handle_t handle = task->handles[i];
  135. _starpu_spin_lock(&handle->header_lock);
  136. handle->busy_count--;
  137. if (!_starpu_data_check_not_busy(handle))
  138. _starpu_spin_unlock(&handle->header_lock);
  139. }
  140. }
  141. /* Tell other tasks that we don't exist any more, thus no need for
  142. * implicit dependencies any more. */
  143. _starpu_release_task_enforce_sequential_consistency(j);
  144. /* Task does not have a cl, but has explicit data dependencies, we need
  145. * to tell them that we will not exist any more before notifying the
  146. * tasks waiting for us */
  147. if (j->implicit_dep_handle) {
  148. starpu_data_handle_t handle = j->implicit_dep_handle;
  149. _starpu_release_data_enforce_sequential_consistency(j->task, handle);
  150. /* Release reference taken while setting implicit_dep_handle */
  151. _starpu_spin_lock(&handle->header_lock);
  152. handle->busy_count--;
  153. if (!_starpu_data_check_not_busy(handle))
  154. _starpu_spin_unlock(&handle->header_lock);
  155. }
  156. /* in case there are dependencies, wake up the proper tasks */
  157. _starpu_notify_dependencies(j);
  158. /* the callback is executed after the dependencies so that we may remove the tag
  159. * of the task itself */
  160. if (task->callback_func)
  161. {
  162. int profiling = starpu_profiling_status_get();
  163. if (profiling && task->profiling_info)
  164. _starpu_clock_gettime(&task->profiling_info->callback_start_time);
  165. /* so that we can check whether we are doing blocking calls
  166. * within the callback */
  167. _starpu_set_local_worker_status(STATUS_CALLBACK);
  168. /* Perhaps we have nested callbacks (eg. with chains of empty
  169. * tasks). So we store the current task and we will restore it
  170. * later. */
  171. struct starpu_task *current_task = starpu_task_get_current();
  172. _starpu_set_current_task(task);
  173. _STARPU_TRACE_START_CALLBACK(j);
  174. task->callback_func(task->callback_arg);
  175. _STARPU_TRACE_END_CALLBACK(j);
  176. _starpu_set_current_task(current_task);
  177. _starpu_set_local_worker_status(STATUS_UNKNOWN);
  178. if (profiling && task->profiling_info)
  179. _starpu_clock_gettime(&task->profiling_info->callback_end_time);
  180. }
  181. /* If the job was executed on a combined worker there is no need for the
  182. * scheduler to process it : the task structure doesn't contain any valuable
  183. * data as it's not linked to an actual worker */
  184. /* control task should not execute post_exec_hook */
  185. if(j->task_size == 1 && task->cl != NULL && !j->internal)
  186. {
  187. _starpu_sched_post_exec_hook(task);
  188. #ifdef STARPU_USE_SC_HYPERVISOR
  189. _starpu_sched_ctx_call_poped_task_cb(workerid, task, data_size, j->footprint);
  190. #endif //STARPU_USE_SC_HYPERVISOR
  191. }
  192. _STARPU_TRACE_TASK_DONE(j);
  193. /* NB: we do not save those values before the callback, in case the
  194. * application changes some parameters eventually (eg. a task may not
  195. * be generated if the application is terminated). */
  196. int destroy = task->destroy;
  197. int detach = task->detach;
  198. int regenerate = task->regenerate;
  199. /* we do not desallocate the job structure if some is going to
  200. * wait after the task */
  201. _STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  202. /* A value of 2 is put to specify that not only the codelet but
  203. * also the callback were executed. */
  204. j->terminated = 2;
  205. _STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
  206. #ifdef HAVE_AYUDAME_H
  207. if (AYU_event) AYU_event(AYU_REMOVETASK, j->job_id, NULL);
  208. #endif
  209. _STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  210. if (detach)
  211. {
  212. /* no one is going to synchronize with that task so we release
  213. * the data structures now. In case the job was already locked
  214. * by the caller, it is its responsability to destroy the task.
  215. * */
  216. if (destroy)
  217. _starpu_task_destroy(task);
  218. }
  219. if (regenerate)
  220. {
  221. STARPU_ASSERT_MSG(detach && !destroy && !task->synchronous, "Regenerated task must be detached (was %d), and not have detroy=1 (was %d) or synchronous=1 (was %d)", detach, destroy, task->synchronous);
  222. #ifdef HAVE_AYUDAME_H
  223. if (AYU_event)
  224. {
  225. int64_t AYU_data[2] = {j->exclude_from_dag?-1:_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
  226. AYU_event(AYU_ADDTASK, j->job_id, AYU_data);
  227. }
  228. #endif
  229. /* We reuse the same job structure */
  230. int ret = _starpu_submit_job(j);
  231. STARPU_ASSERT(!ret);
  232. }
  233. _starpu_decrement_nsubmitted_tasks();
  234. _starpu_decrement_nready_tasks();
  235. _starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
  236. }
  237. /* This function is called when a new task is submitted to StarPU
  238. * it returns 1 if the tag deps are not fulfilled, 0 otherwise */
  239. static unsigned _starpu_not_all_tag_deps_are_fulfilled(struct _starpu_job *j)
  240. {
  241. unsigned ret;
  242. if (!j->task->use_tag)
  243. {
  244. /* this task does not use tags, so we can go on */
  245. return 0;
  246. }
  247. struct _starpu_tag *tag = j->tag;
  248. struct _starpu_cg_list *tag_successors = &tag->tag_successors;
  249. _starpu_spin_lock(&tag->lock);
  250. STARPU_ASSERT_MSG(tag->is_assigned == 1 || !tag_successors->ndeps, "a tag can be assigned only one task to wake");
  251. if (tag_successors->ndeps != tag_successors->ndeps_completed)
  252. {
  253. tag->state = STARPU_BLOCKED;
  254. j->task->status = STARPU_TASK_BLOCKED_ON_TAG;
  255. ret = 1;
  256. }
  257. else
  258. {
  259. /* existing deps (if any) are fulfilled */
  260. /* If the same tag is being signaled by several tasks, do not
  261. * clear a DONE state. If it's the same job submitted several
  262. * times with the same tag, we have to do it */
  263. if (j->submitted == 2 || tag->state != STARPU_DONE)
  264. tag->state = STARPU_READY;
  265. /* already prepare for next run */
  266. tag_successors->ndeps_completed = 0;
  267. ret = 0;
  268. }
  269. _starpu_spin_unlock(&tag->lock);
  270. return ret;
  271. }
  272. static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j)
  273. {
  274. unsigned ret;
  275. struct _starpu_cg_list *job_successors = &j->job_successors;
  276. if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
  277. {
  278. j->task->status = STARPU_TASK_BLOCKED_ON_TASK;
  279. ret = 1;
  280. }
  281. else
  282. {
  283. /* existing deps (if any) are fulfilled */
  284. /* already prepare for next run */
  285. job_successors->ndeps_completed = 0;
  286. ret = 0;
  287. }
  288. return ret;
  289. }
/*
 * In order, we enforce tag, task and data dependencies. The task is
 * passed to the scheduler only once all these constraints are fulfilled.
 *
 * The job mutex has to be taken for atomicity with task submission, and
 * is released here.
 *
 * Returns the result of _starpu_push_task once all dependencies are met,
 * or 0 when the job stays blocked on some dependency.
 */
unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j)
{
	unsigned ret;
	_STARPU_LOG_IN();

	/* enforce tag dependencies */
	if (_starpu_not_all_tag_deps_are_fulfilled(j))
	{
		/* Blocked on a tag: release the mutex taken by the caller. */
		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_tag_deps_are_fulfilled");
		return 0;
	}

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
		return 0;
	}
	/* Tag and task deps are fulfilled: the mutex is no longer needed. */
	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	/* enforce data dependencies */
	if (_starpu_submit_job_enforce_data_deps(j))
	{
		_STARPU_LOG_OUT_TAG("enforce_data_deps");
		return 0;
	}

	ret = _starpu_push_task(j);

	_STARPU_LOG_OUT();
	return ret;
}
/* Tag deps are already fulfilled */
/* Same as _starpu_enforce_deps_and_schedule but skipping the tag-dependency
 * stage; also called with the job mutex held, and releases it. */
unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
{
	unsigned ret;

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		/* Blocked on a task dependency: release the caller's mutex. */
		_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		return 0;
	}
	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	/* enforce data dependencies */
	if (_starpu_submit_job_enforce_data_deps(j))
		return 0;

	ret = _starpu_push_task(j);

	return ret;
}
  343. /* This function must be called with worker->sched_mutex taken */
  344. struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
  345. {
  346. struct starpu_task *task = NULL;
  347. if (!starpu_task_list_empty(&worker->local_tasks))
  348. task = starpu_task_list_pop_back(&worker->local_tasks);
  349. return task;
  350. }
  351. int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int back)
  352. {
  353. /* Check that the worker is able to execute the task ! */
  354. STARPU_ASSERT(task && task->cl);
  355. if (STARPU_UNLIKELY(!(worker->worker_mask & task->cl->where)))
  356. return -ENODEV;
  357. _STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  358. if (back)
  359. starpu_task_list_push_back(&worker->local_tasks, task);
  360. else
  361. starpu_task_list_push_front(&worker->local_tasks, task);
  362. _STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
  363. starpu_push_task_end(task);
  364. _STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  365. return 0;
  366. }