/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009-2015  Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015  CNRS
 * Copyright (C) 2011  Télécom-SudParis
 * Copyright (C) 2011, 2014  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <core/jobs.h>
#include <core/task.h>
#include <core/workers.h>
#include <core/dependencies/data_concurrency.h>
#include <common/config.h>
#include <common/utils.h>
#include <profiling/profiling.h>
#include <profiling/bound.h>
#include <starpu_top.h>
#include <top/starpu_top_core.h>
#include <core/debug.h>
/* we need to identify each task to generate the DAG. */
static unsigned job_cnt = 0;

void _starpu_exclude_task_from_dag(struct starpu_task *task)
{
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);

	j->exclude_from_dag = 1;
}

/* create an internal struct _starpu_job structure to encapsulate the task */
struct _starpu_job* STARPU_ATTRIBUTE_MALLOC _starpu_job_create(struct starpu_task *task)
{
	struct _starpu_job *job;
	_STARPU_LOG_IN();

	job = _starpu_job_new();

	/* As most of the fields must be initialized to NULL, let's put 0
	 * everywhere */
	memset(job, 0, sizeof(*job));

	if (task->dyn_handles)
	{
		job->dyn_ordered_buffers = malloc(STARPU_TASK_GET_NBUFFERS(task) * sizeof(job->dyn_ordered_buffers[0]));
		job->dyn_dep_slots = calloc(STARPU_TASK_GET_NBUFFERS(task), sizeof(job->dyn_dep_slots[0]));
	}

	job->task = task;

#ifndef STARPU_USE_FXT
	if (_starpu_bound_recording || _starpu_top_status_get()
#ifdef HAVE_AYUDAME_H
	    || AYU_event
#endif
	   )
#endif
	{
		job->job_id = STARPU_ATOMIC_ADD(&job_cnt, 1);
#ifdef HAVE_AYUDAME_H
		if (AYU_event)
		{
			/* Declare the task to Ayudame */
			int64_t AYU_data[2] = {_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
			AYU_event(AYU_ADDTASK, job->job_id, AYU_data);
		}
#endif
	}

	_starpu_cg_list_init(&job->job_successors);
	STARPU_PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
	STARPU_PTHREAD_COND_INIT(&job->sync_cond, NULL);

	/* By default we have sequential tasks */
	job->task_size = 1;

	if (task->use_tag)
		_starpu_tag_declare(task->tag_id, job);

	_STARPU_LOG_OUT();
	return job;
}
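
/* An illustrative, standalone sketch of the pattern above: wrap a
 * user-visible task in an internal job and stamp it with an id from an
 * atomic counter. All names below (my_task, my_job, my_job_create) are
 * hypothetical, not StarPU API; kept under #if 0 so it is not compiled. */
#if 0
#include <stdatomic.h>
#include <stdlib.h>

struct my_task { void (*func)(void *); void *arg; };
struct my_job  { struct my_task *task; unsigned long id; };

static atomic_ulong my_job_cnt;

static struct my_job *my_job_create(struct my_task *task)
{
	struct my_job *job = calloc(1, sizeof(*job)); /* zero every field */
	if (!job)
		return NULL;
	job->task = task;
	/* like STARPU_ATOMIC_ADD(&job_cnt, 1): ids stay unique across threads */
	job->id = atomic_fetch_add(&my_job_cnt, 1) + 1;
	return job;
}
#endif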
void _starpu_job_destroy(struct _starpu_job *j)
{
	/* Wait for any code that was still working on the job (and was
	 * probably our waker) */
	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	STARPU_PTHREAD_COND_DESTROY(&j->sync_cond);
	STARPU_PTHREAD_MUTEX_DESTROY(&j->sync_mutex);

	if (j->task_size > 1)
	{
		STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
		STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
	}

	_starpu_cg_list_deinit(&j->job_successors);
	if (j->dyn_ordered_buffers)
	{
		free(j->dyn_ordered_buffers);
		j->dyn_ordered_buffers = NULL;
	}
	if (j->dyn_dep_slots)
	{
		free(j->dyn_dep_slots);
		j->dyn_dep_slots = NULL;
	}

	_starpu_job_delete(j);
}
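
/* The LOCK()/UNLOCK() pair above acts as a handshake rather than a
 * critical section: it blocks until the thread that woke us (and may
 * still hold the mutex) has released it, so destroying the mutex is
 * then safe. A minimal sketch, with hypothetical names, not compiled: */
#if 0
#include <pthread.h>

static void drain_then_destroy(pthread_mutex_t *mutex, pthread_cond_t *cond)
{
	/* returns only once any waker has left the critical section */
	pthread_mutex_lock(mutex);
	pthread_mutex_unlock(mutex);

	pthread_cond_destroy(cond);
	pthread_mutex_destroy(mutex);
}
#endif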
int _starpu_job_finished(struct _starpu_job *j)
{
	int ret;
	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	ret = j->terminated == 2;
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	return ret;
}

void _starpu_wait_job(struct _starpu_job *j)
{
	STARPU_ASSERT(j->task);
	STARPU_ASSERT(!j->task->detach);
	_STARPU_LOG_IN();

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);

	/* We wait for the flag to have a value of 2 which means that both the
	 * codelet's implementation and its callback have been executed. That
	 * way, _starpu_wait_job won't return until the entire task has really
	 * been executed (so that we cannot destroy the task while it is still
	 * being manipulated by the driver). */
	while (j->terminated != 2)
	{
		STARPU_PTHREAD_COND_WAIT(&j->sync_cond, &j->sync_mutex);
	}

	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	_STARPU_LOG_OUT();
}
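
/* Sketch of the two-stage completion flag used above, with hypothetical
 * names: 1 means the codelet ran, 2 means the codelet and its callback
 * both ran. Waiters block on a condition variable until the flag
 * reaches 2, re-checking after each wakeup. Not compiled: */
#if 0
#include <pthread.h>

struct my_job
{
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	int terminated; /* 0, 1 or 2 */
};

static void my_wait(struct my_job *j)
{
	pthread_mutex_lock(&j->mutex);
	while (j->terminated != 2) /* loop guards against spurious wakeups */
		pthread_cond_wait(&j->cond, &j->mutex);
	pthread_mutex_unlock(&j->mutex);
}
#endif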
#ifdef STARPU_OPENMP
int _starpu_test_job_termination(struct _starpu_job *j)
{
	STARPU_ASSERT(j->task);
	STARPU_ASSERT(!j->task->detach);
	/* Disable Helgrind race complaint, since we really just want to poll j->terminated */
	if (STARPU_RUNNING_ON_VALGRIND)
	{
		int v = STARPU_PTHREAD_MUTEX_TRYLOCK(&j->sync_mutex);
		if (v != EBUSY)
		{
			STARPU_ASSERT(v == 0);
			int ret = (j->terminated == 2);
			STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
			return ret;
		}
		else
		{
			return 0;
		}
	}
	else
	{
		STARPU_SYNCHRONIZE();
		return (j->terminated == 2);
	}
}
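
/* Sketch of the non-blocking poll above (hypothetical names, not
 * compiled): under a race detector, the flag is read behind a trylock
 * so the access is properly ordered; if the lock is busy, we simply
 * report "not finished yet" instead of blocking. */
#if 0
#include <pthread.h>
#include <errno.h>

static int my_test(struct my_job *j) /* my_job as in the sketch above */
{
	if (pthread_mutex_trylock(&j->mutex) == EBUSY)
		return 0; /* someone still holds the job: not finished yet */
	int ret = (j->terminated == 2);
	pthread_mutex_unlock(&j->mutex);
	return ret;
}
#endif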
void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned continuation_resubmit,
		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg)
{
	STARPU_ASSERT(!j->continuation);
	/* continuations are not supported for parallel tasks for now */
	STARPU_ASSERT(j->task_size == 1);
	j->continuation = 1;
	j->continuation_resubmit = continuation_resubmit;
	j->continuation_callback_on_sleep = continuation_callback_on_sleep;
	j->continuation_callback_on_sleep_arg = continuation_callback_on_sleep_arg;
	j->job_successors.ndeps = 0;
}

/* Prepare a currently running job for accepting a new set of
 * dependencies in anticipation of becoming a continuation. */
void _starpu_job_prepare_for_continuation(struct _starpu_job *j)
{
	_starpu_job_prepare_for_continuation_ext(j, 1, NULL, NULL);
}

void _starpu_job_set_omp_cleanup_callback(struct _starpu_job *j,
		void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg)
{
	j->omp_cleanup_callback = omp_cleanup_callback;
	j->omp_cleanup_callback_arg = omp_cleanup_callback_arg;
}
#endif
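
/* Reduced sketch of the continuation re-arming above (hypothetical
 * names and fields, not compiled): the running job is flagged as a
 * continuation and its dependency counter is reset, so that a brand-new
 * set of dependencies can be registered before the job is resubmitted. */
#if 0
struct my_job { int continuation; unsigned ndeps; /* ... */ };

static void my_prepare_continuation(struct my_job *j)
{
	j->continuation = 1; /* the job will stop instead of finishing */
	j->ndeps = 0;        /* forget fulfilled deps, accept new ones */
}
#endif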
void _starpu_handle_job_termination(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	unsigned sched_ctx = task->sched_ctx;
	double flops = task->flops;
	const unsigned continuation =
#ifdef STARPU_OPENMP
		j->continuation
#else
		0
#endif
		;

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
#ifdef STARPU_OPENMP
	if (continuation)
	{
		task->status = STARPU_TASK_STOPPED;
	}
	else
#endif
	{
		task->status = STARPU_TASK_FINISHED;

		/* We must set the j->terminated flag early, so that it is
		 * possible to express task dependencies within the callback
		 * function. A value of 1 means that the codelet was executed but
		 * that the callback is not done yet. */
		j->terminated = 1;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

#ifdef STARPU_USE_SC_HYPERVISOR
	size_t data_size = 0;
#endif //STARPU_USE_SC_HYPERVISOR

	/* We release the handle reference counts */
	if (task->cl && !continuation)
	{
		unsigned i;
		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
#ifdef STARPU_USE_SC_HYPERVISOR
		for (i = 0; i < nbuffers; i++)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			if (handle != NULL)
				data_size += _starpu_data_get_size(handle);
		}
#endif //STARPU_USE_SC_HYPERVISOR

		for (i = 0; i < nbuffers; i++)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			_starpu_spin_lock(&handle->header_lock);
			handle->busy_count--;
			if (!_starpu_data_check_not_busy(handle))
				_starpu_spin_unlock(&handle->header_lock);
		}
	}

	/* If this is a continuation, we do not release task dependencies now.
	 * Task dependencies will be released only when the continued task
	 * fully completes */
	if (!continuation)
	{
		/* Tell other tasks that we don't exist any more, thus no need for
		 * implicit dependencies any more. */
		_starpu_release_task_enforce_sequential_consistency(j);
	}

	/* The task may not have a cl, but still have explicit data
	 * dependencies; we then need to tell them that we will not exist any
	 * more before notifying the tasks waiting for us.
	 *
	 * For continuations, implicit dependency handles are only released
	 * when the task fully completes */
	if (j->implicit_dep_handle && !continuation)
	{
		starpu_data_handle_t handle = j->implicit_dep_handle;
		_starpu_release_data_enforce_sequential_consistency(j->task, &j->implicit_dep_slot, handle);
		/* Release the reference taken while setting implicit_dep_handle */
		_starpu_spin_lock(&handle->header_lock);
		handle->busy_count--;
		if (!_starpu_data_check_not_busy(handle))
			_starpu_spin_unlock(&handle->header_lock);
	}

	/* If this is a continuation, we do not notify task/tag dependencies
	 * now. Task/tag dependencies will be notified only when the continued
	 * task fully completes */
	if (!continuation)
	{
		/* in case there are dependencies, wake up the proper tasks */
		_starpu_notify_dependencies(j);
	}

	/* If this is a continuation, we do not execute the callback
	 * now. The callback will be executed only when the continued
	 * task fully completes */
	if (!continuation)
	{
		/* the callback is executed after the dependencies so that we may
		 * remove the tag of the task itself */
		if (task->callback_func)
		{
			int profiling = starpu_profiling_status_get();
			if (profiling && task->profiling_info)
				_starpu_clock_gettime(&task->profiling_info->callback_start_time);

			/* so that we can check whether we are doing blocking calls
			 * within the callback */
			_starpu_set_local_worker_status(STATUS_CALLBACK);

			/* Perhaps we have nested callbacks (e.g. with chains of empty
			 * tasks). So we store the current task and will restore it
			 * later. */
			struct starpu_task *current_task = starpu_task_get_current();
			_starpu_set_current_task(task);

			_STARPU_TRACE_START_CALLBACK(j);
			task->callback_func(task->callback_arg);
			_STARPU_TRACE_END_CALLBACK(j);

			_starpu_set_current_task(current_task);
			_starpu_set_local_worker_status(STATUS_UNKNOWN);

			if (profiling && task->profiling_info)
				_starpu_clock_gettime(&task->profiling_info->callback_end_time);
		}
	}

	/* If the job was executed on a combined worker, there is no need for
	 * the scheduler to process it: the task structure doesn't contain any
	 * valuable data as it is not linked to an actual worker */
	/* control tasks should not execute the post_exec_hook */
	if (j->task_size == 1 && task->cl != NULL && !j->internal
#ifdef STARPU_OPENMP
	    /* If this is a continuation, we do not execute the post_exec_hook.
	     * The post_exec_hook will be run only when the continued task
	     * fully completes.
	     *
	     * Note: if needed, a specific hook could be added to handle
	     * stopped tasks */
	    && !continuation
#endif
	   )
	{
		_starpu_sched_post_exec_hook(task);
#ifdef STARPU_USE_SC_HYPERVISOR
		int workerid = starpu_worker_get_id();
		_starpu_sched_ctx_post_exec_task_cb(workerid, task, data_size, j->footprint);
#endif //STARPU_USE_SC_HYPERVISOR
	}

	/* Note: for now we keep the TASK_DONE trace event for continuations,
	 * but we could add a specific event for stopped tasks if needed. */
	_STARPU_TRACE_TASK_DONE(j);

	/* NB: we do not save those values before the callback, in case the
	 * application changes some parameters in the meantime (e.g. a task
	 * may not be regenerated if the application is terminating). */
	int destroy = task->destroy;
	int detach = task->detach;
	int regenerate = task->regenerate;

	/* We do not deallocate the job structure if somebody is going to
	 * wait for the task */
	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	if (!continuation)
	{
#ifdef STARPU_OPENMP
		if (j->omp_cleanup_callback)
		{
			j->omp_cleanup_callback(j->omp_cleanup_callback_arg);
			j->omp_cleanup_callback = NULL;
			j->omp_cleanup_callback_arg = NULL;
		}
#endif
		/* A value of 2 is put to specify that not only the codelet but
		 * also the callback were executed. */
		j->terminated = 2;
	}
	STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
#ifdef HAVE_AYUDAME_H
	if (AYU_event) AYU_event(AYU_REMOVETASK, j->job_id, NULL);
#endif
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	if (detach && !continuation)
	{
		/* no one is going to synchronize with that task so we release
		 * the data structures now. In case the job was already locked
		 * by the caller, it is its responsibility to destroy the task. */
		if (destroy)
			_starpu_task_destroy(task);
	}

	/* A continuation is not much different from a regenerated task. */
	if (regenerate || continuation)
	{
		STARPU_ASSERT_MSG((detach && !destroy && !task->synchronous)
				|| continuation
				, "Regenerated task must be detached (was %d), and must not have destroy=1 (was %d) or synchronous=1 (was %d)", detach, destroy, task->synchronous);

#ifdef HAVE_AYUDAME_H
		if (AYU_event)
		{
			int64_t AYU_data[2] = {j->exclude_from_dag ? 0 : _starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
			AYU_event(AYU_ADDTASK, j->job_id, AYU_data);
		}
#endif
		{
#ifdef STARPU_OPENMP
			unsigned continuation_resubmit = j->continuation_resubmit;
			void (*continuation_callback_on_sleep)(void *arg) = j->continuation_callback_on_sleep;
			void *continuation_callback_on_sleep_arg = j->continuation_callback_on_sleep_arg;
			j->continuation_resubmit = 1;
			j->continuation_callback_on_sleep = NULL;
			j->continuation_callback_on_sleep_arg = NULL;
			if (!continuation || continuation_resubmit)
#endif
			{
				/* We reuse the same job structure */
				int ret = _starpu_submit_job(j);
				STARPU_ASSERT(!ret);
			}
#ifdef STARPU_OPENMP
			if (continuation && continuation_callback_on_sleep != NULL)
			{
				continuation_callback_on_sleep(continuation_callback_on_sleep_arg);
			}
#endif
		}
	}

	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx, flops);
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
	struct _starpu_worker *worker;
	worker = _starpu_get_local_worker_key();
	if (worker)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

		if (worker->removed_from_ctx[sched_ctx] == 1 && worker->shares_tasks_lists[sched_ctx] == 1)
		{
			_starpu_worker_gets_out_of_ctx(sched_ctx, worker);
			worker->removed_from_ctx[sched_ctx] = 0;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
	}
}
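
/* From the application side, the detach/destroy flags saved above
 * decide who frees the task. A sketch assuming the usual public StarPU
 * task API (my_codelet and my_cb are hypothetical), not compiled: */
#if 0
static void my_submit_and_wait(void)
{
	struct starpu_task *task = starpu_task_create();
	task->cl = &my_codelet;      /* hypothetical codelet */
	task->callback_func = my_cb; /* runs while j->terminated is still 1 */
	task->detach = 0;            /* we will synchronize explicitly... */
	task->destroy = 0;           /* ...and free the task ourselves */
	starpu_task_submit(task);
	starpu_task_wait(task);      /* returns once j->terminated == 2 */
	starpu_task_destroy(task);
}
#endif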
/* This function is called when a new task is submitted to StarPU.
 * It returns 1 if the tag deps are not fulfilled, 0 otherwise. */
static unsigned _starpu_not_all_tag_deps_are_fulfilled(struct _starpu_job *j)
{
	unsigned ret;

	if (!j->task->use_tag)
	{
		/* this task does not use tags, so we can go on */
		return 0;
	}

	struct _starpu_tag *tag = j->tag;
	struct _starpu_cg_list *tag_successors = &tag->tag_successors;

	_starpu_spin_lock(&tag->lock);
	STARPU_ASSERT_MSG(tag->is_assigned == 1 || !tag_successors->ndeps, "a tag can be assigned only one task to wake (%llu had %u assigned tasks, and %u successors)", (unsigned long long) tag->id, tag->is_assigned, tag_successors->ndeps);

	if (tag_successors->ndeps != tag_successors->ndeps_completed)
	{
		tag->state = STARPU_BLOCKED;
		j->task->status = STARPU_TASK_BLOCKED_ON_TAG;
		ret = 1;
	}
	else
	{
		/* existing deps (if any) are fulfilled */
		/* If the same tag is being signaled by several tasks, do not
		 * clear a DONE state. If it is the same job submitted several
		 * times with the same tag, we do have to clear it */
		if (j->submitted == 2 || tag->state != STARPU_DONE)
			tag->state = STARPU_READY;
		/* already prepare for next run */
		tag_successors->ndeps_completed = 0;
		ret = 0;
	}

	_starpu_spin_unlock(&tag->lock);
	return ret;
}
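
/* The tag above follows a small state machine; a reduced sketch of the
 * transitions this function checks (hypothetical names, not compiled): */
#if 0
enum tag_state { BLOCKED, READY, DONE };

static enum tag_state next_state(unsigned ndeps, unsigned ndeps_completed,
				 enum tag_state cur, int resubmitted)
{
	if (ndeps != ndeps_completed)
		return BLOCKED; /* still waiting for predecessors */
	if (cur == DONE && !resubmitted)
		return DONE;    /* several tasks signal the same tag: keep DONE */
	return READY;
}
#endif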
static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j)
{
	unsigned ret;

	struct _starpu_cg_list *job_successors = &j->job_successors;

	if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
	{
		j->task->status = STARPU_TASK_BLOCKED_ON_TASK;
		ret = 1;
	}
	else
	{
		/* existing deps (if any) are fulfilled */
		/* already prepare for next run */
		job_successors->ndeps_completed = 0;
		ret = 0;
	}

	return ret;
}
/*
 * In order, we enforce tag, task and data dependencies. The task is
 * passed to the scheduler only once all these constraints are fulfilled.
 *
 * The job mutex has to be taken for atomicity with task submission, and
 * is released here.
 */
unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j)
{
	unsigned ret;
	_STARPU_LOG_IN();

	/* enforce tag dependencies */
	if (_starpu_not_all_tag_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_tag_deps_are_fulfilled");
		return 0;
	}

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
		return 0;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	/* enforce data dependencies */
	if (_starpu_submit_job_enforce_data_deps(j))
	{
		_STARPU_LOG_OUT_TAG("enforce_data_deps");
		return 0;
	}

	ret = _starpu_push_task(j);

	_STARPU_LOG_OUT();
	return ret;
}
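
/* The enforcement order above, reduced to its skeleton (hypothetical
 * names, not compiled): each stage either defers the job, returning 0
 * so it gets retried once the missing dependency completes, or falls
 * through to the next stage until the job can be pushed. */
#if 0
struct my_job;
extern int tag_deps_pending(struct my_job *);
extern int task_deps_pending(struct my_job *);
extern int data_deps_pending(struct my_job *);
extern unsigned push_to_scheduler(struct my_job *);

static unsigned schedule_when_ready(struct my_job *j)
{
	if (tag_deps_pending(j))  return 0; /* woken by tag notification */
	if (task_deps_pending(j)) return 0; /* woken by predecessor tasks */
	if (data_deps_pending(j)) return 0; /* woken when handles are free */
	return push_to_scheduler(j);
}
#endif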
/* Tag deps are already fulfilled */
unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
{
	unsigned ret;

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		return 0;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	/* enforce data dependencies */
	if (_starpu_submit_job_enforce_data_deps(j))
		return 0;

	ret = _starpu_push_task(j);

	return ret;
}
#ifdef STARPU_OPENMP
/* When waking up a continuation, we only enforce new task dependencies */
unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j)
{
	unsigned ret;
	_STARPU_LOG_IN();
	STARPU_ASSERT(j->discontinuous);

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
		return 0;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	ret = _starpu_push_task(j);

	_STARPU_LOG_OUT();
	return ret;
}
#endif
/* Ordered tasks are simply recorded as they arrive in the local_ordered_tasks
 * ring buffer, indexed by order, and pulled from its head. */
/* TODO: replace with perhaps a heap */

/* This function must be called with worker->sched_mutex taken */
struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
{
	struct starpu_task *task = NULL;

	if (worker->local_ordered_tasks_size)
	{
		task = worker->local_ordered_tasks[worker->current_ordered_task];
		if (task)
		{
			worker->local_ordered_tasks[worker->current_ordered_task] = NULL;
			STARPU_ASSERT(task->workerorder == worker->current_ordered_task_order);
			/* Next ordered task is there, return it */
			worker->current_ordered_task = (worker->current_ordered_task + 1) % worker->local_ordered_tasks_size;
			worker->current_ordered_task_order++;
			return task;
		}
	}

	if (!starpu_task_list_empty(&worker->local_tasks))
		task = starpu_task_list_pop_front(&worker->local_tasks);

	return task;
}
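
/* Sketch of the ordered pop above (hypothetical names, not compiled):
 * the ring is indexed by submission order, so the head slot holds
 * either the next task in order, or NULL if it was not pushed yet. */
#if 0
struct my_ring
{
	struct my_task **slots;
	unsigned size, head, next_order;
};

static struct my_task *ring_pop(struct my_ring *r)
{
	struct my_task *t = r->slots[r->head];
	if (!t)
		return NULL; /* the next-in-order task has not arrived yet */
	r->slots[r->head] = NULL;
	r->head = (r->head + 1) % r->size;
	r->next_order++;
	return t;
}
#endif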
int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio)
{
	/* Check that the worker is able to execute the task! */
	STARPU_ASSERT(task && task->cl);
	if (STARPU_UNLIKELY(!(worker->worker_mask & task->cl->where)))
		return -ENODEV;

	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	if (task->execute_on_a_specific_worker && task->workerorder)
	{
		STARPU_ASSERT_MSG(task->workerorder >= worker->current_ordered_task_order, "worker order values must not have duplicates (%d pushed to worker %d, but %d already passed)", task->workerorder, worker->workerid, worker->current_ordered_task_order);
		/* Put it in the ordered task ring */
		unsigned needed = task->workerorder - worker->current_ordered_task_order + 1;
		if (worker->local_ordered_tasks_size < needed)
		{
			/* Increase the size */
			unsigned alloc = worker->local_ordered_tasks_size;
			struct starpu_task **new;
			unsigned copied;

			if (!alloc)
				alloc = 1;
			while (alloc < needed)
				alloc *= 2;
			new = malloc(alloc * sizeof(*new));

			/* Put the existing tasks at the beginning of the new ring */
			copied = worker->local_ordered_tasks_size - worker->current_ordered_task;
			memcpy(new, &worker->local_ordered_tasks[worker->current_ordered_task], copied * sizeof(*new));
			memcpy(new + copied, worker->local_ordered_tasks, (worker->local_ordered_tasks_size - copied) * sizeof(*new));
			memset(new + worker->local_ordered_tasks_size, 0, (alloc - worker->local_ordered_tasks_size) * sizeof(*new));

			free(worker->local_ordered_tasks);
			worker->local_ordered_tasks = new;
			worker->local_ordered_tasks_size = alloc;
			worker->current_ordered_task = 0;
		}

		worker->local_ordered_tasks[(worker->current_ordered_task + task->workerorder - worker->current_ordered_task_order) % worker->local_ordered_tasks_size] = task;
	}
	else
	{
		if (prio)
			starpu_task_list_push_front(&worker->local_tasks, task);
		else
			starpu_task_list_push_back(&worker->local_tasks, task);
	}

	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
	starpu_push_task_end(task);
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);

	return 0;
}
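
/* Sketch of the ring growth above (hypothetical names, reusing
 * struct my_ring from the previous sketch, not compiled): the capacity
 * is doubled until the wanted slot fits, and the live window is copied
 * so the current head lands at index 0 of the new array. */
#if 0
#include <stdlib.h>
#include <string.h>

static int ring_grow(struct my_ring *r, unsigned needed)
{
	unsigned alloc = r->size ? r->size : 1;
	while (alloc < needed)
		alloc *= 2;

	struct my_task **slots = calloc(alloc, sizeof(*slots));
	if (!slots)
		return -1;

	if (r->size)
	{
		/* unroll the old ring: first [head..size), then [0..head) */
		unsigned tail = r->size - r->head;
		memcpy(slots, r->slots + r->head, tail * sizeof(*slots));
		memcpy(slots + tail, r->slots, r->head * sizeof(*slots));
	}

	free(r->slots);
	r->slots = slots;
	r->size = alloc;
	r->head = 0;
	return 0;
}
#endif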