jobs.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. * Copyright (C) 2011 INRIA
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <starpu.h>
  20. #include <core/jobs.h>
  21. #include <core/task.h>
  22. #include <core/workers.h>
  23. #include <core/dependencies/data_concurrency.h>
  24. #include <common/config.h>
  25. #include <common/utils.h>
  26. #include <profiling/profiling.h>
  27. #include <profiling/bound.h>
  28. #include <starpu_top.h>
  29. #include <top/starpu_top_core.h>
  30. #include <core/debug.h>
  /* Counter from which each job's job_id is drawn, so that every task can be
   * identified when the DAG is generated.  Static storage duration makes it
   * start at zero. */
  static unsigned job_cnt;
  33. void _starpu_exclude_task_from_dag(struct starpu_task *task)
  34. {
  35. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  36. j->exclude_from_dag = 1;
  37. }
  38. /* create an internal struct _starpu_job structure to encapsulate the task */
  39. struct _starpu_job* __attribute__((malloc)) _starpu_job_create(struct starpu_task *task)
  40. {
  41. struct _starpu_job *job;
  42. _STARPU_LOG_IN();
  43. job = _starpu_job_new();
  44. /* As most of the fields must be initialized at NULL, let's put 0
  45. * everywhere */
  46. memset(job, 0, sizeof(*job));
  47. if (task->dyn_handles)
  48. job->dyn_ordered_buffers = malloc(task->cl->nbuffers * sizeof(struct starpu_data_descr));
  49. job->task = task;
  50. #ifndef STARPU_USE_FXT
  51. if (_starpu_bound_recording || _starpu_top_status_get()
  52. #ifdef HAVE_AYUDAME_H
  53. || AYU_event
  54. #endif
  55. )
  56. #endif
  57. {
  58. job->job_id = STARPU_ATOMIC_ADD(&job_cnt, 1);
  59. #ifdef HAVE_AYUDAME_H
  60. if (AYU_event)
  61. {
  62. /* Declare task to Ayudame */
  63. int64_t AYU_data[2] = {_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
  64. AYU_event(AYU_ADDTASK, job->job_id, AYU_data);
  65. }
  66. #endif
  67. }
  68. _starpu_cg_list_init(&job->job_successors);
  69. STARPU_PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
  70. STARPU_PTHREAD_COND_INIT(&job->sync_cond, NULL);
  71. /* By default we have sequential tasks */
  72. job->task_size = 1;
  73. if (task->use_tag)
  74. _starpu_tag_declare(task->tag_id, job);
  75. _STARPU_LOG_OUT();
  76. return job;
  77. }
  78. void _starpu_job_destroy(struct _starpu_job *j)
  79. {
  80. /* Wait for any code that was still working on the job (and was
  81. * probably our waker) */
  82. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  83. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  84. STARPU_PTHREAD_COND_DESTROY(&j->sync_cond);
  85. STARPU_PTHREAD_MUTEX_DESTROY(&j->sync_mutex);
  86. if (j->task_size > 1)
  87. {
  88. STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
  89. STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
  90. }
  91. _starpu_cg_list_deinit(&j->job_successors);
  92. if (j->dyn_ordered_buffers)
  93. {
  94. free(j->dyn_ordered_buffers);
  95. j->dyn_ordered_buffers = NULL;
  96. }
  97. _starpu_job_delete(j);
  98. }
  99. void _starpu_wait_job(struct _starpu_job *j)
  100. {
  101. STARPU_ASSERT(j->task);
  102. STARPU_ASSERT(!j->task->detach);
  103. _STARPU_LOG_IN();
  104. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  105. /* We wait for the flag to have a value of 2 which means that both the
  106. * codelet's implementation and its callback have been executed. That
  107. * way, _starpu_wait_job won't return until the entire task was really
  108. * executed (so that we cannot destroy the task while it is still being
  109. * manipulated by the driver). */
  110. while (j->terminated != 2)
  111. STARPU_PTHREAD_COND_WAIT(&j->sync_cond, &j->sync_mutex);
  112. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  113. _STARPU_LOG_OUT();
  114. }
  115. void _starpu_handle_job_termination(struct _starpu_job *j)
  116. {
  117. struct starpu_task *task = j->task;
  118. unsigned sched_ctx = task->sched_ctx;
  119. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  120. task->status = STARPU_TASK_FINISHED;
  121. /* We must have set the j->terminated flag early, so that it is
  122. * possible to express task dependencies within the callback
  123. * function. A value of 1 means that the codelet was executed but that
  124. * the callback is not done yet. */
  125. j->terminated = 1;
  126. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  127. #ifdef STARPU_USE_SC_HYPERVISOR
  128. int workerid = starpu_worker_get_id();
  129. int i;
  130. size_t data_size = 0;
  131. for(i = 0; i < STARPU_NMAXBUFS; i++)
  132. {
  133. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  134. if (handle != NULL)
  135. data_size += _starpu_data_get_size(handle);
  136. }
  137. #endif //STARPU_USE_SC_HYPERVISOR
  138. /* We release handle reference count */
  139. if (task->cl)
  140. {
  141. unsigned i;
  142. for (i=0; i<task->cl->nbuffers; i++)
  143. {
  144. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  145. _starpu_spin_lock(&handle->header_lock);
  146. handle->busy_count--;
  147. if (!_starpu_data_check_not_busy(handle))
  148. _starpu_spin_unlock(&handle->header_lock);
  149. }
  150. }
  151. /* Tell other tasks that we don't exist any more, thus no need for
  152. * implicit dependencies any more. */
  153. _starpu_release_task_enforce_sequential_consistency(j);
  154. /* Task does not have a cl, but has explicit data dependencies, we need
  155. * to tell them that we will not exist any more before notifying the
  156. * tasks waiting for us */
  157. if (j->implicit_dep_handle) {
  158. starpu_data_handle_t handle = j->implicit_dep_handle;
  159. _starpu_release_data_enforce_sequential_consistency(j->task, handle);
  160. /* Release reference taken while setting implicit_dep_handle */
  161. _starpu_spin_lock(&handle->header_lock);
  162. handle->busy_count--;
  163. if (!_starpu_data_check_not_busy(handle))
  164. _starpu_spin_unlock(&handle->header_lock);
  165. }
  166. /* in case there are dependencies, wake up the proper tasks */
  167. _starpu_notify_dependencies(j);
  168. /* the callback is executed after the dependencies so that we may remove the tag
  169. * of the task itself */
  170. if (task->callback_func)
  171. {
  172. int profiling = starpu_profiling_status_get();
  173. if (profiling && task->profiling_info)
  174. _starpu_clock_gettime(&task->profiling_info->callback_start_time);
  175. /* so that we can check whether we are doing blocking calls
  176. * within the callback */
  177. _starpu_set_local_worker_status(STATUS_CALLBACK);
  178. /* Perhaps we have nested callbacks (eg. with chains of empty
  179. * tasks). So we store the current task and we will restore it
  180. * later. */
  181. struct starpu_task *current_task = starpu_task_get_current();
  182. _starpu_set_current_task(task);
  183. _STARPU_TRACE_START_CALLBACK(j);
  184. task->callback_func(task->callback_arg);
  185. _STARPU_TRACE_END_CALLBACK(j);
  186. _starpu_set_current_task(current_task);
  187. _starpu_set_local_worker_status(STATUS_UNKNOWN);
  188. if (profiling && task->profiling_info)
  189. _starpu_clock_gettime(&task->profiling_info->callback_end_time);
  190. }
  191. /* If the job was executed on a combined worker there is no need for the
  192. * scheduler to process it : the task structure doesn't contain any valuable
  193. * data as it's not linked to an actual worker */
  194. /* control task should not execute post_exec_hook */
  195. if(j->task_size == 1 && task->cl != NULL && !j->internal)
  196. {
  197. _starpu_sched_post_exec_hook(task);
  198. #ifdef STARPU_USE_SC_HYPERVISOR
  199. _starpu_sched_ctx_call_poped_task_cb(workerid, task, data_size, j->footprint);
  200. #endif //STARPU_USE_SC_HYPERVISOR
  201. }
  202. _STARPU_TRACE_TASK_DONE(j);
  203. /* NB: we do not save those values before the callback, in case the
  204. * application changes some parameters eventually (eg. a task may not
  205. * be generated if the application is terminated). */
  206. int destroy = task->destroy;
  207. int detach = task->detach;
  208. int regenerate = task->regenerate;
  209. /* we do not desallocate the job structure if some is going to
  210. * wait after the task */
  211. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  212. /* A value of 2 is put to specify that not only the codelet but
  213. * also the callback were executed. */
  214. j->terminated = 2;
  215. STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
  216. #ifdef HAVE_AYUDAME_H
  217. if (AYU_event) AYU_event(AYU_REMOVETASK, j->job_id, NULL);
  218. #endif
  219. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  220. if (detach)
  221. {
  222. /* no one is going to synchronize with that task so we release
  223. * the data structures now. In case the job was already locked
  224. * by the caller, it is its responsability to destroy the task.
  225. * */
  226. if (destroy)
  227. _starpu_task_destroy(task);
  228. }
  229. if (regenerate)
  230. {
  231. STARPU_ASSERT_MSG(detach && !destroy && !task->synchronous, "Regenerated task must be detached (was %d), and not have detroy=1 (was %d) or synchronous=1 (was %d)", detach, destroy, task->synchronous);
  232. #ifdef HAVE_AYUDAME_H
  233. if (AYU_event)
  234. {
  235. int64_t AYU_data[2] = {j->exclude_from_dag?-1:_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
  236. AYU_event(AYU_ADDTASK, j->job_id, AYU_data);
  237. }
  238. #endif
  239. /* We reuse the same job structure */
  240. int ret = _starpu_submit_job(j);
  241. STARPU_ASSERT(!ret);
  242. }
  243. _starpu_decrement_nsubmitted_tasks();
  244. _starpu_decrement_nready_tasks();
  245. _starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
  246. }
  247. /* This function is called when a new task is submitted to StarPU
  248. * it returns 1 if the tag deps are not fulfilled, 0 otherwise */
  249. static unsigned _starpu_not_all_tag_deps_are_fulfilled(struct _starpu_job *j)
  250. {
  251. unsigned ret;
  252. if (!j->task->use_tag)
  253. {
  254. /* this task does not use tags, so we can go on */
  255. return 0;
  256. }
  257. struct _starpu_tag *tag = j->tag;
  258. struct _starpu_cg_list *tag_successors = &tag->tag_successors;
  259. _starpu_spin_lock(&tag->lock);
  260. STARPU_ASSERT_MSG(tag->is_assigned == 1 || !tag_successors->ndeps, "a tag can be assigned only one task to wake");
  261. if (tag_successors->ndeps != tag_successors->ndeps_completed)
  262. {
  263. tag->state = STARPU_BLOCKED;
  264. j->task->status = STARPU_TASK_BLOCKED_ON_TAG;
  265. ret = 1;
  266. }
  267. else
  268. {
  269. /* existing deps (if any) are fulfilled */
  270. /* If the same tag is being signaled by several tasks, do not
  271. * clear a DONE state. If it's the same job submitted several
  272. * times with the same tag, we have to do it */
  273. if (j->submitted == 2 || tag->state != STARPU_DONE)
  274. tag->state = STARPU_READY;
  275. /* already prepare for next run */
  276. tag_successors->ndeps_completed = 0;
  277. ret = 0;
  278. }
  279. _starpu_spin_unlock(&tag->lock);
  280. return ret;
  281. }
  282. static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j)
  283. {
  284. unsigned ret;
  285. struct _starpu_cg_list *job_successors = &j->job_successors;
  286. if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
  287. {
  288. j->task->status = STARPU_TASK_BLOCKED_ON_TASK;
  289. ret = 1;
  290. }
  291. else
  292. {
  293. /* existing deps (if any) are fulfilled */
  294. /* already prepare for next run */
  295. job_successors->ndeps_completed = 0;
  296. ret = 0;
  297. }
  298. return ret;
  299. }
  300. /*
  301. * In order, we enforce tag, task and data dependencies. The task is
  302. * passed to the scheduler only once all these constraints are fulfilled.
  303. *
  304. * The job mutex has to be taken for atomicity with task submission, and
  305. * is released here.
  306. */
  307. unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j)
  308. {
  309. unsigned ret;
  310. _STARPU_LOG_IN();
  311. /* enfore tag dependencies */
  312. if (_starpu_not_all_tag_deps_are_fulfilled(j))
  313. {
  314. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  315. _STARPU_LOG_OUT_TAG("not_all_tag_deps_are_fulfilled");
  316. return 0;
  317. }
  318. /* enfore task dependencies */
  319. if (_starpu_not_all_task_deps_are_fulfilled(j))
  320. {
  321. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  322. _STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
  323. return 0;
  324. }
  325. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  326. /* enforce data dependencies */
  327. if (_starpu_submit_job_enforce_data_deps(j))
  328. {
  329. _STARPU_LOG_OUT_TAG("enforce_data_deps");
  330. return 0;
  331. }
  332. ret = _starpu_push_task(j);
  333. _STARPU_LOG_OUT();
  334. return ret;
  335. }
  336. /* Tag deps are already fulfilled */
  337. unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
  338. {
  339. unsigned ret;
  340. /* enfore task dependencies */
  341. if (_starpu_not_all_task_deps_are_fulfilled(j))
  342. {
  343. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  344. return 0;
  345. }
  346. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  347. /* enforce data dependencies */
  348. if (_starpu_submit_job_enforce_data_deps(j))
  349. return 0;
  350. ret = _starpu_push_task(j);
  351. return ret;
  352. }
  353. /* This function must be called with worker->sched_mutex taken */
  354. struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
  355. {
  356. struct starpu_task *task = NULL;
  357. if (!starpu_task_list_empty(&worker->local_tasks))
  358. task = starpu_task_list_pop_front(&worker->local_tasks);
  359. return task;
  360. }
  361. int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio)
  362. {
  363. /* Check that the worker is able to execute the task ! */
  364. STARPU_ASSERT(task && task->cl);
  365. if (STARPU_UNLIKELY(!(worker->worker_mask & task->cl->where)))
  366. return -ENODEV;
  367. STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  368. if (prio)
  369. starpu_task_list_push_front(&worker->local_tasks, task);
  370. else
  371. starpu_task_list_push_back(&worker->local_tasks, task);
  372. STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
  373. starpu_push_task_end(task);
  374. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  375. return 0;
  376. }