sched_policy.c 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2008-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. * Copyright (C) 2013 Simon Archipoff
  5. * Copyright (C) 2013 Thibaut Lambert
  6. * Copyright (C) 2016 Uppsala University
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <starpu.h>
  20. #include <common/config.h>
  21. #include <common/utils.h>
  22. #include <core/sched_policy.h>
  23. #include <profiling/profiling.h>
  24. #include <datawizard/memory_nodes.h>
  25. #include <common/barrier.h>
  26. #include <core/debug.h>
  27. #include <core/task.h>
  28. static int use_prefetch = 0;
  29. static double idle[STARPU_NMAXWORKERS];
  30. static double idle_start[STARPU_NMAXWORKERS];
  31. long _starpu_task_break_on_push = -1;
  32. long _starpu_task_break_on_sched = -1;
  33. long _starpu_task_break_on_pop = -1;
  34. long _starpu_task_break_on_exec = -1;
  35. static const char *starpu_idle_file;
  36. void _starpu_sched_init(void)
  37. {
  38. _starpu_task_break_on_push = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_PUSH", -1);
  39. _starpu_task_break_on_sched = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_SCHED", -1);
  40. _starpu_task_break_on_pop = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_POP", -1);
  41. _starpu_task_break_on_exec = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_EXEC", -1);
  42. starpu_idle_file = starpu_getenv("STARPU_IDLE_FILE");
  43. }
  44. int starpu_get_prefetch_flag(void)
  45. {
  46. return use_prefetch;
  47. }
  48. static struct starpu_sched_policy *predefined_policies[] =
  49. {
  50. &_starpu_sched_modular_eager_policy,
  51. &_starpu_sched_modular_eager_prefetching_policy,
  52. &_starpu_sched_modular_eager_prio_policy,
  53. &_starpu_sched_modular_gemm_policy,
  54. &_starpu_sched_modular_prio_policy,
  55. &_starpu_sched_modular_prio_prefetching_policy,
  56. &_starpu_sched_modular_random_policy,
  57. &_starpu_sched_modular_random_prio_policy,
  58. &_starpu_sched_modular_random_prefetching_policy,
  59. &_starpu_sched_modular_random_prio_prefetching_policy,
  60. &_starpu_sched_modular_parallel_random_policy,
  61. &_starpu_sched_modular_parallel_random_prio_policy,
  62. &_starpu_sched_modular_ws_policy,
  63. &_starpu_sched_modular_heft_policy,
  64. &_starpu_sched_modular_heft_prio_policy,
  65. &_starpu_sched_modular_heft2_policy,
  66. &_starpu_sched_modular_heteroprio_policy,
  67. &_starpu_sched_modular_heteroprio_heft_policy,
  68. &_starpu_sched_modular_parallel_heft_policy,
  69. &_starpu_sched_eager_policy,
  70. &_starpu_sched_prio_policy,
  71. &_starpu_sched_random_policy,
  72. &_starpu_sched_lws_policy,
  73. &_starpu_sched_ws_policy,
  74. &_starpu_sched_dm_policy,
  75. &_starpu_sched_dmda_policy,
  76. &_starpu_sched_dmda_prio_policy,
  77. &_starpu_sched_dmda_ready_policy,
  78. &_starpu_sched_dmda_sorted_policy,
  79. &_starpu_sched_dmda_sorted_decision_policy,
  80. &_starpu_sched_parallel_heft_policy,
  81. &_starpu_sched_peager_policy,
  82. &_starpu_sched_heteroprio_policy,
  83. &_starpu_sched_graph_test_policy,
  84. #ifdef STARPU_HAVE_HWLOC
  85. //&_starpu_sched_tree_heft_hierarchical_policy,
  86. #endif
  87. NULL
  88. };
  89. struct starpu_sched_policy **starpu_sched_get_predefined_policies()
  90. {
  91. return predefined_policies;
  92. }
  93. struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  94. {
  95. return sched_ctx->sched_policy;
  96. }
  97. /*
  98. * Methods to initialize the scheduling policy
  99. */
  100. static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
  101. {
  102. STARPU_ASSERT(sched_policy);
  103. #ifdef STARPU_VERBOSE
  104. if (sched_policy->policy_name)
  105. {
  106. if (sched_policy->policy_description)
  107. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  108. else
  109. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  110. }
  111. #endif
  112. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  113. memcpy(policy, sched_policy, sizeof(*policy));
  114. }
  115. static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
  116. {
  117. if (!policy_name)
  118. return NULL;
  119. if (strcmp(policy_name, "") == 0)
  120. return NULL;
  121. if (strncmp(policy_name, "heft", 4) == 0)
  122. {
  123. _STARPU_MSG("Warning: heft is now called \"dmda\".\n");
  124. return &_starpu_sched_dmda_policy;
  125. }
  126. struct starpu_sched_policy **policy;
  127. for(policy=predefined_policies ; *policy!=NULL ; policy++)
  128. {
  129. struct starpu_sched_policy *p = *policy;
  130. if (p->policy_name)
  131. {
  132. if (strcmp(policy_name, p->policy_name) == 0)
  133. {
  134. /* we found a policy with the requested name */
  135. return p;
  136. }
  137. }
  138. }
  139. if (strcmp(policy_name, "help") != 0)
  140. _STARPU_MSG("Warning: scheduling policy '%s' was not found, try 'help' to get a list\n", policy_name);
  141. /* nothing was found */
  142. return NULL;
  143. }
  144. static void display_sched_help_message(FILE *stream)
  145. {
  146. const char *sched_env = starpu_getenv("STARPU_SCHED");
  147. if (sched_env && (strcmp(sched_env, "help") == 0))
  148. {
  149. /* display the description of all predefined policies */
  150. struct starpu_sched_policy **policy;
  151. fprintf(stream, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
  152. for(policy=predefined_policies ; *policy!=NULL ; policy++)
  153. {
  154. struct starpu_sched_policy *p = *policy;
  155. fprintf(stream, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
  156. }
  157. fprintf(stream, "\n");
  158. }
  159. }
  160. struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
  161. {
  162. struct starpu_sched_policy *selected_policy = NULL;
  163. struct starpu_conf *user_conf = &config->conf;
  164. if(required_policy)
  165. selected_policy = find_sched_policy_from_name(required_policy);
  166. /* If there is a policy that matches the required name, return it */
  167. if (selected_policy)
  168. return selected_policy;
  169. /* First, we check whether the application explicitely gave a scheduling policy or not */
  170. if (user_conf && (user_conf->sched_policy))
  171. return user_conf->sched_policy;
  172. /* Otherwise, we look if the application specified the name of a policy to load */
  173. const char *sched_pol_name;
  174. sched_pol_name = starpu_getenv("STARPU_SCHED");
  175. if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
  176. sched_pol_name = user_conf->sched_policy_name;
  177. if (sched_pol_name)
  178. selected_policy = find_sched_policy_from_name(sched_pol_name);
  179. /* If there is a policy that matches the name, return it */
  180. if (selected_policy)
  181. return selected_policy;
  182. /* If no policy was specified, we use the eager policy by default */
  183. return &_starpu_sched_lws_policy;
  184. }
  185. void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
  186. {
  187. /* Perhaps we have to display some help */
  188. display_sched_help_message(stderr);
  189. /* Prefetch is activated by default */
  190. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  191. if (use_prefetch == -1)
  192. use_prefetch = 1;
  193. /* Set calibrate flag */
  194. _starpu_set_calibrate_flag(config->conf.calibrate);
  195. load_sched_policy(selected_policy, sched_ctx);
  196. if (starpu_get_env_number_default("STARPU_WORKER_TREE", 0))
  197. {
  198. #ifdef STARPU_HAVE_HWLOC
  199. sched_ctx->sched_policy->worker_type = STARPU_WORKER_TREE;
  200. #else
  201. _STARPU_DISP("STARPU_WORKER_TREE ignored, please rebuild StarPU with hwloc support to enable it.");
  202. #endif
  203. }
  204. starpu_sched_ctx_create_worker_collection(sched_ctx->id,
  205. sched_ctx->sched_policy->worker_type);
  206. _STARPU_SCHED_BEGIN;
  207. sched_ctx->sched_policy->init_sched(sched_ctx->id);
  208. _STARPU_SCHED_END;
  209. }
  210. void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  211. {
  212. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  213. if (policy->deinit_sched)
  214. {
  215. _STARPU_SCHED_BEGIN;
  216. policy->deinit_sched(sched_ctx->id);
  217. _STARPU_SCHED_END;
  218. }
  219. starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
  220. }
  221. void _starpu_sched_task_submit(struct starpu_task *task)
  222. {
  223. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  224. if (!sched_ctx->sched_policy)
  225. return;
  226. if (!sched_ctx->sched_policy->submit_hook)
  227. return;
  228. _STARPU_SCHED_BEGIN;
  229. sched_ctx->sched_policy->submit_hook(task);
  230. _STARPU_SCHED_END;
  231. }
  232. void _starpu_sched_do_schedule(unsigned sched_ctx_id)
  233. {
  234. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
  235. if (!sched_ctx->sched_policy)
  236. return;
  237. if (!sched_ctx->sched_policy->do_schedule)
  238. return;
  239. _STARPU_SCHED_BEGIN;
  240. sched_ctx->sched_policy->do_schedule(sched_ctx_id);
  241. _STARPU_SCHED_END;
  242. }
  243. static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
  244. {
  245. /* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
  246. struct _starpu_sched_ctx_list_iterator list_it;
  247. _starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
  248. while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
  249. {
  250. struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
  251. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
  252. if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
  253. {
  254. _STARPU_SCHED_BEGIN;
  255. sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
  256. _STARPU_SCHED_END;
  257. }
  258. }
  259. }
  260. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  261. * case workerid identifies a combined worker, a task will be enqueued into
  262. * each worker of the combination. */
  263. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  264. {
  265. int nbasic_workers = (int)starpu_worker_get_count();
  266. /* Is this a basic worker or a combined worker ? */
  267. int is_basic_worker = (workerid < nbasic_workers);
  268. struct _starpu_worker *worker = NULL;
  269. struct _starpu_combined_worker *combined_worker = NULL;
  270. if (is_basic_worker)
  271. {
  272. worker = _starpu_get_worker_struct(workerid);
  273. }
  274. else
  275. {
  276. combined_worker = _starpu_get_combined_worker_struct(workerid);
  277. }
  278. if (use_prefetch)
  279. starpu_prefetch_task_input_for(task, workerid);
  280. if (is_basic_worker)
  281. _starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
  282. else
  283. {
  284. /* Notify all workers of the combined worker */
  285. int worker_size = combined_worker->worker_size;
  286. int *combined_workerid = combined_worker->combined_workerid;
  287. int j;
  288. for (j = 0; j < worker_size; j++)
  289. {
  290. int subworkerid = combined_workerid[j];
  291. _starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
  292. }
  293. }
  294. #ifdef STARPU_USE_SC_HYPERVISOR
  295. starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
  296. #endif //STARPU_USE_SC_HYPERVISOR
  297. if (is_basic_worker)
  298. {
  299. unsigned node = starpu_worker_get_memory_node(workerid);
  300. if (_starpu_task_uses_multiformat_handles(task))
  301. {
  302. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  303. unsigned i;
  304. for (i = 0; i < nbuffers; i++)
  305. {
  306. struct starpu_task *conversion_task;
  307. starpu_data_handle_t handle;
  308. handle = STARPU_TASK_GET_HANDLE(task, i);
  309. if (!_starpu_handle_needs_conversion_task(handle, node))
  310. continue;
  311. conversion_task = _starpu_create_conversion_task(handle, node);
  312. conversion_task->mf_skip = 1;
  313. conversion_task->execute_on_a_specific_worker = 1;
  314. conversion_task->workerid = workerid;
  315. _starpu_task_submit_conversion_task(conversion_task, workerid);
  316. //_STARPU_DEBUG("Pushing a conversion task\n");
  317. }
  318. for (i = 0; i < nbuffers; i++)
  319. {
  320. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  321. handle->mf_node = node;
  322. }
  323. }
  324. // if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
  325. if(task->priority > 0)
  326. return _starpu_push_local_task(worker, task, 1);
  327. else
  328. return _starpu_push_local_task(worker, task, 0);
  329. }
  330. else
  331. {
  332. /* This is a combined worker so we create task aliases */
  333. int worker_size = combined_worker->worker_size;
  334. int *combined_workerid = combined_worker->combined_workerid;
  335. int ret = 0;
  336. struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
  337. job->task_size = worker_size;
  338. job->combined_workerid = workerid;
  339. job->active_task_alias_count = 0;
  340. STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
  341. STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
  342. job->after_work_busy_barrier = worker_size;
  343. /* Note: we have to call that early, or else the task may have
  344. * disappeared already */
  345. starpu_push_task_end(task);
  346. int j;
  347. for (j = 0; j < worker_size; j++)
  348. {
  349. struct starpu_task *alias = starpu_task_dup(task);
  350. alias->destroy = 1;
  351. _STARPU_TRACE_JOB_PUSH(alias, alias->priority);
  352. worker = _starpu_get_worker_struct(combined_workerid[j]);
  353. ret |= _starpu_push_local_task(worker, alias, 0);
  354. }
  355. return ret;
  356. }
  357. }
  358. /* the generic interface that call the proper underlying implementation */
  359. int _starpu_push_task(struct _starpu_job *j)
  360. {
  361. if(j->task->prologue_callback_func)
  362. {
  363. _starpu_set_current_task(j->task);
  364. j->task->prologue_callback_func(j->task->prologue_callback_arg);
  365. _starpu_set_current_task(NULL);
  366. }
  367. return _starpu_repush_task(j);
  368. }
  369. int _starpu_repush_task(struct _starpu_job *j)
  370. {
  371. struct starpu_task *task = j->task;
  372. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  373. int ret;
  374. _STARPU_LOG_IN();
  375. unsigned can_push = _starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops, task);
  376. STARPU_ASSERT(task->status == STARPU_TASK_BLOCKED || task->status == STARPU_TASK_BLOCKED_ON_TAG || task->status == STARPU_TASK_BLOCKED_ON_TASK || task->status == STARPU_TASK_BLOCKED_ON_DATA);
  377. task->status = STARPU_TASK_READY;
  378. const unsigned continuation =
  379. #ifdef STARPU_OPENMP
  380. j->continuation
  381. #else
  382. 0
  383. #endif
  384. ;
  385. if (!_starpu_perf_counter_paused() && !j->internal && !continuation)
  386. {
  387. (void) STARPU_ATOMIC_ADD64(& _starpu_task__g_current_submitted__value, -1);
  388. int64_t value = STARPU_ATOMIC_ADD64(& _starpu_task__g_current_ready__value, 1);
  389. _starpu_perf_counter_update_max_int64(&_starpu_task__g_peak_ready__value, value);
  390. if (task->cl && task->cl->perf_counter_values)
  391. {
  392. struct starpu_perf_counter_sample_cl_values * const pcv = task->cl->perf_counter_values;
  393. (void)STARPU_ATOMIC_ADD64(&pcv->task.current_submitted, -1);
  394. value = STARPU_ATOMIC_ADD64(&pcv->task.current_ready, 1);
  395. _starpu_perf_counter_update_max_int64(&pcv->task.peak_ready, value);
  396. }
  397. }
  398. STARPU_AYU_ADDTOTASKQUEUE(j->job_id, -1);
  399. /* if the context does not have any workers save the tasks in a temp list */
  400. if ((task->cl != NULL && task->where != STARPU_NOWHERE) && (!sched_ctx->is_initial_sched))
  401. {
  402. /*if there are workers in the ctx that are not able to execute tasks
  403. we consider the ctx empty */
  404. unsigned able = _starpu_workers_able_to_execute_task(task, sched_ctx);
  405. if(!able)
  406. {
  407. _starpu_sched_ctx_lock_write(sched_ctx->id);
  408. starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
  409. _starpu_sched_ctx_unlock_write(sched_ctx->id);
  410. #ifdef STARPU_USE_SC_HYPERVISOR
  411. if(sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
  412. && sched_ctx->perf_counters->notify_empty_ctx)
  413. {
  414. _STARPU_TRACE_HYPERVISOR_BEGIN();
  415. sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
  416. _STARPU_TRACE_HYPERVISOR_END();
  417. }
  418. #endif
  419. return 0;
  420. }
  421. }
  422. if(!can_push)
  423. return 0;
  424. /* in case there is no codelet associated to the task (that's a control
  425. * task), we directly execute its callback and enforce the
  426. * corresponding dependencies */
  427. if (task->cl == NULL || task->where == STARPU_NOWHERE)
  428. {
  429. if (!_starpu_perf_counter_paused() && !j->internal)
  430. {
  431. (void)STARPU_ATOMIC_ADD64(& _starpu_task__g_current_ready__value, -1);
  432. if (task->cl && task->cl->perf_counter_values)
  433. {
  434. struct starpu_perf_counter_sample_cl_values * const pcv = task->cl->perf_counter_values;
  435. (void)STARPU_ATOMIC_ADD64(&pcv->task.current_ready, -1);
  436. }
  437. }
  438. task->status = STARPU_TASK_RUNNING;
  439. if (task->prologue_callback_pop_func)
  440. {
  441. _starpu_set_current_task(task);
  442. task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
  443. _starpu_set_current_task(NULL);
  444. }
  445. if (task->cl && task->cl->specific_nodes)
  446. {
  447. /* Nothing to do, but we are asked to fetch data on some memory nodes */
  448. _starpu_fetch_nowhere_task_input(j);
  449. }
  450. else
  451. {
  452. if (task->cl)
  453. __starpu_push_task_output(j);
  454. _starpu_handle_job_termination(j);
  455. _STARPU_LOG_OUT_TAG("handle_job_termination");
  456. }
  457. return 0;
  458. }
  459. ret = _starpu_push_task_to_workers(task);
  460. if (ret == -EAGAIN)
  461. /* pushed to empty context, that's fine */
  462. ret = 0;
  463. return ret;
  464. }
  465. int _starpu_push_task_to_workers(struct starpu_task *task)
  466. {
  467. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  468. _STARPU_TRACE_JOB_PUSH(task, task->priority);
  469. /* if the contexts still does not have workers put the task back to its place in
  470. the empty ctx list */
  471. if(!sched_ctx->is_initial_sched)
  472. {
  473. /*if there are workers in the ctx that are not able to execute tasks
  474. we consider the ctx empty */
  475. unsigned able = _starpu_workers_able_to_execute_task(task, sched_ctx);
  476. if (!able)
  477. {
  478. _starpu_sched_ctx_lock_write(sched_ctx->id);
  479. starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
  480. _starpu_sched_ctx_unlock_write(sched_ctx->id);
  481. #ifdef STARPU_USE_SC_HYPERVISOR
  482. if(sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
  483. && sched_ctx->perf_counters->notify_empty_ctx)
  484. {
  485. _STARPU_TRACE_HYPERVISOR_BEGIN();
  486. sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
  487. _STARPU_TRACE_HYPERVISOR_END();
  488. }
  489. #endif
  490. return -EAGAIN;
  491. }
  492. }
  493. _starpu_profiling_set_task_push_start_time(task);
  494. int ret = 0;
  495. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  496. {
  497. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  498. }
  499. else
  500. {
  501. struct _starpu_machine_config *config = _starpu_get_machine_config();
  502. if(!sched_ctx->sched_policy)
  503. {
  504. /* Note: we have to call that early, or else the task may have
  505. * disappeared already */
  506. starpu_push_task_end(task);
  507. if(!sched_ctx->awake_workers)
  508. ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
  509. else
  510. {
  511. struct starpu_worker_collection *workers = sched_ctx->workers;
  512. struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
  513. job->task_size = workers->nworkers;
  514. job->combined_workerid = -1; // workerid; its a ctx not combined worker
  515. job->active_task_alias_count = 0;
  516. STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
  517. STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
  518. job->after_work_busy_barrier = workers->nworkers;
  519. struct starpu_sched_ctx_iterator it;
  520. if(workers->init_iterator)
  521. workers->init_iterator(workers, &it);
  522. while(workers->has_next(workers, &it))
  523. {
  524. unsigned workerid = workers->get_next(workers, &it);
  525. struct starpu_task *alias;
  526. if (job->task_size > 1)
  527. {
  528. alias = starpu_task_dup(task);
  529. _STARPU_TRACE_JOB_PUSH(alias, alias->priority);
  530. alias->destroy = 1;
  531. }
  532. else
  533. alias = task;
  534. ret |= _starpu_push_task_on_specific_worker(alias, workerid);
  535. }
  536. }
  537. }
  538. else
  539. {
  540. /* When a task can only be executed on a given arch and we have
  541. * only one memory node for that arch, we can systematically
  542. * prefetch before the scheduling decision. */
  543. if (!sched_ctx->sched_policy->prefetches
  544. && starpu_get_prefetch_flag()
  545. && starpu_memory_nodes_get_count() > 1)
  546. {
  547. if (task->where == STARPU_CPU && config->cpus_nodeid >= 0)
  548. starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
  549. else if (task->where == STARPU_CUDA && config->cuda_nodeid >= 0)
  550. starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
  551. else if (task->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
  552. starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
  553. else if (task->where == STARPU_MIC && config->mic_nodeid >= 0)
  554. starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
  555. }
  556. STARPU_ASSERT(sched_ctx->sched_policy->push_task);
  557. /* check out if there are any workers in the context */
  558. unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
  559. if (nworkers == 0)
  560. ret = -1;
  561. else
  562. {
  563. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  564. if (worker)
  565. {
  566. STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
  567. _starpu_worker_enter_sched_op(worker);
  568. STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
  569. }
  570. _STARPU_TASK_BREAK_ON(task, push);
  571. _STARPU_SCHED_BEGIN;
  572. ret = sched_ctx->sched_policy->push_task(task);
  573. _STARPU_SCHED_END;
  574. if (worker)
  575. {
  576. STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
  577. _starpu_worker_leave_sched_op(worker);
  578. STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
  579. }
  580. }
  581. }
  582. if(ret == -1)
  583. {
  584. _STARPU_MSG("repush task \n");
  585. _STARPU_TRACE_JOB_POP(task, task->priority);
  586. ret = _starpu_push_task_to_workers(task);
  587. }
  588. }
  589. /* Note: from here, the task might have been destroyed already! */
  590. _STARPU_LOG_OUT();
  591. return ret;
  592. }
  593. /* This is called right after the scheduler has pushed a task to a queue
  594. * but just before releasing mutexes: we need the task to still be alive!
  595. */
  596. int starpu_push_task_end(struct starpu_task *task)
  597. {
  598. _starpu_profiling_set_task_push_end_time(task);
  599. task->scheduled = 1;
  600. return 0;
  601. }
  602. /* This is called right after the scheduler has pushed a task to a queue
  603. * but just before releasing mutexes: we need the task to still be alive!
  604. */
  605. int _starpu_pop_task_end(struct starpu_task *task)
  606. {
  607. if (!task)
  608. return 0;
  609. _STARPU_TRACE_JOB_POP(task, task->priority);
  610. return 0;
  611. }
  612. /*
  613. * Given a handle that needs to be converted in order to be used on the given
  614. * node, returns a task that takes care of the conversion.
  615. */
  616. struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle, unsigned int node)
  617. {
  618. return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
  619. }
  620. struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle, enum starpu_node_kind node_kind)
  621. {
  622. struct starpu_task *conversion_task;
  623. #if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_SIMGRID)
  624. struct starpu_multiformat_interface *format_interface;
  625. #endif
  626. conversion_task = starpu_task_create();
  627. conversion_task->name = "conversion_task";
  628. conversion_task->synchronous = 0;
  629. STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);
  630. #if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_SIMGRID)
  631. /* The node does not really matter here */
  632. format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
  633. #endif
  634. _starpu_spin_lock(&handle->header_lock);
  635. handle->refcnt++;
  636. handle->busy_count++;
  637. _starpu_spin_unlock(&handle->header_lock);
  638. switch(node_kind)
  639. {
  640. case STARPU_CPU_RAM:
  641. switch (starpu_node_get_kind(handle->mf_node))
  642. {
  643. case STARPU_CPU_RAM:
  644. STARPU_ABORT();
  645. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  646. case STARPU_CUDA_RAM:
  647. {
  648. struct starpu_multiformat_data_interface_ops *mf_ops;
  649. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  650. conversion_task->cl = mf_ops->cuda_to_cpu_cl;
  651. break;
  652. }
  653. #endif
  654. #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
  655. case STARPU_OPENCL_RAM:
  656. {
  657. struct starpu_multiformat_data_interface_ops *mf_ops;
  658. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  659. conversion_task->cl = mf_ops->opencl_to_cpu_cl;
  660. break;
  661. }
  662. #endif
  663. #ifdef STARPU_USE_MIC
  664. case STARPU_MIC_RAM:
  665. {
  666. struct starpu_multiformat_data_interface_ops *mf_ops;
  667. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  668. conversion_task->cl = mf_ops->mic_to_cpu_cl;
  669. break;
  670. }
  671. #endif
  672. default:
  673. _STARPU_ERROR("Oops : %u\n", handle->mf_node);
  674. }
  675. break;
  676. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  677. case STARPU_CUDA_RAM:
  678. {
  679. struct starpu_multiformat_data_interface_ops *mf_ops;
  680. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  681. conversion_task->cl = mf_ops->cpu_to_cuda_cl;
  682. break;
  683. }
  684. #endif
  685. #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
  686. case STARPU_OPENCL_RAM:
  687. {
  688. struct starpu_multiformat_data_interface_ops *mf_ops;
  689. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  690. conversion_task->cl = mf_ops->cpu_to_opencl_cl;
  691. break;
  692. }
  693. #endif
  694. #ifdef STARPU_USE_MIC
  695. case STARPU_MIC_RAM:
  696. {
  697. struct starpu_multiformat_data_interface_ops *mf_ops;
  698. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  699. conversion_task->cl = mf_ops->cpu_to_mic_cl;
  700. break;
  701. }
  702. #endif
  703. default:
  704. STARPU_ABORT();
  705. }
  706. _starpu_codelet_check_deprecated_fields(conversion_task->cl);
  707. STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
  708. return conversion_task;
  709. }
  710. static
  711. struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
  712. {
  713. struct _starpu_sched_ctx_elt *e = NULL;
  714. struct _starpu_sched_ctx_list_iterator list_it;
  715. int found = 0;
  716. _starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
  717. while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
  718. {
  719. e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
  720. if (e->task_number > 0)
  721. return _starpu_get_sched_ctx_struct(e->sched_ctx);
  722. }
  723. _starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
  724. while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
  725. {
  726. e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
  727. if (e->last_poped)
  728. {
  729. e->last_poped = 0;
  730. if (_starpu_sched_ctx_list_iterator_has_next(&list_it))
  731. {
  732. e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
  733. found = 1;
  734. }
  735. break;
  736. }
  737. }
  738. if (!found)
  739. e = worker->sched_ctx_list->head;
  740. e->last_poped = 1;
  741. return _starpu_get_sched_ctx_struct(e->sched_ctx);
  742. }
