sched_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2013  Université de Bordeaux 1
 * Copyright (C) 2010-2012  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <pthread.h>
#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>

static int use_prefetch = 0;

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}

static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies()
{
	return predefined_policies;
}
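
/* Illustrative sketch (not part of the original file): an application would
 * typically select one of the policies above by name, either through the
 * STARPU_SCHED environment variable or through its starpu_conf, e.g.:
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "dmda";
 *	starpu_init(&conf);
 */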
struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}

static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	if (strncmp(policy_name, "heft", 5) == 0)
	{
		_STARPU_DISP("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}

static void display_sched_help_message(void)
{
	const char *sched_env = getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;
		fprintf(stderr, "STARPU_SCHED can be either of\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
		}
	}
}
static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* First, check whether the application explicitly provided a scheduling policy structure */
	if (!selected_policy && user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (!selected_policy && sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* Perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, use the eager (greedy) policy as a default */
	return &_starpu_sched_eager_policy;
}
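
/* Summary of the selection order implemented above: an explicitly requested
 * policy name, then a policy structure given in starpu_conf, then the
 * STARPU_SCHED environment variable, then starpu_conf::sched_policy_name,
 * and finally the eager policy as the default. Running with STARPU_SCHED=help
 * only prints the list of policies; since "help" matches no policy name,
 * execution then falls through to this default. */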
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, const char *required_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf->calibrate);

	struct starpu_sched_policy *selected_policy;
	selected_policy = select_sched_policy(config, required_policy);

	load_sched_policy(selected_policy, sched_ctx);
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
}

void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
		policy->deinit_sched(sched_ctx->id);
}
/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task alias will be enqueued
 * into each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);

	/* if we push a task on a specific worker, notify all the sched_ctxs the
	 * worker belongs to (only for basic workers: for a combined worker,
	 * "worker" is NULL here, so dereferencing it would crash) */
	if (is_basic_worker)
	{
		unsigned i;
		struct _starpu_sched_ctx *sched_ctx;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		{
			sched_ctx = worker->sched_ctx[i];
			if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
				sched_ctx->sched_policy->push_task_notify(task, workerid, sched_ctx->id);
		}
	}

#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	starpu_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned i;
			for (i = 0; i < task->cl->nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = task->handles[i];
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < task->cl->nbuffers; i++)
				task->handles[i]->mf_node = node;
		}
//		if (task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
		j->task_size = worker_size;
		j->combined_workerid = workerid;
		j->active_task_alias_count = 0;

		_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
		_STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		_starpu_push_task_end(task);

		int i;
		for (i = 0; i < worker_size; i++)
		{
			struct starpu_task *alias = _starpu_create_task_alias(task);
			worker = _starpu_get_worker_struct(combined_workerid[i]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
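
/* Illustrative application-side sketch (not part of the original file): a
 * task ends up in this function when it was pinned to a worker before
 * submission, e.g.:
 *
 *	task->execute_on_a_specific_worker = 1;
 *	task->workerid = some_workerid;	// hypothetical placement choice
 *	starpu_task_submit(task);
 */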
static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
{
	int worker = -1, nworkers = 0;
	struct starpu_sched_ctx_worker_collection *workers = sched_ctx->workers;

	struct starpu_iterator it;
	if (workers->init_iterator)
		workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		if (starpu_worker_can_execute_task(worker, task, 0) && starpu_is_ctxs_turn(worker, sched_ctx->id))
			nworkers++;
	}

	return nworkers;
}
/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;

	_STARPU_LOG_IN();

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
	_starpu_increment_nready_tasks();
	task->status = STARPU_TASK_READY;

#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		int id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif

	/* if the context does not have any workers, save the task in a temporary list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if none of the workers in the ctx is able to execute the task,
		 * we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
			return 0;
		}
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}
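
/* Hand the task over to the scheduling policy of its context (or to the
 * requested worker), returning -EAGAIN when the context has no worker able
 * to execute it yet; the task is then parked in the empty-ctx list. */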
int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	/* if the context still does not have workers, put the task back in its
	 * place in the empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if none of the workers in the ctx is able to execute the task,
		 * we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	int ret;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
		ret = sched_ctx->sched_policy->push_task(task);
		if (ret == -1)
		{
			fprintf(stderr, "repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}

	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}

/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int _starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}
/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	struct starpu_task *conversion_task;
	struct starpu_multiformat_interface *format_interface;
	enum starpu_node_kind node_kind;

	conversion_task = starpu_task_create();
	conversion_task->synchronous = 0;
	conversion_task->handles[0] = handle;

	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, 0);
	node_kind = starpu_node_get_kind(node);

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
			STARPU_ABORT();
#ifdef STARPU_USE_CUDA
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_OPENCL
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops : %u\n", handle->mf_node);
		}
		break;
#ifdef STARPU_USE_CUDA
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#ifdef STARPU_USE_OPENCL
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
	case STARPU_SPU_LS: /* Not supported */
	default:
		STARPU_ABORT();
	}

	conversion_task->cl->modes[0] = STARPU_RW;
	return conversion_task;
}
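
/* Illustrative sketch (not from this file): the conversion codelets picked
 * above come from the multiformat ops the application registered with its
 * data, along the lines of:
 *
 *	struct starpu_multiformat_data_interface_ops format_ops =
 *	{
 *		.cpu_elemsize = sizeof(struct cpu_element),	// hypothetical types
 *		.cuda_elemsize = sizeof(struct cuda_element),
 *		.cpu_to_cuda_cl = &cpu_to_cuda_cl,		// hypothetical codelets
 *		.cuda_to_cpu_cl = &cuda_to_cpu_cl,
 *	};
 *	starpu_multiformat_data_register(&handle, 0, ptr, n, &format_ops);
 */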
struct _starpu_sched_ctx *_get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
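	/* Pick, among the contexts this worker belongs to, the one whose
	 * per-worker pop counter is smallest, so that popping round-robins
	 * fairly across contexts; once every eligible counter has reached
	 * worker->nsched_ctxs, all counters are reset below and the selection
	 * starts over. */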
	struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL;
	unsigned smallest_counter = worker->nsched_ctxs;
	unsigned i;
	for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		sched_ctx = worker->sched_ctx[i];
		if (sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
		    sched_ctx->pop_counter[worker->workerid] < worker->nsched_ctxs &&
		    smallest_counter > sched_ctx->pop_counter[worker->workerid])
		{
			good_sched_ctx = sched_ctx;
			smallest_counter = sched_ctx->pop_counter[worker->workerid];
		}
	}

	if (good_sched_ctx == NULL)
	{
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		{
			sched_ctx = worker->sched_ctx[i];
			if (sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
				sched_ctx->pop_counter[worker->workerid] = 0;
		}
		return _get_next_sched_ctx_to_pop_into(worker);
	}
	return good_sched_ctx;
}
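
/* Called by a worker to fetch its next task: it first drains the worker's
 * local queue, then polls the policies of the contexts the worker belongs
 * to, creating and queueing multiformat conversion tasks on the fly when a
 * picked task needs its data converted for this worker's memory node. */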
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	/* get tasks from the queues of the scheduling policy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
				sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

			if (sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
			}

			if ((!task && sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
				break;

			been_here[sched_ctx->id] = 1;
			sched_ctx->pop_counter[worker->workerid]++;
		}
	}

#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = NULL;
	struct starpu_performance_counters *perf_counters = NULL;
	int j;
	for (j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
	{
		sched_ctx = worker->sched_ctx[j];
		if (sched_ctx != NULL && sched_ctx->id != 0)
		{
			perf_counters = sched_ctx->perf_counters;
			if (perf_counters != NULL && perf_counters->notify_idle_cycle && perf_counters->notify_idle_end)
			{
				if (!task)
					perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
				else
					perf_counters->notify_idle_end(sched_ctx->id, worker->workerid);
			}
		}
	}
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	if (!task)
		return NULL;

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	worker_id = starpu_worker_get_id();
	if (!starpu_worker_can_execute_task(worker_id, task, 0))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	for (i = 0; i < task->cl->nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = task->handles[i];
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;

		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	task->mf_skip = 1;
	starpu_task_list_push_front(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_task_profiling_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	return task;
}
struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

	/* TODO set profiling info */
	if (sched_ctx->sched_policy->pop_every_task)
		return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
	return NULL;
}

void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy->pre_exec_hook)
		sched_ctx->sched_policy->pre_exec_hook(task);
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);

#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	if (task->hypervisor_tag > 0 && sched_ctx != NULL &&
	    sched_ctx->id != 0 && sched_ctx->perf_counters != NULL)
		sched_ctx->perf_counters->notify_post_exec_hook(sched_ctx->id, task->hypervisor_tag);
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	if (sched_ctx->sched_policy->post_exec_hook)
		sched_ctx->sched_policy->post_exec_hook(task);
}
void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	_STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		_STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					  &worker->sched_mutex);
#endif
	}

	_STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}
/* The scheduling policy may put tasks directly into a worker's local queue,
 * so that a policy does not always have to implement its own queues when the
 * local queue is sufficient. If "back" is not zero, the task is put at the
 * back of the queue, which is where the worker pops tasks first; setting
 * "back" to 0 therefore ensures a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, back);
}
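
/* Illustrative policy-side sketch (not from this file): a custom policy's
 * push_task hook could rely on the helper above instead of maintaining its
 * own queues, e.g. feeding every task to a chosen worker:
 *
 *	static int my_push_task(struct starpu_task *task)
 *	{
 *		int workerid = 0;	// hypothetical placement decision
 *		return starpu_push_local_task(workerid, task, 0);
 *	}
 */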