sched_policy.c
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2012  Université de Bordeaux 1
 * Copyright (C) 2010-2012  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <pthread.h>
#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>

static int use_prefetch = 0;

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}
/*
 *	Predefined policies
 */

extern struct starpu_sched_policy _starpu_sched_ws_policy;
extern struct starpu_sched_policy _starpu_sched_prio_policy;
extern struct starpu_sched_policy _starpu_sched_random_policy;
extern struct starpu_sched_policy _starpu_sched_dm_policy;
extern struct starpu_sched_policy _starpu_sched_dmda_policy;
extern struct starpu_sched_policy _starpu_sched_dmda_ready_policy;
extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
extern struct starpu_sched_policy _starpu_sched_eager_policy;
extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
extern struct starpu_sched_policy _starpu_sched_pgreedy_policy;
extern struct starpu_sched_policy heft_policy;

static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_ws_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&heft_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_pgreedy_policy
};
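/* Illustrative sketch (not part of the original file): each policy above is
 * identified by its policy_name field, so an application can select one of
 * them either through the environment (e.g. STARPU_SCHED=dmda) or
 * programmatically before initialization, roughly as follows:
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "dmda";	// looked up in predefined_policies[]
 *	starpu_init(&conf);
 */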
struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}
/*
 *	Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	policy->init_sched = sched_policy->init_sched;
	policy->deinit_sched = sched_policy->deinit_sched;
	policy->push_task = sched_policy->push_task;
	policy->pop_task = sched_policy->pop_task;
	policy->pre_exec_hook = sched_policy->pre_exec_hook;
	policy->post_exec_hook = sched_policy->post_exec_hook;
	policy->pop_every_task = sched_policy->pop_every_task;
	policy->push_task_notify = sched_policy->push_task_notify;
	policy->policy_name = sched_policy->policy_name;
	policy->add_workers = sched_policy->add_workers;
	policy->remove_workers = sched_policy->remove_workers;
}
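/* Illustrative sketch (not part of the original file): the fields copied by
 * load_sched_policy() are the ones an application-defined policy fills in and
 * hands to StarPU through conf.sched_policy.  The callback names below
 * (my_init, my_deinit, my_push, my_pop) are hypothetical:
 *
 *	static struct starpu_sched_policy my_policy =
 *	{
 *		.init_sched   = my_init,	// called with the sched_ctx id (see init_sched call below)
 *		.deinit_sched = my_deinit,
 *		.push_task    = my_push,	// receives the submitted starpu_task
 *		.pop_task     = my_pop,		// called by workers looking for work
 *		.policy_name  = "my_policy",
 *		.policy_description = "example user-provided policy",
 *	};
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy = &my_policy;		// bypasses the name lookup in select_sched_policy()
 *	starpu_init(&conf);
 */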
static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	unsigned i;
	for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
	{
		struct starpu_sched_policy *p;
		p = predefined_policies[i];
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}

	fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}
static void display_sched_help_message(void)
{
	const char *sched_env = getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		fprintf(stderr, "STARPU_SCHED can be either of\n");

		/* display the description of all predefined policies */
		unsigned i;
		for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
		{
			struct starpu_sched_policy *p;
			p = predefined_policies[i];
			fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
		}
	}
}
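/* Illustrative usage (assumed typical invocation, not part of the original file):
 *
 *	$ STARPU_SCHED=help ./my_app	# prints the name/description table above
 *	$ STARPU_SCHED=dmda ./my_app	# selects a policy by name instead
 */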
static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (!selected_policy && user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we look whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (!selected_policy && sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* Perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the eager policy by default */
	return &_starpu_sched_eager_policy;
}
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, const char *required_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf->calibrate);

	struct starpu_sched_policy *selected_policy;
	selected_policy = select_sched_policy(config, required_policy);

	load_sched_policy(selected_policy, sched_ctx);

	sched_ctx->sched_policy->init_sched(sched_ctx->id);
}
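/* Illustrative usage (assumed typical invocation, not part of the original file):
 *
 *	$ STARPU_PREFETCH=0 ./my_app	# disable prefetching of task input data
 *
 * As implemented above, leaving STARPU_PREFETCH unset (or setting it to a
 * non-zero value) keeps prefetching enabled. */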
void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
		policy->deinit_sched(sched_ctx->id);
}
/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task alias will be enqueued into
 * each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker ? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	if (is_basic_worker)
	{
		/* only basic workers have their _starpu_worker structure looked up
		 * above; for a combined worker, "worker" is NULL, so the
		 * notification is skipped to avoid dereferencing it */
		unsigned i;
		struct _starpu_sched_ctx *sched_ctx;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		{
			sched_ctx = worker->sched_ctx[i];
			if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
			{
				sched_ctx->sched_policy->push_task_notify(task, workerid);
			}
		}
	}
#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	starpu_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned i;
			for (i = 0; i < task->cl->nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = task->handles[i];
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < task->cl->nbuffers; i++)
				task->handles[i]->mf_node = node;
		}
		return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;
		int ret = 0;
		int i;

		struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
		j->task_size = worker_size;
		j->combined_workerid = workerid;
		j->active_task_alias_count = 0;

		_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
		_STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);

		int k;
		for (k = 0; k < worker_size; k++)
		{
			struct starpu_task *alias = _starpu_create_task_alias(task);

			worker = _starpu_get_worker_struct(combined_workerid[k]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
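/* Illustrative sketch (not part of the original file): an application reaches
 * the code above by pinning a task to a worker before submission, using the
 * public starpu_task fields checked in _starpu_push_task():
 *
 *	struct starpu_task *task = starpu_task_create();
 *	task->cl = &my_codelet;				// hypothetical codelet
 *	task->execute_on_a_specific_worker = 1;		// bypass the policy's push_task
 *	task->workerid = 2;				// basic or combined worker id
 *	starpu_task_submit(task);
 */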
static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
{
	int worker = -1, nworkers = 0;
	struct worker_collection *workers = sched_ctx->workers;

	if (workers->init_cursor)
		workers->init_cursor(workers);

	while (workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		if (starpu_worker_can_execute_task(worker, task, 0))
			nworkers++;
	}

	/* a collection that provides init_cursor is expected to provide deinit_cursor as well */
	if (workers->init_cursor)
		workers->deinit_cursor(workers);

	return nworkers;
}
/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	int workerid = starpu_worker_get_id();
	unsigned nworkers = 0;

	if (!sched_ctx->is_initial_sched)
	{
		/* if no worker in the ctx is able to execute the task,
		 * we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
		if (nworkers == 0)
		{
			if (workerid == -1)
			{
				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->no_workers_mutex);
				_STARPU_PTHREAD_COND_WAIT(&sched_ctx->no_workers_cond, &sched_ctx->no_workers_mutex);
				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->no_workers_mutex);

				nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
				if (nworkers == 0)
					return _starpu_push_task(j);
			}
			else
			{
				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
				starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
				return 0;
			}
		}
	}

	_STARPU_LOG_IN();

	_starpu_increment_nready_tasks();
	task->status = STARPU_TASK_READY;

	_starpu_profiling_set_task_push_start_time(task);

	/* in case there is no codelet associated with the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		_starpu_handle_job_termination(j, -1);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	int ret;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
		ret = sched_ctx->sched_policy->push_task(task);
		if (ret == -1)
		{
			fprintf(stderr, "repush task\n");
			ret = _starpu_push_task(j);
		}
	}

	_starpu_profiling_set_task_push_end_time(task);

	_STARPU_LOG_OUT();
	return ret;
}
/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	struct starpu_task *conversion_task;
	struct starpu_multiformat_interface *format_interface;
	enum starpu_node_kind node_kind;

	conversion_task = starpu_task_create();
	conversion_task->synchronous = 0;
	conversion_task->handles[0] = handle;

	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, 0);
	node_kind = starpu_node_get_kind(node);

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	struct starpu_multiformat_data_interface_ops *mf_ops;
	mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
			STARPU_ASSERT(0);
#ifdef STARPU_USE_CUDA
		case STARPU_CUDA_RAM:
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
#endif
#ifdef STARPU_USE_OPENCL
		case STARPU_OPENCL_RAM:
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
#endif
		default:
			fprintf(stderr, "Oops : %u\n", handle->mf_node);
			STARPU_ASSERT(0);
		}
		break;
#ifdef STARPU_USE_CUDA
	case STARPU_CUDA_RAM:
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
#endif
#ifdef STARPU_USE_OPENCL
	case STARPU_OPENCL_RAM:
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
#endif
	case STARPU_SPU_LS: /* Not supported */
	default:
		STARPU_ASSERT(0);
	}

	conversion_task->cl->modes[0] = STARPU_RW;
	return conversion_task;
}
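/* Illustrative sketch (not part of the original file): the conversion codelets
 * selected above come from the ops structure supplied when the multiformat
 * data was registered.  The element types and codelet variables below are
 * hypothetical, and the exact registration prototype should be checked against
 * the installed StarPU headers:
 *
 *	struct starpu_multiformat_data_interface_ops ops =
 *	{
 *		.cpu_elemsize   = sizeof(struct cpu_elem),
 *		.cuda_elemsize  = sizeof(struct cuda_elem),
 *		.cpu_to_cuda_cl = &cpu_to_cuda_cl,	// used for STARPU_CUDA_RAM above
 *		.cuda_to_cpu_cl = &cuda_to_cpu_cl,	// used for STARPU_CPU_RAM above
 *	};
 *	starpu_multiformat_data_register(&handle, 0, ptr, nobjects, &ops);
 */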
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	_STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	_STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
		pthread_mutex_t *sched_ctx_mutex;

		unsigned i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		{
			sched_ctx = worker->sched_ctx[i];
			if (sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				sched_ctx_mutex = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
				if (sched_ctx_mutex != NULL)
				{
					_STARPU_PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
					if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
					{
						task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
						_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
						break;
					}
					_STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
				}
			}
		}
	}

	if (!task)
		goto profiling;

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	worker_id = starpu_worker_get_id();
	if (!starpu_worker_can_execute_task(worker_id, task, 0))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	for (i = 0; i < task->cl->nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = task->handles[i];
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;

		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	task->mf_skip = 1;
	starpu_task_list_push_front(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling && task)
	{
		struct starpu_task_profiling_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
				&pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = NULL;
	struct starpu_performance_counters *perf_counters = NULL;
	for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		sched_ctx = worker->sched_ctx[i];
		if (sched_ctx != NULL && sched_ctx->id != 0)
		{
			perf_counters = sched_ctx->perf_counters;
			if (perf_counters != NULL && perf_counters->notify_idle_cycle && perf_counters->notify_idle_end)
			{
				if (!task)
					perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
				else
					perf_counters->notify_idle_end(sched_ctx->id, worker->workerid);
			}
		}
	}
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	return task;
}
struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

	/* TODO set profiling info */
	if (sched_ctx->sched_policy->pop_every_task)
		return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
	return NULL;
}
void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy->pre_exec_hook)
		sched_ctx->sched_policy->pre_exec_hook(task);
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);

#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	if (task->hypervisor_tag > 0 && sched_ctx != NULL &&
	    sched_ctx->id != 0 && sched_ctx->perf_counters != NULL)
		sched_ctx->perf_counters->notify_post_exec_hook(sched_ctx->id, task->hypervisor_tag);
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	if (sched_ctx->sched_policy->post_exec_hook)
		sched_ctx->sched_policy->post_exec_hook(task);
}
void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	_STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		_STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					  &worker->sched_mutex);
#endif
	}

	_STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}
/* The scheduling policy may put tasks directly into a worker's local queue, so
 * that a policy does not always need to create its own queue when the worker's
 * local queue is sufficient. If "back" is non-zero, the task is put at the back
 * of the queue, where the worker will pop tasks first. Setting "back" to 0
 * therefore ensures a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, back);
}
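/* Illustrative sketch (not part of the original file): a policy's push_task
 * callback can rely on this helper instead of maintaining its own queues.
 * A trivial, hypothetical "always use worker 0" policy would do:
 *
 *	static int my_push(struct starpu_task *task)
 *	{
 *		return starpu_push_local_task(0, task, 0);	// back == 0: FIFO order
 *	}
 */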