/* Pop the next task that `worker` should process.
 *
 * Steps, in order:
 *  - the worker's local queue is tried first (_starpu_pop_local_task());
 *  - otherwise a scheduling context is selected (the initial context when
 *    worker->nsched_ctxs == 1, else _get_next_sched_ctx_to_pop_into()) and
 *    the pop_task method of its policy, if any, is called;
 *  - if no task was obtained, NULL is returned (and, when starpu_idle_file
 *    is set, the start of the idle period is recorded);
 *  - if the task uses multiformat handles and has not been handled yet
 *    (mf_skip unset), conversion tasks are created and submitted for the
 *    handles that need one, the task is flagged mf_skip, pushed to the back
 *    of worker->local_tasks, and popping restarts at the `pick` label;
 *  - otherwise pop timestamps are stored in the task's profiling_info (when
 *    profiling is on and the structure exists) and the task's
 *    prologue_callback_pop_func, if set, is invoked.
 *
 * Returns the popped task, or NULL when nothing could be popped. */
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	if (task)
		_STARPU_TASK_BREAK_ON(task, pop);

	/* get tasks from the stacks of the strategy */
	if(!task)
	{
		struct _starpu_sched_ctx *sched_ctx ;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		/* Blocking drivers: the block below is the body of a retry loop.
		 * been_here[] records the contexts that already yielded nothing so
		 * the loop stops once a context is visited a second time. */
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while(!task)
#endif
		/* With STARPU_NON_BLOCKING_DRIVERS defined, this block simply runs once. */
		{
			if(worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				/* Keep selecting until we get a context the worker has not
				 * just been taken out of. */
				while(1)
				{
					/** Caution
					 * If you use multiple contexts, your scheduler *needs*
					 * to keep the task_number variable of the ctx list up
					 * to date in order to get the best performance.
					 * This is done using the functions:
					 * starpu_sched_ctx_list_task_counters_increment...(...)
					 * starpu_sched_ctx_list_task_counters_decrement...(...)
					 **/
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					/* Worker was flagged as removed from this context and
					 * shares its task lists: leave the context now and pick
					 * another one. */
					if(worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					/* Note: we do not push the scheduling state here, because
					 * otherwise when a worker is idle, we'd keep
					 * pushing/popping a scheduling state here, while what we
					 * want to see in the trace is a permanent idle state. */
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
					if (task)
						_STARPU_TASK_BREAK_ON(task, pop);
					/* Called whether or not a task was obtained. */
					_starpu_pop_task_end(task);
				}
			}

			if(!task)
			{
				/* It doesn't matter whether the worker shares task lists in
				 * the scheduler or not: if there is no task to pop, just
				 * take the worker out of the context here. */
				/* However, if it shares a task list it will be removed as
				 * soon as it finishes this job (in handle_job_termination). */
				/* NOTE(review): sched_ctx->id is used as an index below,
				 * including into been_here[] which has STARPU_NMAX_SCHED_CTXS
				 * entries, while the test above suggests the id may equal
				 * STARPU_NMAX_SCHED_CTXS -- confirm that value cannot reach
				 * this point. */
				if(worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				/* Report one idle cycle for this worker to the context's
				 * performance counters, when they provide the callback. */
				if(worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
//						_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
//						_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR

#ifndef STARPU_NON_BLOCKING_DRIVERS
				/* Stop retrying once this context has already been tried,
				 * or when there is only one context to try. */
				if(been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;

#endif
			}
		}
	}

	if (!task)
	{
		/* Nothing to run: remember when the idle period started. */
		if (starpu_idle_file)
			idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	/* A task was found: close any open idle period and accumulate it. */
	if(starpu_idle_file && idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	/* Tell the performance counters of the task's context that a task was popped. */
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
//		_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
//		_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost immediately.
	 * This way, we avoid computing and executing the conversions tasks.
	 * Here, we do not care about what implementation is used.
	 */
	worker_id = starpu_worker_get_id_check();
	/* Note: this early return bypasses the profiling and prologue-callback
	 * handling found at the `profiling` label. */
	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;

		/* The conversion task is pinned to this worker and flagged mf_skip
		 * so that it is not itself considered for conversion. */
		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	/* Requeue the original task locally and start over from the local queue. */
	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
				&pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	/* Run the user's pop prologue with `task` set as the current task for
	 * the duration of the call. */
	if(task->prologue_callback_pop_func)
	{
		_starpu_set_current_task(task);
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
		_starpu_set_current_task(NULL);
	}

	return task;
}
  930. struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
  931. {
  932. struct starpu_task *task = NULL;
  933. if(sched_ctx->sched_policy)
  934. {
  935. STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
  936. /* TODO set profiling info */
  937. if(sched_ctx->sched_policy->pop_every_task)
  938. {
  939. _STARPU_SCHED_BEGIN;
  940. task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
  941. _STARPU_SCHED_END;
  942. }
  943. }
  944. return task;
  945. }
  946. void _starpu_sched_pre_exec_hook(struct starpu_task *task)
  947. {
  948. unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
  949. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
  950. if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
  951. {
  952. _STARPU_SCHED_BEGIN;
  953. sched_ctx->sched_policy->pre_exec_hook(task, sched_ctx_id);
  954. _STARPU_SCHED_END;
  955. }
  956. if(!sched_ctx->sched_policy)
  957. {
  958. int workerid = starpu_worker_get_id();
  959. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  960. struct _starpu_sched_ctx_list_iterator list_it;
  961. _starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
  962. while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
  963. {
  964. struct _starpu_sched_ctx *other_sched_ctx;
  965. struct _starpu_sched_ctx_elt *e;
  966. e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
  967. other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
  968. if (other_sched_ctx != sched_ctx &&
  969. other_sched_ctx->sched_policy != NULL &&
  970. other_sched_ctx->sched_policy->pre_exec_hook)
  971. {
  972. _STARPU_SCHED_BEGIN;
  973. other_sched_ctx->sched_policy->pre_exec_hook(task, other_sched_ctx->id);
  974. _STARPU_SCHED_END;
  975. }
  976. }
  977. }
  978. }
  979. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  980. {
  981. STARPU_ASSERT(task->cl != NULL && task->cl->where != STARPU_NOWHERE);
  982. unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
  983. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
  984. if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
  985. {
  986. _STARPU_SCHED_BEGIN;
  987. sched_ctx->sched_policy->post_exec_hook(task, sched_ctx_id);
  988. _STARPU_SCHED_END;
  989. }
  990. if(!sched_ctx->sched_policy)
  991. {
  992. int workerid = starpu_worker_get_id();
  993. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  994. struct _starpu_sched_ctx_list_iterator list_it;
  995. _starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
  996. while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
  997. {
  998. struct _starpu_sched_ctx *other_sched_ctx;
  999. struct _starpu_sched_ctx_elt *e;
  1000. e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
  1001. other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
  1002. if (other_sched_ctx != sched_ctx &&
  1003. other_sched_ctx->sched_policy != NULL &&
  1004. other_sched_ctx->sched_policy->post_exec_hook)
  1005. {
  1006. _STARPU_SCHED_BEGIN;
  1007. other_sched_ctx->sched_policy->post_exec_hook(task, other_sched_ctx->id);
  1008. _STARPU_SCHED_END;
  1009. }
  1010. }
  1011. }
  1012. }
  1013. void _starpu_wait_on_sched_event(void)
  1014. {
  1015. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  1016. STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
  1017. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  1018. if (_starpu_machine_is_running())
  1019. {
  1020. #ifndef STARPU_NON_BLOCKING_DRIVERS
  1021. STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
  1022. &worker->sched_mutex);
  1023. #endif
  1024. }
  1025. STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
  1026. }
  1027. /* The scheduling policy may put tasks directly into a worker's local queue so
  1028. * that it is not always necessary to create its own queue when the local queue
  1029. * is sufficient. If "back" not null, the task is put at the back of the queue
  1030. * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
  1031. * a FIFO ordering. */
  1032. int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
  1033. {
  1034. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  1035. return _starpu_push_local_task(worker, task, prio);
  1036. }
  1037. void _starpu_print_idle_time()
  1038. {
  1039. if(!starpu_idle_file)
  1040. return;
  1041. double all_idle = 0.0;
  1042. int i = 0;
  1043. for(i = 0; i < STARPU_NMAXWORKERS; i++)
  1044. all_idle += idle[i];
  1045. FILE *f;
  1046. f = fopen(starpu_idle_file, "a");
  1047. if (!f)
  1048. {
  1049. _STARPU_MSG("couldn't open %s: %s\n", starpu_idle_file, strerror(errno));
  1050. }
  1051. else
  1052. {
  1053. fprintf(f, "%lf \n", all_idle);
  1054. fclose(f);
  1055. }
  1056. }
  1057. void starpu_sched_task_break(struct starpu_task *task)
  1058. {
  1059. _STARPU_TASK_BREAK_ON(task, sched);
  1060. }