sched_policy.c 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2012 Université de Bordeaux 1
  4. * Copyright (C) 2010-2012 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <pthread.h>
  19. #include <starpu.h>
  20. #include <common/config.h>
  21. #include <common/utils.h>
  22. #include <core/sched_policy.h>
  23. #include <profiling/profiling.h>
  24. #include <common/barrier.h>
  /* Prefetch flag shared by the functions of this file; zero until a
   * scheduling policy has been initialized. */
  static int use_prefetch = 0;

  /* Return the current value of the prefetch flag. */
  int starpu_get_prefetch_flag(void)
  {
      const int flag = use_prefetch;

      return flag;
  }
  30. static struct starpu_sched_policy *predefined_policies[] =
  31. {
  32. &_starpu_sched_ws_policy,
  33. &_starpu_sched_prio_policy,
  34. &_starpu_sched_dm_policy,
  35. &_starpu_sched_dmda_policy,
  36. &_starpu_sched_heft_policy,
  37. &_starpu_sched_dmda_ready_policy,
  38. &_starpu_sched_dmda_sorted_policy,
  39. &_starpu_sched_random_policy,
  40. &_starpu_sched_eager_policy,
  41. &_starpu_sched_parallel_heft_policy,
  42. &_starpu_sched_pgreedy_policy
  43. };
  44. struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  45. {
  46. return sched_ctx->sched_policy;
  47. }
  48. /*
  49. * Methods to initialize the scheduling policy
  50. */
  51. static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
  52. {
  53. STARPU_ASSERT(sched_policy);
  54. #ifdef STARPU_VERBOSE
  55. if (sched_policy->policy_name)
  56. {
  57. if (sched_policy->policy_description)
  58. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  59. else
  60. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  61. }
  62. #endif
  63. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  64. memcpy(policy, sched_policy, sizeof(*policy));
  65. }
  66. static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
  67. {
  68. if (!policy_name)
  69. return NULL;
  70. unsigned i;
  71. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  72. {
  73. struct starpu_sched_policy *p;
  74. p = predefined_policies[i];
  75. if (p->policy_name)
  76. {
  77. if (strcmp(policy_name, p->policy_name) == 0)
  78. {
  79. /* we found a policy with the requested name */
  80. return p;
  81. }
  82. }
  83. }
  84. fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);
  85. /* nothing was found */
  86. return NULL;
  87. }
  88. static void display_sched_help_message(void)
  89. {
  90. const char *sched_env = getenv("STARPU_SCHED");
  91. if (sched_env && (strcmp(sched_env, "help") == 0))
  92. {
  93. fprintf(stderr, "STARPU_SCHED can be either of\n");
  94. /* display the description of all predefined policies */
  95. unsigned i;
  96. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  97. {
  98. struct starpu_sched_policy *p;
  99. p = predefined_policies[i];
  100. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  101. }
  102. }
  103. }
  104. static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
  105. {
  106. struct starpu_sched_policy *selected_policy = NULL;
  107. struct starpu_conf *user_conf = config->conf;
  108. if(required_policy)
  109. selected_policy = find_sched_policy_from_name(required_policy);
  110. /* First, we check whether the application explicitely gave a scheduling policy or not */
  111. if (!selected_policy && user_conf && (user_conf->sched_policy))
  112. return user_conf->sched_policy;
  113. /* Otherwise, we look if the application specified the name of a policy to load */
  114. const char *sched_pol_name;
  115. sched_pol_name = getenv("STARPU_SCHED");
  116. if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
  117. sched_pol_name = user_conf->sched_policy_name;
  118. if (!selected_policy && sched_pol_name)
  119. selected_policy = find_sched_policy_from_name(sched_pol_name);
  120. /* Perhaps there was no policy that matched the name */
  121. if (selected_policy)
  122. return selected_policy;
  123. /* If no policy was specified, we use the greedy policy as a default */
  124. return &_starpu_sched_eager_policy;
  125. }
  126. void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, const char *required_policy)
  127. {
  128. /* Perhaps we have to display some help */
  129. display_sched_help_message();
  130. /* Prefetch is activated by default */
  131. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  132. if (use_prefetch == -1)
  133. use_prefetch = 1;
  134. /* Set calibrate flag */
  135. _starpu_set_calibrate_flag(config->conf->calibrate);
  136. struct starpu_sched_policy *selected_policy;
  137. selected_policy = select_sched_policy(config, required_policy);
  138. load_sched_policy(selected_policy, sched_ctx);
  139. sched_ctx->sched_policy->init_sched(sched_ctx->id);
  140. }
  141. void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  142. {
  143. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  144. if (policy->deinit_sched)
  145. policy->deinit_sched(sched_ctx->id);
  146. }
  147. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  148. * case workerid identifies a combined worker, a task will be enqueued into
  149. * each worker of the combination. */
  150. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  151. {
  152. int nbasic_workers = (int)starpu_worker_get_count();
  153. /* Is this a basic worker or a combined worker ? */
  154. int is_basic_worker = (workerid < nbasic_workers);
  155. unsigned memory_node;
  156. struct _starpu_worker *worker = NULL;
  157. struct _starpu_combined_worker *combined_worker = NULL;
  158. if (is_basic_worker)
  159. {
  160. worker = _starpu_get_worker_struct(workerid);
  161. memory_node = worker->memory_node;
  162. }
  163. else
  164. {
  165. combined_worker = _starpu_get_combined_worker_struct(workerid);
  166. memory_node = combined_worker->memory_node;
  167. }
  168. if (use_prefetch)
  169. starpu_prefetch_task_input_on_node(task, memory_node);
  170. /* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
  171. unsigned i;
  172. struct _starpu_sched_ctx *sched_ctx;
  173. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  174. {
  175. sched_ctx = worker->sched_ctx[i];
  176. if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
  177. {
  178. sched_ctx->sched_policy->push_task_notify(task, workerid);
  179. }
  180. }
  181. #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
  182. starpu_call_pushed_task_cb(workerid, task->sched_ctx);
  183. #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
  184. if (is_basic_worker)
  185. {
  186. unsigned node = starpu_worker_get_memory_node(workerid);
  187. if (_starpu_task_uses_multiformat_handles(task))
  188. {
  189. unsigned i;
  190. for (i = 0; i < task->cl->nbuffers; i++)
  191. {
  192. struct starpu_task *conversion_task;
  193. starpu_data_handle_t handle;
  194. handle = task->handles[i];
  195. if (!_starpu_handle_needs_conversion_task(handle, node))
  196. continue;
  197. conversion_task = _starpu_create_conversion_task(handle, node);
  198. conversion_task->mf_skip = 1;
  199. conversion_task->execute_on_a_specific_worker = 1;
  200. conversion_task->workerid = workerid;
  201. _starpu_task_submit_conversion_task(conversion_task, workerid);
  202. //_STARPU_DEBUG("Pushing a conversion task\n");
  203. }
  204. for (i = 0; i < task->cl->nbuffers; i++)
  205. task->handles[i]->mf_node = node;
  206. }
  207. if(task->priority > 0)
  208. return _starpu_push_local_task(worker, task, 1);
  209. else
  210. return _starpu_push_local_task(worker, task, 0);
  211. }
  212. else
  213. {
  214. /* This is a combined worker so we create task aliases */
  215. int worker_size = combined_worker->worker_size;
  216. int *combined_workerid = combined_worker->combined_workerid;
  217. int ret = 0;
  218. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  219. j->task_size = worker_size;
  220. j->combined_workerid = workerid;
  221. j->active_task_alias_count = 0;
  222. _STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
  223. _STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
  224. int i;
  225. for (i = 0; i < worker_size; i++)
  226. {
  227. struct starpu_task *alias = _starpu_create_task_alias(task);
  228. worker = _starpu_get_worker_struct(combined_workerid[i]);
  229. ret |= _starpu_push_local_task(worker, alias, 0);
  230. }
  231. return ret;
  232. }
  233. }
  234. static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
  235. {
  236. int worker = -1, nworkers = 0;
  237. struct worker_collection *workers = sched_ctx->workers;
  238. if(workers->init_cursor)
  239. workers->init_cursor(workers);
  240. while(workers->has_next(workers))
  241. {
  242. worker = workers->get_next(workers);
  243. if (starpu_worker_can_execute_task(worker, task, 0) && starpu_is_ctxs_turn(worker, sched_ctx->id))
  244. nworkers++;
  245. }
  246. if(workers->init_cursor)
  247. workers->deinit_cursor(workers);
  248. return nworkers;
  249. }
  250. /* the generic interface that call the proper underlying implementation */
  251. int _starpu_push_task(struct _starpu_job *j)
  252. {
  253. struct starpu_task *task = j->task;
  254. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  255. unsigned nworkers = 0;
  256. if(!sched_ctx->is_initial_sched)
  257. {
  258. /*if there are workers in the ctx that are not able to execute tasks
  259. we consider the ctx empty */
  260. nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
  261. if(nworkers == 0)
  262. {
  263. if(task->already_pushed)
  264. {
  265. _STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
  266. starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
  267. _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
  268. return -1;
  269. }
  270. else
  271. {
  272. _STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
  273. task->already_pushed = 1;
  274. starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
  275. _STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
  276. return 0;
  277. }
  278. }
  279. }
  280. _STARPU_LOG_IN();
  281. _starpu_increment_nready_tasks();
  282. task->status = STARPU_TASK_READY;
  283. _starpu_profiling_set_task_push_start_time(task);
  284. /* in case there is no codelet associated to the task (that's a control
  285. * task), we directly execute its callback and enforce the
  286. * corresponding dependencies */
  287. if (task->cl == NULL)
  288. {
  289. _starpu_handle_job_termination(j, -1);
  290. _STARPU_LOG_OUT_TAG("handle_job_termination");
  291. return 0;
  292. }
  293. int ret;
  294. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  295. {
  296. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  297. }
  298. else
  299. {
  300. STARPU_ASSERT(sched_ctx->sched_policy->push_task);
  301. ret = sched_ctx->sched_policy->push_task(task);
  302. if(ret == -1)
  303. {
  304. fprintf(stderr, "repush task \n");
  305. _starpu_decrement_nready_tasks();
  306. ret = _starpu_push_task(j);
  307. }
  308. }
  309. _starpu_profiling_set_task_push_end_time(task);
  310. _STARPU_LOG_OUT();
  311. return ret;
  312. }
  313. /*
  314. * Given a handle that needs to be converted in order to be used on the given
  315. * node, returns a task that takes care of the conversion.
  316. */
  317. struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
  318. unsigned int node)
  319. {
  320. struct starpu_task *conversion_task;
  321. struct starpu_multiformat_interface *format_interface;
  322. enum starpu_node_kind node_kind;
  323. conversion_task = starpu_task_create();
  324. conversion_task->synchronous = 0;
  325. conversion_task->handles[0] = handle;
  326. /* The node does not really matter here */
  327. format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, 0);
  328. node_kind = starpu_node_get_kind(node);
  329. _starpu_spin_lock(&handle->header_lock);
  330. handle->refcnt++;
  331. handle->busy_count++;
  332. _starpu_spin_unlock(&handle->header_lock);
  333. switch(node_kind)
  334. {
  335. case STARPU_CPU_RAM:
  336. switch (starpu_node_get_kind(handle->mf_node))
  337. {
  338. case STARPU_CPU_RAM:
  339. STARPU_ABORT();
  340. #ifdef STARPU_USE_CUDA
  341. case STARPU_CUDA_RAM:
  342. {
  343. struct starpu_multiformat_data_interface_ops *mf_ops;
  344. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  345. conversion_task->cl = mf_ops->cuda_to_cpu_cl;
  346. break;
  347. }
  348. #endif
  349. #ifdef STARPU_USE_OPENCL
  350. case STARPU_OPENCL_RAM:
  351. {
  352. struct starpu_multiformat_data_interface_ops *mf_ops;
  353. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  354. conversion_task->cl = mf_ops->opencl_to_cpu_cl;
  355. break;
  356. }
  357. #endif
  358. default:
  359. _STARPU_ERROR("Oops : %u\n", handle->mf_node);
  360. }
  361. break;
  362. #ifdef STARPU_USE_CUDA
  363. case STARPU_CUDA_RAM:
  364. {
  365. struct starpu_multiformat_data_interface_ops *mf_ops;
  366. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  367. conversion_task->cl = mf_ops->cpu_to_cuda_cl;
  368. break;
  369. }
  370. #endif
  371. #ifdef STARPU_USE_OPENCL
  372. case STARPU_OPENCL_RAM:
  373. {
  374. struct starpu_multiformat_data_interface_ops *mf_ops;
  375. mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
  376. conversion_task->cl = mf_ops->cpu_to_opencl_cl;
  377. break;
  378. }
  379. #endif
  380. case STARPU_SPU_LS: /* Not supported */
  381. default:
  382. STARPU_ABORT();
  383. }
  384. conversion_task->cl->modes[0] = STARPU_RW;
  385. return conversion_task;
  386. }
  387. struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
  388. {
  389. double max_time_on_ctx = starpu_get_max_time_worker_on_ctx();
  390. struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL;
  391. int smallest_counter = worker->nsched_ctxs;
  392. unsigned i;
  393. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  394. {
  395. sched_ctx = worker->sched_ctx[i];
  396. if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
  397. sched_ctx->pop_counter[worker->workerid] < worker->nsched_ctxs &&
  398. smallest_counter > sched_ctx->pop_counter[worker->workerid])
  399. {
  400. good_sched_ctx = sched_ctx;
  401. smallest_counter = sched_ctx->pop_counter[worker->workerid];
  402. }
  403. }
  404. if(good_sched_ctx == NULL)
  405. {
  406. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  407. {
  408. sched_ctx = worker->sched_ctx[i];
  409. if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
  410. sched_ctx->pop_counter[worker->workerid] = 0;
  411. }
  412. return _get_next_sched_ctx_to_pop_into(worker);
  413. }
  414. return good_sched_ctx;
  415. }
  416. struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
  417. {
  418. struct starpu_task *task;
  419. int worker_id;
  420. unsigned node;
  421. /* We can't tell in advance which task will be picked up, so we measure
  422. * a timestamp, and will attribute it afterwards to the task. */
  423. int profiling = starpu_profiling_status_get();
  424. struct timespec pop_start_time;
  425. if (profiling)
  426. _starpu_clock_gettime(&pop_start_time);
  427. pick:
  428. _STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  429. /* perhaps there is some local task to be executed first */
  430. task = _starpu_pop_local_task(worker);
  431. _STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  432. /* get tasks from the stacks of the strategy */
  433. if(!task)
  434. {
  435. struct _starpu_sched_ctx *sched_ctx;
  436. pthread_mutex_t *sched_ctx_mutex;
  437. int been_here[STARPU_NMAX_SCHED_CTXS];
  438. int i;
  439. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  440. been_here[i] = 0;
  441. while(!task)
  442. {
  443. if(worker->nsched_ctxs == 1)
  444. sched_ctx = _starpu_get_initial_sched_ctx();
  445. else
  446. sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
  447. if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
  448. {
  449. sched_ctx_mutex = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
  450. if(sched_ctx_mutex != NULL)
  451. {
  452. _STARPU_PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
  453. if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
  454. task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
  455. _STARPU_PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
  456. }
  457. }
  458. if((!task && sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
  459. break;
  460. been_here[sched_ctx->id] = 1;
  461. sched_ctx->pop_counter[worker->workerid]++;
  462. }
  463. }
  464. #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
  465. struct _starpu_sched_ctx *sched_ctx = NULL;
  466. struct starpu_performance_counters *perf_counters = NULL;
  467. int j;
  468. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  469. {
  470. sched_ctx = worker->sched_ctx[j];
  471. if(sched_ctx != NULL && sched_ctx->id != 0)
  472. {
  473. perf_counters = sched_ctx->perf_counters;
  474. if(perf_counters != NULL && perf_counters->notify_idle_cycle && perf_counters->notify_idle_end)
  475. {
  476. if(!task)
  477. perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
  478. else
  479. perf_counters->notify_idle_end(sched_ctx->id, worker->workerid);
  480. }
  481. }
  482. }
  483. #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
  484. if (!task)
  485. goto profiling;
  486. /* Make sure we do not bother with all the multiformat-specific code if
  487. * it is not necessary. */
  488. if (!_starpu_task_uses_multiformat_handles(task))
  489. goto profiling;
  490. /* This is either a conversion task, or a regular task for which the
  491. * conversion tasks have already been created and submitted */
  492. if (task->mf_skip)
  493. goto profiling;
  494. worker_id = starpu_worker_get_id();
  495. if (!starpu_worker_can_execute_task(worker_id, task, 0))
  496. return task;
  497. node = starpu_worker_get_memory_node(worker_id);
  498. /*
  499. * We do have a task that uses multiformat handles. Let's create the
  500. * required conversion tasks.
  501. */
  502. unsigned i;
  503. for (i = 0; i < task->cl->nbuffers; i++)
  504. {
  505. struct starpu_task *conversion_task;
  506. starpu_data_handle_t handle;
  507. handle = task->handles[i];
  508. if (!_starpu_handle_needs_conversion_task(handle, node))
  509. continue;
  510. conversion_task = _starpu_create_conversion_task(handle, node);
  511. conversion_task->mf_skip = 1;
  512. conversion_task->execute_on_a_specific_worker = 1;
  513. conversion_task->workerid = worker_id;
  514. /*
  515. * Next tasks will need to know where these handles have gone.
  516. */
  517. handle->mf_node = node;
  518. _starpu_task_submit_conversion_task(conversion_task, worker_id);
  519. }
  520. task->mf_skip = 1;
  521. starpu_task_list_push_front(&worker->local_tasks, task);
  522. goto pick;
  523. profiling:
  524. if (profiling && task)
  525. {
  526. struct starpu_task_profiling_info *profiling_info;
  527. profiling_info = task->profiling_info;
  528. /* The task may have been created before profiling was enabled,
  529. * so we check if the profiling_info structure is available
  530. * even though we already tested if profiling is enabled. */
  531. if (profiling_info)
  532. {
  533. memcpy(&profiling_info->pop_start_time,
  534. &pop_start_time, sizeof(struct timespec));
  535. _starpu_clock_gettime(&profiling_info->pop_end_time);
  536. }
  537. }
  538. return task;
  539. }
  540. struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
  541. {
  542. STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
  543. /* TODO set profiling info */
  544. if(sched_ctx->sched_policy->pop_every_task)
  545. return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
  546. return NULL;
  547. }
  548. void _starpu_sched_pre_exec_hook(struct starpu_task *task)
  549. {
  550. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  551. if (sched_ctx->sched_policy->pre_exec_hook)
  552. sched_ctx->sched_policy->pre_exec_hook(task);
  553. }
  554. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  555. {
  556. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  557. #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
  558. if(task->hypervisor_tag > 0 && sched_ctx != NULL &&
  559. sched_ctx->id != 0 && sched_ctx->perf_counters != NULL)
  560. sched_ctx->perf_counters->notify_post_exec_hook(sched_ctx->id, task->hypervisor_tag);
  561. #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
  562. if (sched_ctx->sched_policy->post_exec_hook)
  563. sched_ctx->sched_policy->post_exec_hook(task);
  564. }
  565. void _starpu_wait_on_sched_event(void)
  566. {
  567. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  568. _STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  569. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  570. if (_starpu_machine_is_running())
  571. {
  572. #ifndef STARPU_NON_BLOCKING_DRIVERS
  573. _STARPU_PTHREAD_COND_WAIT(worker->sched_cond,
  574. worker->sched_mutex);
  575. #endif
  576. }
  577. _STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  578. }
  579. /* The scheduling policy may put tasks directly into a worker's local queue so
  580. * that it is not always necessary to create its own queue when the local queue
  581. * is sufficient. If "back" not null, the task is put at the back of the queue
  582. * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
  583. * a FIFO ordering. */
  584. int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
  585. {
  586. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  587. int ret = _starpu_push_local_task(worker, task, back);
  588. task->scheduled = 1;
  589. return ret;
  590. }