/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009-2015 Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2014 Centre National de la Recherche Scientifique
 * Copyright (C) 2011 Télécom-SudParis
 * Copyright (C) 2011, 2014 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <core/jobs.h>
#include <core/task.h>
#include <core/workers.h>
#include <core/dependencies/data_concurrency.h>
#include <common/config.h>
#include <common/utils.h>
#include <profiling/profiling.h>
#include <profiling/bound.h>
#include <starpu_top.h>
#include <top/starpu_top_core.h>
#include <core/debug.h>
/* We need to identify each task to generate the DAG. */
static unsigned job_cnt = 0;

void _starpu_exclude_task_from_dag(struct starpu_task *task)
{
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);

	j->exclude_from_dag = 1;
}

/* Create an internal struct _starpu_job structure to encapsulate the task */
struct _starpu_job* STARPU_ATTRIBUTE_MALLOC _starpu_job_create(struct starpu_task *task)
{
	struct _starpu_job *job;
	_STARPU_LOG_IN();

	job = _starpu_job_new();

	/* As most of the fields must be initialized to NULL, let's put 0
	 * everywhere */
	memset(job, 0, sizeof(*job));
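
	/* When the task uses dynamically allocated handle/mode arrays
	 * (task->dyn_handles, i.e. the task has more buffers than the static
	 * arrays can hold), allocate the matching per-job arrays as well. */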
	if (task->dyn_handles)
	{
		job->dyn_ordered_buffers = malloc(STARPU_TASK_GET_NBUFFERS(task) * sizeof(job->dyn_ordered_buffers[0]));
		job->dyn_dep_slots = calloc(STARPU_TASK_GET_NBUFFERS(task), sizeof(job->dyn_dep_slots[0]));
	}

	job->task = task;
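
	/* With FxT tracing enabled, every job gets an id; otherwise the atomic
	 * id allocation is only paid for when somebody may consume the id
	 * (bound recording, StarPU-Top or Ayudame). */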
#ifndef STARPU_USE_FXT
	if (_starpu_bound_recording || _starpu_top_status_get()
#ifdef HAVE_AYUDAME_H
	    || AYU_event
#endif
	   )
#endif
	{
		job->job_id = STARPU_ATOMIC_ADD(&job_cnt, 1);
#ifdef HAVE_AYUDAME_H
		if (AYU_event)
		{
			/* Declare the task to Ayudame */
			int64_t AYU_data[2] = {_starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
			AYU_event(AYU_ADDTASK, job->job_id, AYU_data);
		}
#endif
	}

	_starpu_cg_list_init(&job->job_successors);
	STARPU_PTHREAD_MUTEX_INIT(&job->sync_mutex, NULL);
	STARPU_PTHREAD_COND_INIT(&job->sync_cond, NULL);

	/* By default we have sequential tasks */
	job->task_size = 1;

	if (task->use_tag)
		_starpu_tag_declare(task->tag_id, job);

	_STARPU_LOG_OUT();
	return job;
}

void _starpu_job_destroy(struct _starpu_job *j)
{
	/* Wait for any code that was still working on the job (and was
	 * probably our waker) */
	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	STARPU_PTHREAD_COND_DESTROY(&j->sync_cond);
	STARPU_PTHREAD_MUTEX_DESTROY(&j->sync_mutex);
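
	/* Parallel tasks (task_size > 1) run on a combined worker and
	 * synchronize its members with barriers around the codelet execution;
	 * those barriers only exist in that case, so only destroy them then. */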
	if (j->task_size > 1)
	{
		STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
		STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
	}

	_starpu_cg_list_deinit(&j->job_successors);
	if (j->dyn_ordered_buffers)
	{
		free(j->dyn_ordered_buffers);
		j->dyn_ordered_buffers = NULL;
	}
	if (j->dyn_dep_slots)
	{
		free(j->dyn_dep_slots);
		j->dyn_dep_slots = NULL;
	}

	_starpu_job_delete(j);
}

int _starpu_job_finished(struct _starpu_job *j)
{
	int ret;

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	ret = j->terminated == 2;
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	return ret;
}
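
/* A minimal usage sketch (hypothetical application code): waiting on a job is
 * what starpu_task_wait() boils down to for a non-detached task.
 *
 *	struct starpu_task *task = starpu_task_create();
 *	task->cl = &cl;
 *	task->detach = 0;	// required: detached tasks cannot be waited for
 *	starpu_task_submit(task);
 *	starpu_task_wait(task);	// eventually calls _starpu_wait_job(j)
 */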
void _starpu_wait_job(struct _starpu_job *j)
{
	STARPU_ASSERT(j->task);
	STARPU_ASSERT(!j->task->detach);
	_STARPU_LOG_IN();

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);

	/* We wait for the flag to have a value of 2, which means that both the
	 * codelet's implementation and its callback have been executed. That
	 * way, _starpu_wait_job won't return until the entire task has really
	 * been executed (so that we cannot destroy the task while it is still
	 * being manipulated by the driver). */
	while (j->terminated != 2)
	{
		STARPU_PTHREAD_COND_WAIT(&j->sync_cond, &j->sync_mutex);
	}

	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	_STARPU_LOG_OUT();
}

#ifdef STARPU_OPENMP
int _starpu_test_job_termination(struct _starpu_job *j)
{
	STARPU_ASSERT(j->task);
	STARPU_ASSERT(!j->task->detach);
	return (j->terminated == 2);
}
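
/* Mark a currently running job as a continuation: when the codelet returns,
 * the job is not terminated but merely stopped, and will be resubmitted (or
 * handed to the continuation callback) once its new set of dependencies is
 * fulfilled. */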
void _starpu_job_prepare_for_continuation_ext(struct _starpu_job *j, unsigned continuation_resubmit,
		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg)
{
	STARPU_ASSERT(!j->continuation);
	/* Continuations are not supported for parallel tasks for now */
	STARPU_ASSERT(j->task_size == 1);
	j->continuation = 1;
	j->continuation_resubmit = continuation_resubmit;
	j->continuation_callback_on_sleep = continuation_callback_on_sleep;
	j->continuation_callback_on_sleep_arg = continuation_callback_on_sleep_arg;
	j->job_successors.ndeps = 0;
}

/* Prepare a currently running job for accepting a new set of
 * dependencies in anticipation of becoming a continuation. */
void _starpu_job_prepare_for_continuation(struct _starpu_job *j)
{
	_starpu_job_prepare_for_continuation_ext(j, 1, NULL, NULL);
}

void _starpu_job_set_omp_cleanup_callback(struct _starpu_job *j,
		void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg)
{
	j->omp_cleanup_callback = omp_cleanup_callback;
	j->omp_cleanup_callback_arg = omp_cleanup_callback_arg;
}
#endif
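
/* Called by the driver once the codelet has run (and, for OpenMP
 * continuations, each time the task stops): update the task status, release
 * data references and dependencies, execute the callback and the
 * post-execution hook, and resubmit the job if it is regenerated or a
 * continuation. */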
void _starpu_handle_job_termination(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	unsigned sched_ctx = task->sched_ctx;
	double flops = task->flops;
	const unsigned continuation =
#ifdef STARPU_OPENMP
		j->continuation
#else
		0
#endif
		;

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
#ifdef STARPU_OPENMP
	if (continuation)
	{
		task->status = STARPU_TASK_STOPPED;
	}
	else
#endif
	{
		task->status = STARPU_TASK_FINISHED;

		/* The j->terminated flag must be set before running the
		 * callback, so that it is possible to express task
		 * dependencies within the callback function. A value of 1
		 * means that the codelet was executed but that the callback is
		 * not done yet. */
		j->terminated = 1;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

#ifdef STARPU_USE_SC_HYPERVISOR
	size_t data_size = 0;
#endif //STARPU_USE_SC_HYPERVISOR

	/* We release the handle reference counts */
	if (task->cl && !continuation)
	{
		unsigned i;
		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
#ifdef STARPU_USE_SC_HYPERVISOR
		for (i = 0; i < nbuffers; i++)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			if (handle != NULL)
				data_size += _starpu_data_get_size(handle);
		}
#endif //STARPU_USE_SC_HYPERVISOR

		for (i = 0; i < nbuffers; i++)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			_starpu_spin_lock(&handle->header_lock);
			handle->busy_count--;
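			/* _starpu_data_check_not_busy() returns non-zero when
			 * it noticed the handle is not busy any more, in which
			 * case it already released header_lock (and possibly
			 * destroyed the handle); only unlock ourselves when it
			 * returns 0. */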
			if (!_starpu_data_check_not_busy(handle))
				_starpu_spin_unlock(&handle->header_lock);
		}
	}

	/* If this is a continuation, we do not release task dependencies now.
	 * Task dependencies will be released only when the continued task
	 * fully completes */
	if (!continuation)
	{
		/* Tell other tasks that we don't exist any more, thus no need
		 * for implicit dependencies any more. */
		_starpu_release_task_enforce_sequential_consistency(j);
	}

	/* The task may not have a cl but still have explicit data
	 * dependencies; in that case we need to tell the handles that we will
	 * not exist any more before notifying the tasks waiting for us.
	 *
	 * For continuations, implicit dependency handles are only released
	 * when the task fully completes */
	if (j->implicit_dep_handle && !continuation)
	{
		starpu_data_handle_t handle = j->implicit_dep_handle;
		_starpu_release_data_enforce_sequential_consistency(j->task, &j->implicit_dep_slot, handle);
		/* Release the reference taken while setting implicit_dep_handle */
		_starpu_spin_lock(&handle->header_lock);
		handle->busy_count--;
		if (!_starpu_data_check_not_busy(handle))
			_starpu_spin_unlock(&handle->header_lock);
	}

	/* If this is a continuation, we do not notify task/tag dependencies
	 * now. Task/tag dependencies will be notified only when the continued
	 * task fully completes */
	if (!continuation)
	{
		/* In case there are dependencies, wake up the proper tasks */
		_starpu_notify_dependencies(j);
	}
	/* If this is a continuation, we do not execute the callback
	 * now. The callback will be executed only when the continued
	 * task fully completes */
	if (!continuation)
	{
		/* The callback is executed after the dependencies so that we
		 * may remove the tag of the task itself */
		if (task->callback_func)
		{
			int profiling = starpu_profiling_status_get();
			if (profiling && task->profiling_info)
				_starpu_clock_gettime(&task->profiling_info->callback_start_time);

			/* so that we can check whether we are doing blocking
			 * calls within the callback */
			_starpu_set_local_worker_status(STATUS_CALLBACK);

			/* Perhaps we have nested callbacks (eg. with chains of
			 * empty tasks). So we store the current task and we
			 * will restore it later. */
			struct starpu_task *current_task = starpu_task_get_current();
			_starpu_set_current_task(task);

			_STARPU_TRACE_START_CALLBACK(j);
			task->callback_func(task->callback_arg);
			_STARPU_TRACE_END_CALLBACK(j);

			_starpu_set_current_task(current_task);
			_starpu_set_local_worker_status(STATUS_UNKNOWN);

			if (profiling && task->profiling_info)
				_starpu_clock_gettime(&task->profiling_info->callback_end_time);
		}
	}
	/* If the job was executed on a combined worker there is no need for
	 * the scheduler to process it: the task structure doesn't contain any
	 * valuable data as it's not linked to an actual worker */
	/* Control tasks should not execute the post_exec_hook either */
	if (j->task_size == 1 && task->cl != NULL && !j->internal
#ifdef STARPU_OPENMP
	/* If this is a continuation, we do not execute the post_exec_hook. The
	 * post_exec_hook will be run only when the continued task fully
	 * completes.
	 *
	 * Note: If needed, a specific hook could be added to handle stopped
	 * tasks */
		&& !continuation
#endif
	   )
	{
		_starpu_sched_post_exec_hook(task);
#ifdef STARPU_USE_SC_HYPERVISOR
		int workerid = starpu_worker_get_id();
		_starpu_sched_ctx_post_exec_task_cb(workerid, task, data_size, j->footprint);
#endif //STARPU_USE_SC_HYPERVISOR
	}

	/* Note: For now, we keep the TASK_DONE trace event for continuations,
	 * however we could add a specific event for stopped tasks if needed.
	 */
	_STARPU_TRACE_TASK_DONE(j);
	/* NB: we do not save those values before the callback, in case the
	 * application changes some of them in the meantime (eg. a task may not
	 * be regenerated if the application is terminating). */
	int destroy = task->destroy;
	int detach = task->detach;
	int regenerate = task->regenerate;

	/* We do not deallocate the job structure if somebody is still going to
	 * wait for the task */
	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	if (!continuation)
	{
#ifdef STARPU_OPENMP
		if (j->omp_cleanup_callback)
		{
			j->omp_cleanup_callback(j->omp_cleanup_callback_arg);
			j->omp_cleanup_callback = NULL;
			j->omp_cleanup_callback_arg = NULL;
		}
#endif
		/* A value of 2 is put to specify that not only the codelet but
		 * also the callback were executed. */
		j->terminated = 2;
	}
	STARPU_PTHREAD_COND_BROADCAST(&j->sync_cond);
#ifdef HAVE_AYUDAME_H
	if (AYU_event) AYU_event(AYU_REMOVETASK, j->job_id, NULL);
#endif
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	if (detach && !continuation)
	{
		/* Nobody is going to synchronize with that task, so we release
		 * the data structures now. In case the job was already locked
		 * by the caller, it is its responsibility to destroy the task.
		 * */
		if (destroy)
			_starpu_task_destroy(task);
	}
	/* A continuation is not much different from a regenerated task. */
	if (regenerate || continuation)
	{
		STARPU_ASSERT_MSG((detach && !destroy && !task->synchronous)
				|| continuation
				, "Regenerated task must be detached (was %d), and not have destroy=1 (was %d) or synchronous=1 (was %d)", detach, destroy, task->synchronous);

#ifdef HAVE_AYUDAME_H
		if (AYU_event)
		{
			int64_t AYU_data[2] = {j->exclude_from_dag ? 0 : _starpu_ayudame_get_func_id(task->cl), task->priority > STARPU_MIN_PRIO};
			AYU_event(AYU_ADDTASK, j->job_id, AYU_data);
		}
#endif
		{
#ifdef STARPU_OPENMP
			unsigned continuation_resubmit = j->continuation_resubmit;
			void (*continuation_callback_on_sleep)(void *arg) = j->continuation_callback_on_sleep;
			void *continuation_callback_on_sleep_arg = j->continuation_callback_on_sleep_arg;
			j->continuation_resubmit = 1;
			j->continuation_callback_on_sleep = NULL;
			j->continuation_callback_on_sleep_arg = NULL;

			if (!continuation || continuation_resubmit)
#endif
			{
				/* We reuse the same job structure */
				int ret = _starpu_submit_job(j);
				STARPU_ASSERT(!ret);
			}
#ifdef STARPU_OPENMP
			if (continuation && continuation_callback_on_sleep != NULL)
			{
				continuation_callback_on_sleep(continuation_callback_on_sleep_arg);
			}
#endif
		}
	}
	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx, flops);
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);

	struct _starpu_worker *worker;
	worker = _starpu_get_local_worker_key();
	if (worker)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

		if (worker->removed_from_ctx[sched_ctx] == 1 && worker->shares_tasks_lists[sched_ctx] == 1)
		{
			_starpu_worker_gets_out_of_ctx(sched_ctx, worker);
			worker->removed_from_ctx[sched_ctx] = 0;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
	}
}
/* This function is called when a new task is submitted to StarPU;
 * it returns 1 if the tag deps are not fulfilled, 0 otherwise */
static unsigned _starpu_not_all_tag_deps_are_fulfilled(struct _starpu_job *j)
{
	unsigned ret;

	if (!j->task->use_tag)
	{
		/* this task does not use tags, so we can go on */
		return 0;
	}

	struct _starpu_tag *tag = j->tag;
	struct _starpu_cg_list *tag_successors = &tag->tag_successors;

	_starpu_spin_lock(&tag->lock);

	STARPU_ASSERT_MSG(tag->is_assigned == 1 || !tag_successors->ndeps, "a tag can be assigned only one task to wake (%llu had %u assigned tasks, and %u successors)", (unsigned long long) tag->id, tag->is_assigned, tag_successors->ndeps);

	if (tag_successors->ndeps != tag_successors->ndeps_completed)
	{
		tag->state = STARPU_BLOCKED;
		j->task->status = STARPU_TASK_BLOCKED_ON_TAG;
		ret = 1;
	}
	else
	{
		/* Existing deps (if any) are fulfilled */
		/* If the same tag is being signaled by several tasks, do not
		 * clear a DONE state. If it's the same job submitted several
		 * times with the same tag, we have to do it */
		if (j->submitted == 2 || tag->state != STARPU_DONE)
			tag->state = STARPU_READY;
		/* already prepare for the next run */
		tag_successors->ndeps_completed = 0;
		ret = 0;
	}

	_starpu_spin_unlock(&tag->lock);
	return ret;
}
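
/* Return 1 and mark the task blocked if some of its task dependencies are not
 * fulfilled yet, 0 otherwise (also resetting the completion count for a
 * possible next run). Expected to be called with j->sync_mutex held by the
 * caller. */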
static unsigned _starpu_not_all_task_deps_are_fulfilled(struct _starpu_job *j)
{
	unsigned ret;

	struct _starpu_cg_list *job_successors = &j->job_successors;

	if (!j->submitted || (job_successors->ndeps != job_successors->ndeps_completed))
	{
		j->task->status = STARPU_TASK_BLOCKED_ON_TASK;
		ret = 1;
	}
	else
	{
		/* existing deps (if any) are fulfilled */
		/* already prepare for the next run */
		job_successors->ndeps_completed = 0;
		ret = 0;
	}

	return ret;
}

/*
 * In order, we enforce tag, task and data dependencies. The task is
 * passed to the scheduler only once all these constraints are fulfilled.
 *
 * The job mutex has to be taken for atomicity with task submission, and
 * is released here.
 */
unsigned _starpu_enforce_deps_and_schedule(struct _starpu_job *j)
{
	unsigned ret;
	_STARPU_LOG_IN();

	/* enforce tag dependencies */
	if (_starpu_not_all_tag_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_tag_deps_are_fulfilled");
		return 0;
	}

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
		return 0;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	/* enforce data dependencies */
	if (_starpu_submit_job_enforce_data_deps(j))
	{
		_STARPU_LOG_OUT_TAG("enforce_data_deps");
		return 0;
	}

	ret = _starpu_push_task(j);

	_STARPU_LOG_OUT();
	return ret;
}
/* Tag deps are already fulfilled */
unsigned _starpu_enforce_deps_starting_from_task(struct _starpu_job *j)
{
	unsigned ret;

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		return 0;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	/* enforce data dependencies */
	if (_starpu_submit_job_enforce_data_deps(j))
		return 0;

	ret = _starpu_push_task(j);
	return ret;
}
#ifdef STARPU_OPENMP
/* When waking up a continuation, we only enforce new task dependencies */
unsigned _starpu_reenforce_task_deps_and_schedule(struct _starpu_job *j)
{
	unsigned ret;
	_STARPU_LOG_IN();
	STARPU_ASSERT(j->discontinuous);

	/* enforce task dependencies */
	if (_starpu_not_all_task_deps_are_fulfilled(j))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
		_STARPU_LOG_OUT_TAG("not_all_task_deps_are_fulfilled");
		return 0;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	ret = _starpu_push_task(j);
	_STARPU_LOG_OUT();
	return ret;
}
#endif
/* Ordered tasks are simply recorded as they arrive in the local_ordered_tasks
 * ring buffer, indexed by order, and pulled from its head. */
/* TODO: replace with perhaps a heap */

/* This function must be called with worker->sched_mutex taken */
struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
{
	struct starpu_task *task = NULL;

	if (worker->local_ordered_tasks_size)
	{
		task = worker->local_ordered_tasks[worker->current_ordered_task];
		if (task)
		{
			worker->local_ordered_tasks[worker->current_ordered_task] = NULL;
			STARPU_ASSERT(task->workerorder == worker->current_ordered_task_order);
			/* Next ordered task is there, return it */
			worker->current_ordered_task = (worker->current_ordered_task + 1) % worker->local_ordered_tasks_size;
			worker->current_ordered_task_order++;
			return task;
		}
	}

	if (!starpu_task_list_empty(&worker->local_tasks))
		task = starpu_task_list_pop_front(&worker->local_tasks);

	return task;
}
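
/* Push a task to a worker's local queue. If the task carries a workerorder,
 * it is instead stored at its order slot in the local_ordered_tasks ring
 * (growing the ring when needed) so that _starpu_pop_local_task() hands out
 * ordered tasks in strictly increasing workerorder. Returns -ENODEV if this
 * worker cannot execute the codelet. */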
int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio)
{
	/* Check that the worker is able to execute the task! */
	STARPU_ASSERT(task && task->cl);
	if (STARPU_UNLIKELY(!(worker->worker_mask & task->cl->where)))
		return -ENODEV;

	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	if (task->execute_on_a_specific_worker && task->workerorder)
	{
		STARPU_ASSERT_MSG(task->workerorder >= worker->current_ordered_task_order, "worker order values must not have duplicates (%d pushed to worker %d, but %d already passed)", task->workerorder, worker->workerid, worker->current_ordered_task_order);
		/* Put it in the ordered task ring */
		unsigned needed = task->workerorder - worker->current_ordered_task_order + 1;
		if (worker->local_ordered_tasks_size < needed)
		{
			/* Increase the size: double it until it can hold the
			 * requested order */
			unsigned alloc = worker->local_ordered_tasks_size;
			struct starpu_task **new;
			unsigned copied;

			if (!alloc)
				alloc = 1;
			while (alloc < needed)
				alloc *= 2;
			new = malloc(alloc * sizeof(*new));

			/* Put existing tasks at the beginning of the new ring */
			copied = worker->local_ordered_tasks_size - worker->current_ordered_task;
			memcpy(new, &worker->local_ordered_tasks[worker->current_ordered_task], copied * sizeof(*new));
			memcpy(new + copied, worker->local_ordered_tasks, (worker->local_ordered_tasks_size - copied) * sizeof(*new));
			memset(new + worker->local_ordered_tasks_size, 0, (alloc - worker->local_ordered_tasks_size) * sizeof(*new));
			free(worker->local_ordered_tasks);
			worker->local_ordered_tasks = new;
			worker->local_ordered_tasks_size = alloc;
			worker->current_ordered_task = 0;
		}
		worker->local_ordered_tasks[(worker->current_ordered_task + task->workerorder - worker->current_ordered_task_order) % worker->local_ordered_tasks_size] = task;
	}
	else
	{
		if (prio)
			starpu_task_list_push_front(&worker->local_tasks, task);
		else
			starpu_task_list_push_back(&worker->local_tasks, task);
	}

	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
	starpu_push_task_end(task);
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);

	return 0;
}
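
/* A minimal usage sketch (hypothetical application code) for the workerorder
 * path above, assuming order numbering starts at 1: forcing two tasks to run
 * on worker 0 in a fixed order.
 *
 *	task1->execute_on_a_specific_worker = 1;
 *	task1->workerid = 0;
 *	task1->workerorder = 1;
 *	task2->execute_on_a_specific_worker = 1;
 *	task2->workerid = 0;
 *	task2->workerorder = 2;	// popped only after task1, whatever the submission order
 */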