sched_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011-2017  Inria
 * Copyright (C) 2013       Simon Archipoff
 * Copyright (C) 2008-2019  Université de Bordeaux
 * Copyright (C) 2010-2017, 2019  CNRS
 * Copyright (C) 2013       Thibaut Lambert
 * Copyright (C) 2016       Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>
#include <core/task.h>

static int use_prefetch = 0;
static double idle[STARPU_NMAXWORKERS];
static double idle_start[STARPU_NMAXWORKERS];

long _starpu_task_break_on_push = -1;
long _starpu_task_break_on_sched = -1;
long _starpu_task_break_on_pop = -1;
long _starpu_task_break_on_exec = -1;
static const char *starpu_idle_file;

void _starpu_sched_init(void)
{
	_starpu_task_break_on_push = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_PUSH", -1);
	_starpu_task_break_on_sched = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_SCHED", -1);
	_starpu_task_break_on_pop = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_POP", -1);
	_starpu_task_break_on_exec = starpu_get_env_number_default("STARPU_TASK_BREAK_ON_EXEC", -1);
	starpu_idle_file = starpu_getenv("STARPU_IDLE_FILE");
}
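
/* A minimal usage sketch for the STARPU_TASK_BREAK_ON_* variables read
 * above: they are matched by the _STARPU_TASK_BREAK_ON() checkpoints used
 * further down in this file, so a debugger can stop when a given task is
 * pushed, scheduled, popped or executed.  That the value is a job id and
 * that a trap is raised are assumptions here; see the definition of
 * _STARPU_TASK_BREAK_ON for the exact behaviour.
 *
 *	// hypothetical application snippet, before starpu_init():
 *	setenv("STARPU_TASK_BREAK_ON_SCHED", "42", 1);	// job id 42
 *	// then run the application under gdb and wait for the trap
 */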

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}

static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_modular_eager_policy,
	&_starpu_sched_modular_eager_prefetching_policy,
	&_starpu_sched_modular_eager_prio_policy,
	&_starpu_sched_modular_gemm_policy,
	&_starpu_sched_modular_prio_policy,
	&_starpu_sched_modular_prio_prefetching_policy,
	&_starpu_sched_modular_random_policy,
	&_starpu_sched_modular_random_prio_policy,
	&_starpu_sched_modular_random_prefetching_policy,
	&_starpu_sched_modular_random_prio_prefetching_policy,
	&_starpu_sched_modular_parallel_random_policy,
	&_starpu_sched_modular_parallel_random_prio_policy,
	&_starpu_sched_modular_ws_policy,
	&_starpu_sched_modular_heft_policy,
	&_starpu_sched_modular_heft_prio_policy,
	&_starpu_sched_modular_heft2_policy,
	&_starpu_sched_modular_heteroprio_policy,
	&_starpu_sched_modular_heteroprio_heft_policy,
	&_starpu_sched_modular_parallel_heft_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_lws_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_prio_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_dmda_sorted_decision_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	&_starpu_sched_heteroprio_policy,
	&_starpu_sched_graph_test_policy,
#ifdef STARPU_HAVE_HWLOC
	//&_starpu_sched_tree_heft_hierarchical_policy,
#endif
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies()
{
	return predefined_policies;
}
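
/* A short sketch of how an application can inspect this list through the
 * public accessor above; policy_name and policy_description are the fields
 * printed by the "help" message below.
 *
 *	struct starpu_sched_policy **p = starpu_sched_get_predefined_policies();
 *	for (; *p; p++)
 *		printf("%s: %s\n", (*p)->policy_name, (*p)->policy_description);
 */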

struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}

static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	if (strcmp(policy_name, "") == 0)
		return NULL;

	if (strncmp(policy_name, "heft", 4) == 0)
	{
		_STARPU_MSG("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		_STARPU_MSG("Warning: scheduling policy '%s' was not found, try 'help' to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}

static void display_sched_help_message(FILE *stream)
{
	const char *sched_env = starpu_getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;

		fprintf(stream, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stream, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stream, "\n");
	}
}
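
/* "help" is recognized both here and in find_sched_policy_from_name(), so
 * listing the available policies requires no code change.  A sketch,
 * assuming the variable is set before starpu_init() runs:
 *
 *	setenv("STARPU_SCHED", "help", 1);
 *	starpu_init(NULL);	// prints the policy list to stderr
 */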

struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = &config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* If there is a policy that matches the required name, return it */
	if (selected_policy)
		return selected_policy;

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = starpu_getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* If there is a policy that matches the name, return it */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the lws policy by default */
	return &_starpu_sched_lws_policy;
}
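
/* The selection order above is: explicit policy structure
 * (conf.sched_policy), then the STARPU_SCHED environment variable, then
 * conf.sched_policy_name, then the lws default.  A minimal sketch of the
 * usual application-side way to pick a policy by name ("dmda" is just an
 * example from the list above):
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "dmda";
 *	int ret = starpu_init(&conf);
 *	// note: STARPU_SCHED, if set in the environment, takes precedence
 */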

void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message(stderr);

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf.calibrate);

	load_sched_policy(selected_policy, sched_ctx);

	if (starpu_get_env_number_default("STARPU_WORKER_TREE", 0))
	{
#ifdef STARPU_HAVE_HWLOC
		sched_ctx->sched_policy->worker_type = STARPU_WORKER_TREE;
#else
		_STARPU_DISP("STARPU_WORKER_TREE ignored, please rebuild StarPU with hwloc support to enable it.");
#endif
	}
	starpu_sched_ctx_create_worker_collection(sched_ctx->id,
						  sched_ctx->sched_policy->worker_type);

	_STARPU_SCHED_BEGIN;
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_SCHED_END;
}
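
/* A minimal sketch of a user-defined policy plugged in through
 * conf.sched_policy (the first branch taken in _starpu_select_sched_policy
 * above).  The field names are those of struct starpu_sched_policy used
 * throughout this file; the trivial bodies are illustrative assumptions,
 * not a useful scheduler.
 *
 *	static void my_init(unsigned sched_ctx_id) { (void)sched_ctx_id; }
 *	static void my_deinit(unsigned sched_ctx_id) { (void)sched_ctx_id; }
 *	// naive: send every task to the local queue of worker 0
 *	static int my_push(struct starpu_task *task)
 *	{
 *		return starpu_push_local_task(0, task, 0);
 *	}
 *
 *	static struct starpu_sched_policy my_policy =
 *	{
 *		.init_sched = my_init,
 *		.deinit_sched = my_deinit,
 *		.push_task = my_push,
 *		.policy_name = "my_policy",
 *		.policy_description = "example user-defined policy",
 *	};
 *
 *	// conf.sched_policy = &my_policy; before starpu_init(&conf)
 */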

void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
	{
		_STARPU_SCHED_BEGIN;
		policy->deinit_sched(sched_ctx->id);
		_STARPU_SCHED_END;
	}
	starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
}

void _starpu_sched_task_submit(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (!sched_ctx->sched_policy)
		return;
	if (!sched_ctx->sched_policy->submit_hook)
		return;
	_STARPU_SCHED_BEGIN;
	sched_ctx->sched_policy->submit_hook(task);
	_STARPU_SCHED_END;
}

void _starpu_sched_do_schedule(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (!sched_ctx->sched_policy)
		return;
	if (!sched_ctx->sched_policy->do_schedule)
		return;
	_STARPU_SCHED_BEGIN;
	sched_ctx->sched_policy->do_schedule(sched_ctx_id);
	_STARPU_SCHED_END;
}

static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
{
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
		{
			_STARPU_SCHED_BEGIN;
			sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
			_STARPU_SCHED_END;
		}
	}
}

/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task will be enqueued into
 * each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker? */
	int is_basic_worker = (workerid < nbasic_workers);

	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
	}

	if (use_prefetch)
		starpu_prefetch_task_input_for(task, workerid);

	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR

	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
			unsigned i;
			for (i = 0; i < nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		//	if (task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
		job->after_work_busy_barrier = worker_size;

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);
			alias->destroy = 1;
			_STARPU_TRACE_JOB_PUSH(alias, alias->priority > 0);

			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
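
/* The execute_on_a_specific_worker path above is what the public task
 * fields enable.  A minimal application-side sketch (my_codelet and worker
 * id 2 are arbitrary assumptions for illustration):
 *
 *	struct starpu_task *task = starpu_task_create();
 *	task->cl = &my_codelet;
 *	task->execute_on_a_specific_worker = 1;
 *	task->workerid = 2;	// bypass the scheduling decision for this task
 *	starpu_task_submit(task);
 */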

/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	if (j->task->prologue_callback_func)
	{
		_starpu_set_current_task(j->task);
		j->task->prologue_callback_func(j->task->prologue_callback_arg);
		_starpu_set_current_task(NULL);
	}

	return _starpu_repush_task(j);
}

int _starpu_repush_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	int ret;

	_STARPU_LOG_IN();

	unsigned can_push = _starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops, task);
	STARPU_ASSERT(task->status == STARPU_TASK_BLOCKED || task->status == STARPU_TASK_BLOCKED_ON_TAG || task->status == STARPU_TASK_BLOCKED_ON_TASK || task->status == STARPU_TASK_BLOCKED_ON_DATA);
	task->status = STARPU_TASK_READY;
	const unsigned continuation =
#ifdef STARPU_OPENMP
		j->continuation
#else
		0
#endif
		;

	if (!_starpu_perf_counter_paused() && !j->internal && !continuation)
	{
		(void) STARPU_ATOMIC_ADD64(&_starpu_task__g_current_submitted__value, -1);
		int64_t value = STARPU_ATOMIC_ADD64(&_starpu_task__g_current_ready__value, 1);
		_starpu_perf_counter_update_max_int64(&_starpu_task__g_peak_ready__value, value);
		if (task->cl && task->cl->perf_counter_values)
		{
			struct starpu_perf_counter_sample_cl_values * const pcv = task->cl->perf_counter_values;

			(void)STARPU_ATOMIC_ADD64(&pcv->task.current_submitted, -1);
			value = STARPU_ATOMIC_ADD64(&pcv->task.current_ready, 1);
			_starpu_perf_counter_update_max_int64(&pcv->task.peak_ready, value);
		}
	}

	STARPU_AYU_ADDTOTASKQUEUE(j->job_id, -1);

	/* if the context does not have any workers, save the task in a temp list */
	if ((task->cl != NULL && task->where != STARPU_NOWHERE) && (!sched_ctx->is_initial_sched))
	{
		/* if no worker of the ctx is able to execute tasks,
		   we consider the ctx empty */
		unsigned able = _starpu_workers_able_to_execute_task(task, sched_ctx);

		if (!able)
		{
			_starpu_sched_ctx_lock_write(sched_ctx->id);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			_starpu_sched_ctx_unlock_write(sched_ctx->id);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}

	if (!can_push)
		return 0;

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL || task->where == STARPU_NOWHERE)
	{
		if (!_starpu_perf_counter_paused() && !j->internal)
		{
			(void)STARPU_ATOMIC_ADD64(&_starpu_task__g_current_ready__value, -1);
			if (task->cl && task->cl->perf_counter_values)
			{
				struct starpu_perf_counter_sample_cl_values * const pcv = task->cl->perf_counter_values;
				(void)STARPU_ATOMIC_ADD64(&pcv->task.current_ready, -1);
			}
		}
		task->status = STARPU_TASK_RUNNING;
		if (task->prologue_callback_pop_func)
		{
			_starpu_set_current_task(task);
			task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
			_starpu_set_current_task(NULL);
		}

		if (task->cl && task->cl->specific_nodes)
		{
			/* Nothing to do, but we are asked to fetch data on some memory nodes */
			_starpu_fetch_nowhere_task_input(j);
		}
		else
		{
			if (task->cl)
				__starpu_push_task_output(j);
			_starpu_handle_job_termination(j);
			_STARPU_LOG_OUT_TAG("handle_job_termination");
		}
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}

int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);

	/* if the context still does not have workers, put the task back in its
	   place in the empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if no worker of the ctx is able to execute tasks,
		   we consider the ctx empty */
		unsigned able = _starpu_workers_able_to_execute_task(task, sched_ctx);

		if (!able)
		{
			_starpu_sched_ctx_lock_write(sched_ctx->id);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			_starpu_sched_ctx_unlock_write(sched_ctx->id);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	int ret = 0;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_for(task, task->workerid);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag() && starpu_memory_nodes_get_count() > 1)
		{
			if (task->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
		}

		if (!sched_ctx->sched_policy)
		{
			/* Note: we have to call that early, or else the task may have
			 * disappeared already */
			starpu_push_task_end(task);

			if (!sched_ctx->awake_workers)
				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
			else
			{
				struct starpu_worker_collection *workers = sched_ctx->workers;

				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
				job->task_size = workers->nworkers;
				job->combined_workerid = -1; /* it's a ctx, not a combined worker */
				job->active_task_alias_count = 0;

				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
				job->after_work_busy_barrier = workers->nworkers;

				struct starpu_sched_ctx_iterator it;
				if (workers->init_iterator)
					workers->init_iterator(workers, &it);

				while (workers->has_next(workers, &it))
				{
					unsigned workerid = workers->get_next(workers, &it);
					struct starpu_task *alias;
					if (job->task_size > 1)
					{
						alias = starpu_task_dup(task);
						_STARPU_TRACE_JOB_PUSH(alias, alias->priority > 0);
						alias->destroy = 1;
					}
					else
						alias = task;

					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
				}
			}
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);

			/* check whether there are any workers in the context */
			unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				struct _starpu_worker *worker = _starpu_get_local_worker_key();
				if (worker)
				{
					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
					_starpu_worker_enter_sched_op(worker);
					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
				}
				_STARPU_TASK_BREAK_ON(task, push);
				_STARPU_SCHED_BEGIN;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_SCHED_END;
				if (worker)
				{
					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
					_starpu_worker_leave_sched_op(worker);
					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
				}
			}
		}

		if (ret == -1)
		{
			_STARPU_MSG("repush task \n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}

/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}

/* This is called right after a task has been popped from a queue, to trace
 * the pop event; the task may be NULL when nothing was popped.
 */
int _starpu_pop_task_end(struct starpu_task *task)
{
	if (!task)
		return 0;
	_STARPU_TRACE_JOB_POP(task, task->priority > 0);
	return 0;
}

/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle, unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}

struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle, enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops : %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	_starpu_codelet_check_deprecated_fields(conversion_task->cl);
	STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
	return conversion_task;
}
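
/* The codelets picked above (cuda_to_cpu_cl, cpu_to_cuda_cl, ...) come from
 * the format_ops the application registered with its multiformat data.  A
 * sketch, assuming the CUDA pair and the usual registration call of the
 * multiformat interface; my_cpu_to_cuda_cl / my_cuda_to_cpu_cl and the
 * element types are hypothetical:
 *
 *	struct starpu_multiformat_data_interface_ops format_ops =
 *	{
 *		.cpu_elemsize = sizeof(struct cpu_elem),
 *		.cuda_elemsize = sizeof(struct cuda_elem),
 *		.cpu_to_cuda_cl = &my_cpu_to_cuda_cl,
 *		.cuda_to_cpu_cl = &my_cuda_to_cpu_cl,
 *	};
 *	starpu_data_handle_t handle;
 *	starpu_multiformat_data_register(&handle, STARPU_MAIN_RAM,
 *					 data, N, &format_ops);
 */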

/* Pick the next context this worker should pop from: a context that already
 * has ready tasks (task_number > 0) is preferred; otherwise the contexts are
 * walked round-robin, resuming right after the one popped last time
 * (last_poped). */
static
struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	int found = 0;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->task_number > 0)
			return _starpu_get_sched_ctx_struct(e->sched_ctx);
	}

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->last_poped)
		{
			e->last_poped = 0;
			if (_starpu_sched_ctx_list_iterator_has_next(&list_it))
			{
				e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
				found = 1;
			}
			break;
		}
	}
	if (!found)
		e = worker->sched_ctx_list->head;
	e->last_poped = 1;

	return _starpu_get_sched_ctx_struct(e->sched_ctx);
}

struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);
	if (task)
		_STARPU_TASK_BREAK_ON(task, pop);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
#endif
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while (1)
				{
					/** Caution:
					 * if you use multiple contexts, your scheduler *needs*
					 * to update the task_number variable of the ctx list
					 * in order to get the best performance.
					 * This is done using the functions:
					 * starpu_sched_ctx_list_task_counters_increment...(...)
					 * starpu_sched_ctx_list_task_counters_decrement...(...)
					 **/
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if (sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					/* Note: we do not push the scheduling state here, because
					 * otherwise when a worker is idle, we'd keep
					 * pushing/popping a scheduling state here, while what we
					 * want to see in the trace is a permanent idle state. */
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
					if (task)
						_STARPU_TASK_BREAK_ON(task, pop);
					_starpu_pop_task_end(task);
				}
			}

			if (!task)
			{
				/* Whether or not the worker shares a task list in the
				   scheduler, if it does not have any task to pop, just get
				   it out of here. */
				/* However, if it does share a task list, it will be removed
				   as soon as it finishes this job (in handle_job_termination) */
				if (worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				if (worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
						//	_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
						//	_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR
#ifndef STARPU_NON_BLOCKING_DRIVERS
				if (been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;
#endif
			}
		}
	}

	if (!task)
	{
		if (starpu_idle_file)
			idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	if (starpu_idle_file && idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
		//	_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
		//	_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost immediately.
	 * This way, we avoid computing and executing the conversion tasks.
	 * Here, we do not care about what implementation is used.
	 */
	worker_id = starpu_worker_get_id_check();
	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;

		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	if (task->prologue_callback_pop_func)
	{
		_starpu_set_current_task(task);
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
		_starpu_set_current_task(NULL);
	}

	return task;
}

struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task *task = NULL;
	if (sched_ctx->sched_policy)
	{
		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

		/* TODO set profiling info */
		if (sched_ctx->sched_policy->pop_every_task)
		{
			_STARPU_SCHED_BEGIN;
			task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
			_STARPU_SCHED_END;
		}
	}
	return task;
}

void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
	{
		_STARPU_SCHED_BEGIN;
		sched_ctx->sched_policy->pre_exec_hook(task, sched_ctx_id);
		_STARPU_SCHED_END;
	}

	if (!sched_ctx->sched_policy)
	{
		int workerid = starpu_worker_get_id();
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		struct _starpu_sched_ctx_list_iterator list_it;

		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
		{
			struct _starpu_sched_ctx *other_sched_ctx;
			struct _starpu_sched_ctx_elt *e;

			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
			if (other_sched_ctx != sched_ctx &&
			    other_sched_ctx->sched_policy != NULL &&
			    other_sched_ctx->sched_policy->pre_exec_hook)
			{
				_STARPU_SCHED_BEGIN;
				other_sched_ctx->sched_policy->pre_exec_hook(task, other_sched_ctx->id);
				_STARPU_SCHED_END;
			}
		}
	}
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	STARPU_ASSERT(task->cl != NULL && task->cl->where != STARPU_NOWHERE);
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
	{
		_STARPU_SCHED_BEGIN;
		sched_ctx->sched_policy->post_exec_hook(task, sched_ctx_id);
		_STARPU_SCHED_END;
	}

	if (!sched_ctx->sched_policy)
	{
		int workerid = starpu_worker_get_id();
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		struct _starpu_sched_ctx_list_iterator list_it;

		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
		{
			struct _starpu_sched_ctx *other_sched_ctx;
			struct _starpu_sched_ctx_elt *e;

			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
			other_sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
			if (other_sched_ctx != sched_ctx &&
			    other_sched_ctx->sched_policy != NULL &&
			    other_sched_ctx->sched_policy->post_exec_hook)
			{
				_STARPU_SCHED_BEGIN;
				other_sched_ctx->sched_policy->post_exec_hook(task, other_sched_ctx->id);
				_STARPU_SCHED_END;
			}
		}
	}
}

void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}

	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
}

/* The scheduling policy may put tasks directly into a worker's local queue so
 * that it is not always necessary to create its own queue when the local queue
 * is sufficient. If "prio" is non-zero, the task is put where the worker will
 * pop it first; passing 0 therefore ensures a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, prio);
}
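
/* A sketch of how a policy's push_task hook might use this public helper;
 * the round-robin counter and treating every task as non-priority are
 * assumptions for illustration only.
 *
 *	static int rr_push(struct starpu_task *task)
 *	{
 *		static unsigned next = 0;
 *		int workerid = next++ % starpu_worker_get_count();
 *		return starpu_push_local_task(workerid, task, 0);
 *	}
 */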

void _starpu_print_idle_time()
{
	if (!starpu_idle_file)
		return;
	double all_idle = 0.0;
	int i = 0;
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		all_idle += idle[i];

	FILE *f;
	f = fopen(starpu_idle_file, "a");
	if (!f)
	{
		_STARPU_MSG("couldn't open %s: %s\n", starpu_idle_file, strerror(errno));
	}
	else
	{
		fprintf(f, "%lf \n", all_idle);
		fclose(f);
	}
}

void starpu_sched_task_break(struct starpu_task *task)
{
	_STARPU_TASK_BREAK_ON(task, sched);
}
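
/* _starpu_print_idle_time() only writes when STARPU_IDLE_FILE is set (see
 * _starpu_sched_init above).  A minimal sketch, assuming the accumulated
 * idle time (in the starpu_timing_now() unit, microseconds) is appended to
 * the file when StarPU shuts down:
 *
 *	setenv("STARPU_IDLE_FILE", "idle.log", 1);	// before starpu_init()
 *	// ... run the application; the total is appended at shutdown
 */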