jobs.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. * Copyright (C) 2011 INRIA
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <starpu.h>
  20. #include <core/jobs.h>
  21. #include <core/task.h>
  22. #include <core/workers.h>
  23. #include <core/dependencies/data_concurrency.h>
  24. #include <common/config.h>
  25. #include <common/utils.h>
  26. #include <profiling/profiling.h>
  27. #include <profiling/bound.h>
  28. #include <starpu_top.h>
  29. #include <top/starpu_top_core.h>
  30. #include <core/debug.h>
  31. /* we need to identify each task to generate the DAG. */
  32. static unsigned job_cnt = 0;
  33. void _starpu_exclude_task_from_dag(struct starpu_task *task)
  34. {
  35. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  36. j->exclude_from_dag = 1;
  37. }
  38. /* create an internal struct _starpu_job structure to encapsulate the task */
  39. struct _starpu_job* STARPU_ATTRIBUTE_MALLOC _starpu_job_create(struct starpu_task *task)
  40. {
  41. struct _starpu_job *job;
  42. _STARPU_LOG_IN();
  43. job = _starpu_job_new();
  44. /* As most of the fields must be initialized at NULL, let's put 0
  45. * everywhere */
  46. memset(job, 0, sizeof(*job));
  47. if (task->dyn_handles)
  48. job->dyn_ordered_buffers = malloc(task->cl->nbuffers * sizeof(struct starpu_data_descr));
  49. job->task = task;
  50. #ifndef STARPU_USE_FXT
  51. if (_starpu_bound_recording || _starpu_top_status_get()
  52. #ifdef HAVE_AYUDAME_H
  53. || AYU_event
  54. #endif
  55. )
  56. #endif
  57. {
  58. job->job_id = STARPU_ATOMIC_ADD(&job_cnt, 1);
  59. #ifdef HAVE_AYUDAME_H
  60. if (AYU_event)
  61. {
  62. /* Declare task to Ayudame */
  63. int64_t AYU_data[2] = {_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
  64. AYU_event(AYU_ADDTASK, job->job_id, AYU_data);
  65. }
  66. #endif
  67. }
  68. _starpu_cg_list_init(&job->job_successors);
  69. STARPU_PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
  70. STARPU_PTHREAD_COND_INIT(&job->sync_cond, NULL);
  71. /* By default we have sequential tasks */
  72. job->task_size = 1;
  73. if (task->use_tag)
  74. _starpu_tag_declare(task->tag_id, job);
  75. _STARPU_LOG_OUT();
  76. return job;
  77. }
  78. void _starpu_job_destroy(struct _starpu_job *j)
  79. {
  80. /* Wait for any code that was still working on the job (and was
  81. * probably our waker) */
  82. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  83. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  84. STARPU_PTHREAD_COND_DESTROY(&j->sync_cond);
  85. STARPU_PTHREAD_MUTEX_DESTROY(&j->sync_mutex);
  86. if (j->task_size > 1)
  87. {
  88. STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
  89. STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
  90. }
  91. _starpu_cg_list_deinit(&j->job_successors);
  92. if (j->dyn_ordered_buffers)
  93. {
  94. free(j->dyn_ordered_buffers);
  95. j->dyn_ordered_buffers = NULL;
  96. }
  97. _starpu_job_delete(j);
  98. }
  99. void _starpu_wait_job(struct _starpu_job *j)
  100. {
  101. STARPU_ASSERT(j->task);
  102. STARPU_ASSERT(!j->task->detach);
  103. _STARPU_LOG_IN();
  104. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  105. /* We wait for the flag to have a value of 2 which means that both the
  106. * codelet's implementation and its callback have been executed. That
  107. * way, _starpu_wait_job won't return until the entire task was really
  108. * executed (so that we cannot destroy the task while it is still being
  109. * manipulated by the driver). */
  110. while (j->terminated != 2)
  111. {
  112. STARPU_PTHREAD_COND_WAIT(&j->sync_cond, &j->sync_mutex);
  113. }
  114. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  115. _STARPU_LOG_OUT();
  116. }
  117. void _starpu_handle_job_termination(struct _starpu_job *j)
  118. {
  119. struct starpu_task *task = j->task;
  120. unsigned sched_ctx = task->sched_ctx;
  121. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  122. task->status = STARPU_TASK_FINISHED;
  123. /* We must have set the j->terminated flag early, so that it is
  124. * possible to express task dependencies within the callback
  125. * function. A value of 1 means that the codelet was executed but that
  126. * the callback is not done yet. */
  127. j->terminated = 1;
  128. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  129. #ifdef STARPU_USE_SC_HYPERVISOR
  130. int workerid = starpu_worker_get_id();
  131. int i;
  132. size_t data_size = 0;
  133. for(i = 0; i < STARPU_NMAXBUFS; i++)
  134. {
  135. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  136. if (handle != NULL)
  137. data_size += _starpu_data_get_size(handle);
  138. }
  139. #endif //STARPU_USE_SC_HYPERVISOR
  140. /* We release handle reference count */
  141. if (task->cl)
  142. {
  143. unsigned i;
  144. for (i=0; i<task->cl->nbuffers; i++)
  145. {
  146. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  147. _starpu_spin_lock(&handle->header_lock);
  148. handle->busy_count--;
  149. if (!_starpu_data_check_not_busy(handle))
  150. _starpu_spin_unlock(&handle->header_lock);
  151. }
  152. }
  153. /* Tell other tasks that we don't exist any more, thus no need for
  154. * implicit dependencies any more. */
  155. _starpu_release_task_enforce_sequential_consistency(j);
  156. /* Task does not have a cl, but has explicit data dependencies, we need
  157. * to tell them that we will not exist any more before notifying the
  158. * tasks waiting for us */
  159. if (j->implicit_dep_handle) {
  160. starpu_data_handle_t handle = j->implicit_dep_handle;
  161. _starpu_release_data_enforce_sequential_consistency(j->task, handle);
  162. /* Release reference taken while setting implicit_dep_handle */
  163. _starpu_spin_lock(&handle->header_lock);
  164. handle->busy_count--;
  165. if (!_starpu_data_check_not_busy(handle))
  166. _starpu_spin_unlock(&handle->header_lock);
  167. }
  168. /* in case there are dependencies, wake up the proper tasks */
  169. _starpu_notify_dependencies(j);
  170. /* the callback is executed after the dependencies so that we may remove the tag
  171. * of the task itself */
  172. if (task->callback_func)
  173. {
  174. int profiling = starpu_profiling_status_get();
  175. if (profiling && task->profiling_info)
  176. _starpu_clock_gettime(&task->profiling_info->callback_start_time);
  177. /* so that we can check whether we are doing blocking calls
  178. * within the callback */
  179. _starpu_set_local_worker_status(STATUS_CALLBACK);
  180. /* Perhaps we have nested callbacks (eg. with chains of empty
  181. * tasks). So we store the current task and we will restore it
  182. * later. */
  183. struct starpu_task *current_task = starpu_task_get_current();
  184. _starpu_set_current_task(task);
  185. _STARPU_TRACE_START_CALLBACK(j);
  186. task->callback_func(task->callback_arg);
  187. _STARPU_TRACE_END_CALLBACK(j);
  188. _starpu_set_current_task(current_task);
  189. _starpu_set_local_worker_status(STATUS_UNKNOWN);
  190. if (profiling && task->profiling_info)
  191. _starpu_clock_gettime(&task->profiling_info->callback_end_time);
  192. }
  193. /* If the job was executed on a combined worker there is no need for the
  194. * scheduler to process it : the task structure doesn't contain any valuable
  195. * data as it's not linked to an actual worker */
  196. /* control task should not execute post_exec_hook */
  197. if(j->task_size == 1 && task->cl != NULL && !j->internal)
  198. {
  199. _starpu_sched_post_exec_hook(task);
  200. #ifdef STARPU_USE_SC_HYPERVISOR
  201. _starpu_sched_ctx_call_poped_task_cb(workerid, task, data_size, j->footprint);
  202. #endif //STARPU_USE_SC_HYPERVISOR
  203. }
  204. _STARPU_TRACE_TASK_DONE(j);
  205. /* NB: we do not save those values before the callback, in case the
  206. * application changes some parameters eventually (eg. a task may not
  207. * be generated if the application is terminated). */
  208. int destroy = task->destroy;
  209. int detach = task->detach;
  210. int regenerate = task->regenerate;
  211. /* we do not desallocate the job structure if some is going to
  212. * wait after the task */
  213. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  214. /* A value of 2 is put to specify that not only the codelet but
  215. * also the callback were executed. */
  216. j->terminated = 2;
  217. STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
  218. #ifdef HAVE_AYUDAME_H
  219. if (AYU_event) AYU_event(AYU_REMOVETASK, j->job_id, NULL);
  220. #endif
  221. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  222. if (detach)
  223. {
  224. /* no one is going to synchronize with that task so we release
  225. * the data structures now. In case the job was already locked
  226. * by the caller, it is its responsability to destroy the task.
  227. * */
  228. if (destroy)
  229. _starpu_task_destroy(task);
  230. }
  231. if (regenerate)
  232. {
  233. STARPU_ASSERT_MSG(detach && !destroy && !task->synchronous, "Regenerated task must be detached (was %d), and not have detroy=1 (was %d) or synchronous=1 (was %d)", detach, destroy, task->synchronous);
  234. #ifdef HAVE_AYUDAME_H
  235. if (AYU_event)
  236. {
  237. int64_t AYU_data[2] = {j->exclude_from_dag?0:_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
  238. AYU_event(AYU_ADDTASK, j->job_id, AYU_data);
  239. }
  240. #endif
  241. /* We reuse the same job structure */
  242. int ret = _starpu_submit_job(j);
  243. STARPU_ASSERT(!ret);
  244. }
  245. _starpu_decrement_nsubmitted_tasks();
  246. _starpu_decrement_nready_tasks();
  247. _starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
  248. struct _starpu_worker *worker;
  249. worker = _starpu_get_local_worker_key();
  250. if (worker)
  251. {
  252. STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  253. if(worker->removed_from_ctx[sched_ctx] == 1 && worker->shares_tasks_lists[sched_ctx] == 1)
  254. {
  255. _starpu_worker_gets_out_of_ctx(sched_ctx, worker);
  256. worker->removed_from_ctx[sched_ctx] = 0;
  257. }
  258. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  259. }
  260. }
  261. /* This function is called when a new task is submitted to StarPU
  262. * it returns 1 if the tag deps are not fulfilled, 0 otherwise */
  263. static unsigned _starpu_not_all_tag_deps_are_fulfilled(struct _starpu_job *j)
  264. {
  265. unsigned ret;
  266. if (!j->task->use_tag)
  267. {
  268. /* this task does not use tags, so we can go on */
  269. return 0;
  270. }
  271. struct _starpu_tag *tag = j->tag;
  272. struct _starpu_cg_list *tag_successors = &tag->tag_successors;
  273. _starpu_spin_lock(&tag->lock);
  274. STARPU_ASSERT_MSG(tag->is_assigned == 1 || !tag_successors->ndeps, "a tag can be assigned only one task to wake (%llu had %u assigned tasks, and %u successors)", (unsigned long long) tag->id, tag->is_assigned, tag_successors->ndeps);
  275. if (tag_successors->ndeps != tag_successors->ndeps_completed)
  276. {
  277. tag->state = STARPU_BLOCKED;
  278. j->task->status = STARPU_TASK_BLOCKED_ON_TAG;
  279. ret = 1;
  280. }
  281. else
  282. {
  283. /* existing deps (if any) are fulfilled */
  284. /* If the same tag is being signaled by several tasks, do not
  285. * clear a DONE state. If it's the same job submitted several
  286. * times with the same tag, we have to do it */
  287. if (j->submitted == 2 || tag->state != STARPU_DONE)
  288. tag->state = STARPU_READY;
  289. /* already prepare for next run */
  290. tag_successors->ndeps_completed = 0;
  291. ret = 0;
  292. }
  293. _starpu_spin_unlock(&tag->lock);
  294. return ret;
  295. }
  296. static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j)
  297. {
  298. unsigned ret;
  299. struct _starpu_cg_list *job_successors = &j->job_successors;
  300. if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
  301. {
  302. j->task->status = STARPU_TASK_BLOCKED_ON_TASK;
  303. ret = 1;
  304. }
  305. else
  306. {
  307. /* existing deps (if any) are fulfilled */
  308. /* already prepare for next run */
  309. job_successors->ndeps_completed = 0;
  310. ret = 0;
  311. }
  312. return ret;
  313. }
  314. /*
  315. * In order, we enforce tag, task and data dependencies. The task is
  316. * passed to the scheduler only once all these constraints are fulfilled.
  317. *
  318. * The job mutex has to be taken for atomicity with task submission, and
  319. * is released here.
  320. */
  321. unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j)
  322. {
  323. unsigned ret;
  324. _STARPU_LOG_IN();
  325. /* enfore tag dependencies */
  326. if (_starpu_not_all_tag_deps_are_fulfilled(j))
  327. {
  328. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  329. _STARPU_LOG_OUT_TAG("not_all_tag_deps_are_fulfilled");
  330. return 0;
  331. }
  332. /* enfore task dependencies */
  333. if (_starpu_not_all_task_deps_are_fulfilled(j))
  334. {
  335. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  336. _STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
  337. return 0;
  338. }
  339. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  340. /* enforce data dependencies */
  341. if (_starpu_submit_job_enforce_data_deps(j))
  342. {
  343. _STARPU_LOG_OUT_TAG("enforce_data_deps");
  344. return 0;
  345. }
  346. ret = _starpu_push_task(j);
  347. _STARPU_LOG_OUT();
  348. return ret;
  349. }
  350. /* Tag deps are already fulfilled */
  351. unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
  352. {
  353. unsigned ret;
  354. /* enfore task dependencies */
  355. if (_starpu_not_all_task_deps_are_fulfilled(j))
  356. {
  357. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  358. return 0;
  359. }
  360. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  361. /* enforce data dependencies */
  362. if (_starpu_submit_job_enforce_data_deps(j))
  363. return 0;
  364. ret = _starpu_push_task(j);
  365. return ret;
  366. }
  367. /* This function must be called with worker->sched_mutex taken */
  368. struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
  369. {
  370. struct starpu_task *task = NULL;
  371. if (!starpu_task_list_empty(&worker->local_tasks))
  372. task = starpu_task_list_pop_front(&worker->local_tasks);
  373. return task;
  374. }
  375. int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio)
  376. {
  377. /* Check that the worker is able to execute the task ! */
  378. STARPU_ASSERT(task && task->cl);
  379. if (STARPU_UNLIKELY(!(worker->worker_mask & task->cl->where)))
  380. return -ENODEV;
  381. STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  382. if (prio)
  383. starpu_task_list_push_front(&worker->local_tasks, task);
  384. else
  385. starpu_task_list_push_back(&worker->local_tasks, task);
  386. STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
  387. starpu_push_task_end(task);
  388. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  389. return 0;
  390. }