/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011-2017  Inria
 * Copyright (C) 2013       Simon Archipoff
 * Copyright (C) 2008-2018  Université de Bordeaux
 * Copyright (C) 2010-2017  CNRS
 * Copyright (C) 2013       Thibaut Lambert
 * Copyright (C) 2016       Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>
#include <core/task.h>

static int use_prefetch = 0;
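/* Cumulated per-worker idle time (in µs, as returned by starpu_timing_now()),
 * recorded only when STARPU_IDLE_FILE is set */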
static double idle[STARPU_NMAXWORKERS];
static double idle_start[STARPU_NMAXWORKERS];
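/* Job ids on which to break (see _STARPU_TASK_BREAK_ON), settable through the
 * STARPU_TASK_BREAK_ON_* environment variables read below */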
long _starpu_task_break_on_push = -1;
long _starpu_task_break_on_sched = -1;
long _starpu_task_break_on_pop = -1;
long _starpu_task_break_on_exec = -1;
static const char *starpu_idle_file;

void _starpu_sched_init(void)
{
	_starpu_task_break_on_push = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_PUSH", -1);
	_starpu_task_break_on_sched = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_SCHED", -1);
	_starpu_task_break_on_pop = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_POP", -1);
	_starpu_task_break_on_exec = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_EXEC", -1);
	starpu_idle_file = starpu_getenv("STARPU_IDLE_FILE");
}
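/*
 * A minimal debugging sketch (the job id 42 is just an assumed example): run
 * the application under a debugger with one of the variables above set, e.g.
 *
 *	STARPU_TASK_BREAK_ON_SCHED=42 gdb --args ./my_app
 *
 * so that execution traps when the task with that job id reaches the
 * corresponding scheduling step.
 */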
int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}

static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_modular_eager_policy,
	&_starpu_sched_modular_eager_prefetching_policy,
	&_starpu_sched_modular_prio_policy,
	&_starpu_sched_modular_prio_prefetching_policy,
	&_starpu_sched_modular_random_policy,
	&_starpu_sched_modular_random_prio_policy,
	&_starpu_sched_modular_random_prefetching_policy,
	&_starpu_sched_modular_random_prio_prefetching_policy,
	&_starpu_sched_modular_ws_policy,
	&_starpu_sched_modular_heft_policy,
	&_starpu_sched_modular_heft_prio_policy,
	&_starpu_sched_modular_heft2_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_lws_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_dmda_sorted_decision_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	&_starpu_sched_heteroprio_policy,
	&_starpu_sched_graph_test_policy,
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies()
{
	return predefined_policies;
}
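/*
 * Illustrative use of the public accessor above (a sketch, not used in this
 * file): the array is NULL-terminated, so an application may enumerate the
 * built-in policies like this:
 *
 *	struct starpu_sched_policy **p = starpu_sched_get_predefined_policies();
 *	for (; *p; p++)
 *		printf("%s: %s\n", (*p)->policy_name, (*p)->policy_description);
 */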
struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}

static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;
	if (strcmp(policy_name, "") == 0)
		return NULL;

	if (strncmp(policy_name, "heft", 4) == 0)
	{
		_STARPU_MSG("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		_STARPU_MSG("Warning: scheduling policy '%s' was not found, try 'help' to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}

static void display_sched_help_message(FILE *stream)
{
	const char *sched_env = starpu_getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;

		fprintf(stream, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stream, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stream, "\n");
	}
}
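/*
 * For reference, running e.g.
 *
 *	STARPU_SCHED=help ./my_app
 *
 * prints the table of policy names and descriptions above to stderr at
 * initialization time.
 */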
struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = &config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* If there is a policy that matches the required name, return it */
	if (selected_policy)
		return selected_policy;

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we look if the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = starpu_getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* If there is a policy that matches the name, return it */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the lws policy by default */
	return &_starpu_sched_lws_policy;
}
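/*
 * Selection precedence, from highest to lowest: the required_policy argument,
 * then conf->sched_policy, then the STARPU_SCHED environment variable, then
 * conf->sched_policy_name, and finally the lws default.  A minimal sketch of
 * requesting a policy by name from application code (assuming "dmda" is the
 * desired policy):
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "dmda";
 *	starpu_init(&conf);
 */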
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message(stderr);

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf.calibrate);

	load_sched_policy(selected_policy, sched_ctx);

	if (starpu_get_env_number_default("STARPU_WORKER_TREE", 0))
	{
#ifdef STARPU_HAVE_HWLOC
		sched_ctx->sched_policy->worker_type = STARPU_WORKER_TREE;
#else
		_STARPU_DISP("STARPU_WORKER_TREE ignored, please rebuild StarPU with hwloc support to enable it.");
#endif
	}
	starpu_sched_ctx_create_worker_collection(sched_ctx->id,
						  sched_ctx->sched_policy->worker_type);

	_STARPU_SCHED_BEGIN;
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_SCHED_END;
}
void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
	{
		_STARPU_SCHED_BEGIN;
		policy->deinit_sched(sched_ctx->id);
		_STARPU_SCHED_END;
	}
	starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
}

void _starpu_sched_task_submit(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (!sched_ctx->sched_policy)
		return;
	if (!sched_ctx->sched_policy->submit_hook)
		return;
	_STARPU_SCHED_BEGIN;
	sched_ctx->sched_policy->submit_hook(task);
	_STARPU_SCHED_END;
}

void _starpu_sched_do_schedule(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (!sched_ctx->sched_policy)
		return;
	if (!sched_ctx->sched_policy->do_schedule)
		return;
	_STARPU_SCHED_BEGIN;
	sched_ctx->sched_policy->do_schedule(sched_ctx_id);
	_STARPU_SCHED_END;
}
static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
{
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
		{
			_STARPU_SCHED_BEGIN;
			sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
			_STARPU_SCHED_END;
		}
	}
}
/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task alias will be enqueued
 * into each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker ? */
	int is_basic_worker = (workerid < nbasic_workers);

	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
	}

	if (use_prefetch)
		starpu_prefetch_task_input_for(task, workerid);

	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR

	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
			unsigned i;
			for (i = 0; i < nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		//	if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
		job->after_work_busy_barrier = worker_size;

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);
			alias->destroy = 1;
			_STARPU_TRACE_JOB_PUSH(alias, alias->priority > 0);

			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
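/*
 * Illustrative sketch of the application-level path that leads here (the
 * codelet "cl" is an assumed example; the task fields used are public):
 *
 *	struct starpu_task *task = starpu_task_create();
 *	task->cl = &cl;
 *	task->execute_on_a_specific_worker = 1;
 *	task->workerid = 0;
 *	starpu_task_submit(task);
 */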
/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	if (j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);

	return _starpu_repush_task(j);
}
int _starpu_repush_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	int ret;

	_STARPU_LOG_IN();

	unsigned can_push = _starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops, task);
	task->status = STARPU_TASK_READY;
	STARPU_AYU_ADDTOTASKQUEUE(j->job_id, -1);

	/* if the context does not have any workers, save the task in a temp list */
	if ((task->cl != NULL && task->where != STARPU_NOWHERE) && (!sched_ctx->is_initial_sched))
	{
		/* if no worker in the ctx is able to execute the task, we
		 * consider the ctx empty */
		unsigned nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			_starpu_sched_ctx_lock_write(sched_ctx->id);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			_starpu_sched_ctx_unlock_write(sched_ctx->id);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}

	if (!can_push)
		return 0;

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL || task->where == STARPU_NOWHERE)
	{
		if (task->prologue_callback_pop_func)
			task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

		if (task->cl && task->cl->specific_nodes)
		{
			/* Nothing to do, but we are asked to fetch data on some memory nodes */
			_starpu_fetch_nowhere_task_input(j);
		}
		else
		{
			if (task->cl)
				__starpu_push_task_output(j);
			_starpu_handle_job_termination(j);
			_STARPU_LOG_OUT_TAG("handle_job_termination");
		}
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to an empty context, that's fine */
		ret = 0;

	return ret;
}
int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);

	/* if the context still does not have workers, put the task back in the
	 * empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if no worker in the ctx is able to execute the task, we
		 * consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			_starpu_sched_ctx_lock_write(sched_ctx->id);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			_starpu_sched_ctx_unlock_write(sched_ctx->id);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	int ret = 0;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_for(task, task->workerid);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag())
		{
			if (task->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}

		if (!sched_ctx->sched_policy)
		{
			/* Note: we have to call that early, or else the task may have
			 * disappeared already */
			starpu_push_task_end(task);

			if (!sched_ctx->awake_workers)
				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
			else
			{
				struct starpu_worker_collection *workers = sched_ctx->workers;

				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
				job->task_size = workers->nworkers;
				job->combined_workerid = -1; /* it's a ctx, not a combined worker */
				job->active_task_alias_count = 0;

				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
				job->after_work_busy_barrier = workers->nworkers;

				struct starpu_sched_ctx_iterator it;
				if (workers->init_iterator)
					workers->init_iterator(workers, &it);

				while (workers->has_next(workers, &it))
				{
					unsigned workerid = workers->get_next(workers, &it);
					struct starpu_task *alias;
					if (job->task_size > 1)
					{
						alias = starpu_task_dup(task);
						_STARPU_TRACE_JOB_PUSH(alias, alias->priority > 0);
						alias->destroy = 1;
					}
					else
						alias = task;

					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
				}
			}
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);

			/* check whether there are any workers in the context */
			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				struct _starpu_worker *worker = _starpu_get_local_worker_key();
				if (worker)
				{
					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
					_starpu_worker_enter_sched_op(worker);
					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
				}
				_STARPU_TASK_BREAK_ON(task, push);
				_STARPU_SCHED_BEGIN;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_SCHED_END;
				if (worker)
				{
					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
					_starpu_worker_leave_sched_op(worker);
					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
				}
			}
		}

		if (ret == -1)
		{
			_STARPU_MSG("repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}
/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}
/* This is called right after the scheduler has popped a task from a queue:
 * we trace the pop while the task is still alive.
 */
int _starpu_pop_task_end(struct starpu_task *task)
{
	if (!task)
		return 0;

	_STARPU_TRACE_JOB_POP(task, task->priority > 0);
	return 0;
}
/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}

struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							     enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops : %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
	return conversion_task;
}
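/* Pick the next context this worker should pop from: contexts that already
 * have ready tasks (task_number > 0) are preferred; otherwise the list is
 * walked round-robin, resuming right after the context popped last time
 * (tracked through the last_poped flag). */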
static
struct _starpu_sched_ctx *_get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	int found = 0;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->task_number > 0)
			return _starpu_get_sched_ctx_struct(e->sched_ctx);
	}

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->last_poped)
		{
			e->last_poped = 0;
			if (_starpu_sched_ctx_list_iterator_has_next(&list_it))
			{
				e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
				found = 1;
			}
			break;
		}
	}
	if (!found)
		e = worker->sched_ctx_list->head;
	e->last_poped = 1;

	return _starpu_get_sched_ctx_struct(e->sched_ctx);
}
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);
	if (task)
		_STARPU_TASK_BREAK_ON(task, pop);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
#endif
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while (1)
				{
					/** Caution
					 * If you use multiple contexts, your scheduler *needs*
					 * to update the task_number variable of the ctx list
					 * in order to get the best performance.
					 * This is done using the functions:
					 * starpu_sched_ctx_list_task_counters_increment...(...)
					 * starpu_sched_ctx_list_task_counters_decrement...(...)
					 **/
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if (sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					/* Note: we do not push the scheduling state here, because
					 * otherwise when a worker is idle, we'd keep
					 * pushing/popping a scheduling state here, while what we
					 * want to see in the trace is a permanent idle state. */
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
					if (task)
						_STARPU_TASK_BREAK_ON(task, pop);
					_starpu_pop_task_end(task);
				}
			}

			if (!task)
			{
				/* it doesn't matter whether the worker shares a tasks list
				 * in the scheduler or not: if it does not have any task to
				 * pop, just get it out of here */
				/* however, if it shares a task list, it will be removed as
				 * soon as it finishes this job (in handle_job_termination) */
				if (worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				if (worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
						//	_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
						//	_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR
#ifndef STARPU_NON_BLOCKING_DRIVERS
				if (been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;
#endif
			}
		}
	}

	if (!task)
	{
		if (starpu_idle_file)
			idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	if (starpu_idle_file && idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
		//	_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
		//	_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost immediately.
	 * This way, we avoid computing and executing the conversion tasks.
	 * Here, we do not care about what implementation is used.
	 */
	worker_id = starpu_worker_get_id_check();
	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;
		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}
	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	if (task->prologue_callback_pop_func)
	{
		_starpu_set_current_task(task);
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
		_starpu_set_current_task(NULL);
	}

	return task;
}
struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task *task = NULL;
	if (sched_ctx->sched_policy)
	{
		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

		/* TODO set profiling info */
		if (sched_ctx->sched_policy->pop_every_task)
		{
			_STARPU_SCHED_BEGIN;
			task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
			_STARPU_SCHED_END;
		}
	}
	return task;
}
void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
	{
		_STARPU_SCHED_BEGIN;
		sched_ctx->sched_policy->pre_exec_hook(task, sched_ctx_id);
		_STARPU_SCHED_END;
	}

	if (!sched_ctx->sched_policy)
	{
		int workerid = starpu_worker_get_id();
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		struct _starpu_sched_ctx_list_iterator list_it;

		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
		{
			struct _starpu_sched_ctx *other_sched_ctx;
			struct _starpu_sched_ctx_elt *e = NULL;

			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
			if (other_sched_ctx != sched_ctx &&
			    other_sched_ctx->sched_policy != NULL &&
			    other_sched_ctx->sched_policy->pre_exec_hook)
			{
				_STARPU_SCHED_BEGIN;
				other_sched_ctx->sched_policy->pre_exec_hook(task, other_sched_ctx->id);
				_STARPU_SCHED_END;
			}
		}
	}
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	STARPU_ASSERT(task->cl != NULL && task->cl->where != STARPU_NOWHERE);
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
	{
		_STARPU_SCHED_BEGIN;
		sched_ctx->sched_policy->post_exec_hook(task, sched_ctx_id);
		_STARPU_SCHED_END;
	}

	if (!sched_ctx->sched_policy)
	{
		int workerid = starpu_worker_get_id();
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		struct _starpu_sched_ctx_list_iterator list_it;

		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
		{
			struct _starpu_sched_ctx *other_sched_ctx;
			struct _starpu_sched_ctx_elt *e = NULL;

			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
			if (other_sched_ctx != sched_ctx &&
			    other_sched_ctx->sched_policy != NULL &&
			    other_sched_ctx->sched_policy->post_exec_hook)
			{
				_STARPU_SCHED_BEGIN;
				other_sched_ctx->sched_policy->post_exec_hook(task, other_sched_ctx->id);
				_STARPU_SCHED_END;
			}
		}
	}
}
void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}

	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
}
/* The scheduling policy may put tasks directly into a worker's local queue so
 * that it is not always necessary to create its own queue when the local queue
 * is sufficient. If "prio" is non-zero, the task is put where the worker will
 * pop it first; setting "prio" to 0 therefore ensures a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, prio);
}
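/*
 * A minimal sketch of a custom policy using this helper (my_push_task is
 * hypothetical; a real policy would pick a worker from its context rather
 * than hard-coding worker 0):
 *
 *	static int my_push_task(struct starpu_task *task)
 *	{
 *		// always target worker 0, keeping FIFO order
 *		return starpu_push_local_task(0, task, 0);
 *	}
 */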
void _starpu_print_idle_time()
{
	if (!starpu_idle_file)
		return;
	double all_idle = 0.0;
	int i = 0;
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		all_idle += idle[i];

	FILE *f;
	f = fopen(starpu_idle_file, "a");
	if (!f)
	{
		_STARPU_MSG("couldn't open %s: %s\n", starpu_idle_file, strerror(errno));
	}
	else
	{
		fprintf(f, "%lf\n", all_idle);
		fclose(f);
	}
}
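/*
 * For reference: with e.g. STARPU_IDLE_FILE=idle.log in the environment, each
 * run appends to that file the idle time (in µs, as measured with
 * starpu_timing_now()) summed over all workers.
 */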
void starpu_sched_task_break(struct starpu_task *task)
{
	_STARPU_TASK_BREAK_ON(task, sched);
}