/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2017  Université de Bordeaux
 * Copyright (C) 2010-2017  CNRS
 * Copyright (C) 2011, 2016  INRIA
 * Copyright (C) 2016  Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>

static int use_prefetch = 0;
static double idle[STARPU_NMAXWORKERS];
static double idle_start[STARPU_NMAXWORKERS];

long _starpu_task_break_on_push = -1;
long _starpu_task_break_on_pop = -1;
long _starpu_task_break_on_sched = -1;
static const char *starpu_idle_file;

void _starpu_sched_init(void)
{
	_starpu_task_break_on_push = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_PUSH", -1);
	_starpu_task_break_on_pop = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_POP", -1);
	_starpu_task_break_on_sched = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_SCHED", -1);
	starpu_idle_file = starpu_getenv("STARPU_IDLE_FILE");
}
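
/* Usage note (not part of the original file, behaviour as documented for
 * these variables): the three STARPU_TASK_BREAK_ON_* variables take the job
 * id of a task; the _STARPU_TASK_BREAK_ON() checks in the push/pop paths
 * below then raise a trap so an attached debugger stops when that particular
 * job goes through, e.g.:
 *
 *	STARPU_TASK_BREAK_ON_PUSH=42 gdb --args ./my_app
 */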

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}

static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_modular_eager_policy,
	&_starpu_sched_modular_eager_prefetching_policy,
	&_starpu_sched_modular_prio_policy,
	&_starpu_sched_modular_prio_prefetching_policy,
	&_starpu_sched_modular_random_policy,
	&_starpu_sched_modular_random_prio_policy,
	&_starpu_sched_modular_random_prefetching_policy,
	&_starpu_sched_modular_random_prio_prefetching_policy,
	&_starpu_sched_modular_ws_policy,
	&_starpu_sched_modular_heft_policy,
	&_starpu_sched_modular_heft_prio_policy,
	&_starpu_sched_modular_heft2_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_lws_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_dmda_sorted_decision_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	&_starpu_sched_heteroprio_policy,
	&_starpu_sched_graph_test_policy,
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies()
{
	return predefined_policies;
}

struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */
static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}

static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;
	if (strcmp(policy_name, "") == 0)
		return NULL;

	if (strncmp(policy_name, "heft", 4) == 0)
	{
		_STARPU_MSG("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		_STARPU_MSG("Warning: scheduling policy '%s' was not found, try 'help' to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}

static void display_sched_help_message(FILE *stream)
{
	const char *sched_env = starpu_getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;
		fprintf(stream, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stream, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stream, "\n");
	}
}

struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = &config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* If there is a policy that matches the required name, return it */
	if (selected_policy)
		return selected_policy;

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = starpu_getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* If there is a policy that matches the name, return it */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the eager policy by default */
	return &_starpu_sched_eager_policy;
}
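
/* Illustrative sketch (not part of the original file): the selection order
 * implemented above, seen from the application side.  A policy structure in
 * starpu_conf wins over everything, then the STARPU_SCHED environment
 * variable, then the sched_policy_name field, then the eager default. */
#if 0
#include <starpu.h>

static int select_policy_example(void)
{
	struct starpu_conf conf;
	starpu_conf_init(&conf);
	/* either hand over a policy structure directly ... */
	/* conf.sched_policy = &my_policy; */
	/* ... or name a predefined policy; note that STARPU_SCHED, if set,
	 * takes precedence over this field */
	conf.sched_policy_name = "dmda";
	return starpu_init(&conf);
}
#endif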

void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message(stderr);

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf.calibrate);

	load_sched_policy(selected_policy, sched_ctx);

	if (starpu_get_env_number_default("STARPU_WORKER_TREE", 0))
	{
#ifdef STARPU_HAVE_HWLOC
		sched_ctx->sched_policy->worker_type = STARPU_WORKER_TREE;
#else
		_STARPU_DISP("STARPU_WORKER_TREE ignored, please rebuild StarPU with hwloc support to enable it.");
#endif
	}
	starpu_sched_ctx_create_worker_collection(sched_ctx->id,
						  sched_ctx->sched_policy->worker_type);

	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}

void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		policy->deinit_sched(sched_ctx->id);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
	starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
}

void _starpu_sched_task_submit(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (!sched_ctx->sched_policy)
		return;
	if (!sched_ctx->sched_policy->submit_hook)
		return;
	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	sched_ctx->sched_policy->submit_hook(task);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}

void _starpu_sched_do_schedule(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (!sched_ctx->sched_policy)
		return;
	if (!sched_ctx->sched_policy->do_schedule)
		return;
	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	sched_ctx->sched_policy->do_schedule(sched_ctx_id);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}

static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
{
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
}

/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task alias will be enqueued
 * into each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker ? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);

	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR

	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
			unsigned i;
			for (i = 0; i < nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		//	if (task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
		job->after_work_busy_barrier = worker_size;

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);
			alias->destroy = 1;

			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
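
/* Illustrative sketch (not part of the original file): how an application
 * reaches the function above.  Setting the two public starpu_task fields
 * below makes _starpu_push_task_to_workers() bypass the scheduling policy
 * and call _starpu_push_task_on_specific_worker() directly.  The codelet
 * and handle are placeholders. */
#if 0
#include <starpu.h>

static void pin_task_example(struct starpu_codelet *cl, starpu_data_handle_t handle)
{
	struct starpu_task *task = starpu_task_create();
	task->cl = cl;
	STARPU_TASK_SET_HANDLE(task, handle, 0);
	/* execute on worker 2, whatever the policy would have decided */
	task->execute_on_a_specific_worker = 1;
	task->workerid = 2;
	starpu_task_submit(task);
}
#endif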

/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	if (j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);

	return _starpu_repush_task(j);
}
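
/* Illustrative sketch (not part of the original file): the prologue hook
 * invoked above is a public starpu_task field; it runs in the pushing
 * thread, before any scheduling decision is made. */
#if 0
#include <stdio.h>
#include <starpu.h>

static void my_prologue(void *arg)
{
	/* runs once, right before the task enters the scheduler */
	fprintf(stderr, "pushing task '%s'\n", (const char *) arg);
}

static void prologue_example(struct starpu_task *task)
{
	task->prologue_callback_func = my_prologue;
	task->prologue_callback_arg = (void *) "my_task";
}
#endif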

int _starpu_repush_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	int ret;

	_STARPU_LOG_IN();

	unsigned can_push = _starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops, task);
	task->status = STARPU_TASK_READY;
	STARPU_AYU_ADDTOTASKQUEUE(j->job_id, -1);
	/* if the context does not have any workers, save the task in a temp list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if no worker of the ctx is able to execute the task, we
		 * consider the ctx empty */
		unsigned nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}

	if (!can_push)
		return 0;
	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL || task->cl->where == STARPU_NOWHERE)
	{
		if (task->prologue_callback_pop_func)
			task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

		if (task->cl && task->cl->specific_nodes)
		{
			/* Nothing to do, but we are asked to fetch data on some memory nodes */
			_starpu_fetch_nowhere_task_input(j);
		}
		else
		{
			if (task->cl)
				__starpu_push_task_output(j);
			_starpu_handle_job_termination(j);
			_STARPU_LOG_OUT_TAG("handle_job_termination");
		}
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}

int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
	/* if the context still does not have workers, put the task back in
	 * its place in the empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if no worker of the ctx is able to execute the task, we
		 * consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}
	_starpu_profiling_set_task_push_start_time(task);

	int ret = 0;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		unsigned node = starpu_worker_get_memory_node(task->workerid);
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_on_node(task, node);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag())
		{
			if (task->cl->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->cl->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->cl->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->cl->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->cl->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}

		if (!sched_ctx->sched_policy)
		{
			/* Note: we have to call that early, or else the task may have
			 * disappeared already */
			starpu_push_task_end(task);
			if (!sched_ctx->awake_workers)
				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
			else
			{
				struct starpu_worker_collection *workers = sched_ctx->workers;

				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
				job->task_size = workers->nworkers;
				job->combined_workerid = -1; /* it's a ctx, not a combined worker */
				job->active_task_alias_count = 0;

				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
				job->after_work_busy_barrier = workers->nworkers;

				struct starpu_sched_ctx_iterator it;
				if (workers->init_iterator)
					workers->init_iterator(workers, &it);

				while (workers->has_next(workers, &it))
				{
					unsigned workerid = workers->get_next(workers, &it);
					struct starpu_task *alias;
					if (job->task_size > 1)
					{
						alias = starpu_task_dup(task);
						alias->destroy = 1;
					}
					else
						alias = task;

					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
				}
			}
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);
			/* check whether there are any workers in the context */
			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				_STARPU_TASK_BREAK_ON(task, push);
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
		}

		if (ret == -1)
		{
			_STARPU_MSG("repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}

/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}

/* This is called right after the scheduler has popped a task from a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int _starpu_pop_task_end(struct starpu_task *task)
{
	if (!task)
		return 0;
	_STARPU_TRACE_JOB_POP(task, task->priority > 0);
	return 0;
}

/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}

struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							    enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops : %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
	return conversion_task;
}
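
/* Illustrative sketch (not part of the original file): where the conversion
 * codelets picked above come from.  An application registering multiformat
 * data provides them through starpu_multiformat_data_interface_ops; the
 * element types and codelets below are placeholders. */
#if 0
#include <starpu.h>

struct cpu_elem { float x; float y; };
extern struct starpu_codelet cpu_to_cuda_cl;	/* hypothetical codelet */
extern struct starpu_codelet cuda_to_cpu_cl;	/* hypothetical codelet */

static struct starpu_multiformat_data_interface_ops format_ops =
{
	.cpu_elemsize = sizeof(struct cpu_elem),
#ifdef STARPU_USE_CUDA
	.cuda_elemsize = 2 * sizeof(float),
	.cpu_to_cuda_cl = &cpu_to_cuda_cl,
	.cuda_to_cpu_cl = &cuda_to_cpu_cl,
#endif
};

static void register_example(struct cpu_elem *vec, unsigned n, starpu_data_handle_t *handle)
{
	starpu_multiformat_data_register(handle, STARPU_MAIN_RAM, vec, n, &format_ops);
}
#endif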

static
struct _starpu_sched_ctx *_get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	int found = 0;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->task_number > 0)
			return _starpu_get_sched_ctx_struct(e->sched_ctx);
	}

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->last_poped)
		{
			e->last_poped = 0;
			if (_starpu_sched_ctx_list_iterator_has_next(&list_it))
			{
				e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
				found = 1;
			}
			break;
		}
	}
	if (!found)
		e = worker->sched_ctx_list->head;
	e->last_poped = 1;

	return _starpu_get_sched_ctx_struct(e->sched_ctx);
}

struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	if (task)
		_STARPU_TASK_BREAK_ON(task, pop);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
#endif
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while (1)
				{
					/** Caution: if you use several contexts, your
					 * scheduler *needs* to update the task_number
					 * variable of the ctx list in order to get the
					 * best performance.  This is done using the
					 * functions
					 * starpu_sched_ctx_list_task_counters_increment...(...)
					 * starpu_sched_ctx_list_task_counters_decrement...(...)
					 **/
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if (sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					/* Note: we do not push the scheduling state here, because
					 * otherwise when a worker is idle, we'd keep
					 * pushing/popping a scheduling state here, while what we
					 * want to see in the trace is a permanent idle state. */
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
					if (task)
						_STARPU_TASK_BREAK_ON(task, pop);
					_starpu_pop_task_end(task);
				}
			}

			if (!task)
			{
				/* whether or not the worker shares a tasks list in the
				 * scheduler, if it does not have any task to pop, just get
				 * it out of here */
				/* however, if it does share a tasks list, it will be
				 * removed as soon as it finishes its current job (in
				 * handle_job_termination) */
				if (worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				if (worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
						//	_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
						//	_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR
#ifndef STARPU_NON_BLOCKING_DRIVERS
				if (been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;
#endif
			}
		}
	}

	if (!task)
	{
		if (starpu_idle_file)
			idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	if (starpu_idle_file && idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
		//	_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
		//	_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost
	 * immediately. This way, we avoid computing and executing the
	 * conversion tasks. Here, we do not care about what implementation is
	 * used.
	 */
	worker_id = starpu_worker_get_id_check();
	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;

		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	if (task->prologue_callback_pop_func)
	{
		_starpu_set_current_task(task);
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
		_starpu_set_current_task(NULL);
	}

	return task;
}

struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task *task = NULL;
	if (sched_ctx->sched_policy)
	{
		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

		/* TODO set profiling info */
		if (sched_ctx->sched_policy->pop_every_task)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
	return task;
}
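
/* Illustrative sketch (not part of the original file), in the spirit of
 * StarPU's dummy_sched example: a minimal policy with one shared FIFO,
 * showing which hooks the generic push/pop code above actually calls.
 * Not a usable scheduler (no work stealing, no performance model). */
#if 0
#include <starpu.h>

static struct starpu_task_list fifo;
static starpu_pthread_mutex_t fifo_mutex;

static void sketch_init(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	STARPU_PTHREAD_MUTEX_INIT(&fifo_mutex, NULL);
	starpu_task_list_init(&fifo);
}

static int sketch_push(struct starpu_task *task)
{
	STARPU_PTHREAD_MUTEX_LOCK(&fifo_mutex);
	starpu_task_list_push_back(&fifo, task);
	/* contract seen above: mark the task scheduled while it is
	 * guaranteed to still be alive */
	starpu_push_task_end(task);
	STARPU_PTHREAD_MUTEX_UNLOCK(&fifo_mutex);
	return 0;
}

static struct starpu_task *sketch_pop(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	STARPU_PTHREAD_MUTEX_LOCK(&fifo_mutex);
	struct starpu_task *task = starpu_task_list_pop_front(&fifo);
	STARPU_PTHREAD_MUTEX_UNLOCK(&fifo_mutex);
	return task;	/* NULL when idle; _starpu_pop_task() handles that */
}

struct starpu_sched_policy sketch_policy =
{
	.init_sched = sketch_init,
	.push_task = sketch_push,
	.pop_task = sketch_pop,
	.policy_name = "fifo-sketch",
	.policy_description = "single shared FIFO (illustration only)",
};
#endif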

void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->pre_exec_hook(task, sched_ctx_id);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}

	if (!sched_ctx->sched_policy)
	{
		int workerid = starpu_worker_get_id();
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		struct _starpu_sched_ctx *other_sched_ctx;
		struct _starpu_sched_ctx_elt *e = NULL;
		struct _starpu_sched_ctx_list_iterator list_it;

		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
		{
			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
			if (other_sched_ctx != sched_ctx &&
			    other_sched_ctx->sched_policy != NULL &&
			    other_sched_ctx->sched_policy->pre_exec_hook)
			{
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				other_sched_ctx->sched_policy->pre_exec_hook(task, other_sched_ctx->id);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
		}
	}
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->post_exec_hook(task, sched_ctx_id);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}

	if (!sched_ctx->sched_policy)
	{
		int workerid = starpu_worker_get_id();
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		struct _starpu_sched_ctx *other_sched_ctx;
		struct _starpu_sched_ctx_elt *e = NULL;
		struct _starpu_sched_ctx_list_iterator list_it;

		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
		{
			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
			if (other_sched_ctx != sched_ctx &&
			    other_sched_ctx->sched_policy != NULL &&
			    other_sched_ctx->sched_policy->post_exec_hook)
			{
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				other_sched_ctx->sched_policy->post_exec_hook(task, other_sched_ctx->id);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
		}
	}
}

void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
}

/* The scheduling policy may put tasks directly into a worker's local queue,
 * so that it is not always necessary for a policy to create its own queues
 * when the local queues are sufficient. If "back" is not null, the task is
 * put at the back of the queue where the worker will pop tasks first.
 * Setting "back" to 0 therefore ensures a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, prio);
}
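
/* Illustrative sketch (not part of the original file): a policy's push_task
 * hook that relies on the workers' local queues instead of a policy-private
 * queue, using the public entry point above.  Picking worker 0 is an
 * arbitrary placeholder choice. */
#if 0
#include <starpu.h>

static int push_to_worker0(struct starpu_task *task)
{
	/* prio != 0 puts the task where the worker will pop it first */
	return starpu_push_local_task(0, task, task->priority > 0);
}
#endif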

void _starpu_print_idle_time()
{
	if (!starpu_idle_file)
		return;
	double all_idle = 0.0;
	int i = 0;
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		all_idle += idle[i];

	FILE *f;
	f = fopen(starpu_idle_file, "a");
	if (!f)
	{
		_STARPU_MSG("couldn't open %s: %s\n", starpu_idle_file, strerror(errno));
	}
	else
	{
		fprintf(f, "%lf\n", all_idle);
		fclose(f);
	}
}
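
/* Usage note (not part of the original file, inferred from the code above):
 * idle accounting is driven by the STARPU_IDLE_FILE environment variable
 * read in _starpu_sched_init(), e.g.
 *
 *	STARPU_IDLE_FILE=idle.log ./my_app
 *
 * appends the cumulated idle time of all workers (in the unit returned by
 * starpu_timing_now(), i.e. microseconds) to idle.log at shutdown. */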