  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010-2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/sched_policy.h>
  22. #include <profiling/profiling.h>
  23. #include <common/barrier.h>
  24. #include <core/debug.h>
  25. static int use_prefetch = 0;
  26. int starpu_get_prefetch_flag(void)
  27. {
  28. return use_prefetch;
  29. }
/* Table of every scheduling policy built into StarPU, terminated by a NULL
 * sentinel so callers can iterate without knowing the count. */
static struct starpu_sched_policy *predefined_policies[] =
{
#ifdef STARPU_HAVE_HWLOC
	/* The hierarchical policy needs hwloc to discover the machine topology. */
	&_starpu_sched_tree_heft_hierarchical_policy,
#endif
	&_starpu_sched_tree_eager_policy,
	&_starpu_sched_tree_random_policy,
	&_starpu_sched_tree_ws_policy,
	&_starpu_sched_tree_heft_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	NULL
};
  51. struct starpu_sched_policy **starpu_sched_get_predefined_policies()
  52. {
  53. return predefined_policies;
  54. }
  55. struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  56. {
  57. return sched_ctx->sched_policy;
  58. }
  59. /*
  60. * Methods to initialize the scheduling policy
  61. */
  62. static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
  63. {
  64. STARPU_ASSERT(sched_policy);
  65. #ifdef STARPU_VERBOSE
  66. if (sched_policy->policy_name)
  67. {
  68. if (sched_policy->policy_description)
  69. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  70. else
  71. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  72. }
  73. #endif
  74. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  75. memcpy(policy, sched_policy, sizeof(*policy));
  76. }
  77. static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
  78. {
  79. if (!policy_name)
  80. return NULL;
  81. if (strncmp(policy_name, "heft", 5) == 0)
  82. {
  83. _STARPU_DISP("Warning: heft is now called \"dmda\".\n");
  84. return &_starpu_sched_dmda_policy;
  85. }
  86. struct starpu_sched_policy **policy;
  87. for(policy=predefined_policies ; *policy!=NULL ; policy++)
  88. {
  89. struct starpu_sched_policy *p = *policy;
  90. if (p->policy_name)
  91. {
  92. if (strcmp(policy_name, p->policy_name) == 0)
  93. {
  94. /* we found a policy with the requested name */
  95. return p;
  96. }
  97. }
  98. }
  99. fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);
  100. /* nothing was found */
  101. return NULL;
  102. }
  103. static void display_sched_help_message(void)
  104. {
  105. const char *sched_env = getenv("STARPU_SCHED");
  106. if (sched_env && (strcmp(sched_env, "help") == 0))
  107. {
  108. /* display the description of all predefined policies */
  109. struct starpu_sched_policy **policy;
  110. fprintf(stderr, "STARPU_SCHED can be either of\n");
  111. for(policy=predefined_policies ; *policy!=NULL ; policy++)
  112. {
  113. struct starpu_sched_policy *p = *policy;
  114. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  115. }
  116. }
  117. }
  118. struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
  119. {
  120. struct starpu_sched_policy *selected_policy = NULL;
  121. struct starpu_conf *user_conf = config->conf;
  122. if(required_policy)
  123. selected_policy = find_sched_policy_from_name(required_policy);
  124. /* First, we check whether the application explicitely gave a scheduling policy or not */
  125. if (!selected_policy && user_conf && (user_conf->sched_policy))
  126. return user_conf->sched_policy;
  127. /* Otherwise, we look if the application specified the name of a policy to load */
  128. const char *sched_pol_name;
  129. sched_pol_name = getenv("STARPU_SCHED");
  130. if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
  131. sched_pol_name = user_conf->sched_policy_name;
  132. if (!selected_policy && sched_pol_name)
  133. selected_policy = find_sched_policy_from_name(sched_pol_name);
  134. /* Perhaps there was no policy that matched the name */
  135. if (selected_policy)
  136. return selected_policy;
  137. /* If no policy was specified, we use the greedy policy as a default */
  138. return &_starpu_sched_eager_policy;
  139. }
  140. void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
  141. {
  142. /* Perhaps we have to display some help */
  143. display_sched_help_message();
  144. /* Prefetch is activated by default */
  145. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  146. if (use_prefetch == -1)
  147. use_prefetch = 1;
  148. /* Set calibrate flag */
  149. _starpu_set_calibrate_flag(config->conf->calibrate);
  150. load_sched_policy(selected_policy, sched_ctx);
  151. sched_ctx->sched_policy->init_sched(sched_ctx->id);
  152. }
  153. void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  154. {
  155. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  156. if (policy->deinit_sched)
  157. policy->deinit_sched(sched_ctx->id);
  158. }
  159. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  160. * case workerid identifies a combined worker, a task will be enqueued into
  161. * each worker of the combination. */
  162. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  163. {
  164. int nbasic_workers = (int)starpu_worker_get_count();
  165. /* Is this a basic worker or a combined worker ? */
  166. int is_basic_worker = (workerid < nbasic_workers);
  167. unsigned memory_node;
  168. struct _starpu_worker *worker = NULL;
  169. struct _starpu_combined_worker *combined_worker = NULL;
  170. if (is_basic_worker)
  171. {
  172. worker = _starpu_get_worker_struct(workerid);
  173. memory_node = worker->memory_node;
  174. }
  175. else
  176. {
  177. combined_worker = _starpu_get_combined_worker_struct(workerid);
  178. memory_node = combined_worker->memory_node;
  179. }
  180. if (use_prefetch)
  181. starpu_prefetch_task_input_on_node(task, memory_node);
  182. /* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
  183. unsigned i;
  184. struct _starpu_sched_ctx *sched_ctx;
  185. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  186. {
  187. sched_ctx = worker->sched_ctx[i];
  188. if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
  189. sched_ctx->sched_policy->push_task_notify(task, workerid, sched_ctx->id);
  190. }
  191. #ifdef STARPU_USE_SC_HYPERVISOR
  192. starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
  193. #endif //STARPU_USE_SC_HYPERVISOR
  194. if (is_basic_worker)
  195. {
  196. unsigned node = starpu_worker_get_memory_node(workerid);
  197. if (_starpu_task_uses_multiformat_handles(task))
  198. {
  199. for (i = 0; i < task->cl->nbuffers; i++)
  200. {
  201. struct starpu_task *conversion_task;
  202. starpu_data_handle_t handle;
  203. handle = STARPU_TASK_GET_HANDLE(task, i);
  204. if (!_starpu_handle_needs_conversion_task(handle, node))
  205. continue;
  206. conversion_task = _starpu_create_conversion_task(handle, node);
  207. conversion_task->mf_skip = 1;
  208. conversion_task->execute_on_a_specific_worker = 1;
  209. conversion_task->workerid = workerid;
  210. _starpu_task_submit_conversion_task(conversion_task, workerid);
  211. //_STARPU_DEBUG("Pushing a conversion task\n");
  212. }
  213. for (i = 0; i < task->cl->nbuffers; i++)
  214. {
  215. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  216. handle->mf_node = node;
  217. }
  218. }
  219. // if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
  220. if(task->priority > 0)
  221. return _starpu_push_local_task(worker, task, 1);
  222. else
  223. return _starpu_push_local_task(worker, task, 0);
  224. }
  225. else
  226. {
  227. /* This is a combined worker so we create task aliases */
  228. int worker_size = combined_worker->worker_size;
  229. int *combined_workerid = combined_worker->combined_workerid;
  230. int ret = 0;
  231. struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
  232. job->task_size = worker_size;
  233. job->combined_workerid = workerid;
  234. job->active_task_alias_count = 0;
  235. STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
  236. STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
  237. /* Note: we have to call that early, or else the task may have
  238. * disappeared already */
  239. starpu_push_task_end(task);
  240. int j;
  241. for (j = 0; j < worker_size; j++)
  242. {
  243. struct starpu_task *alias = starpu_task_dup(task);
  244. worker = _starpu_get_worker_struct(combined_workerid[j]);
  245. ret |= _starpu_push_local_task(worker, alias, 0);
  246. }
  247. return ret;
  248. }
  249. }
  250. static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
  251. {
  252. unsigned worker = 0, nworkers = 0;
  253. struct starpu_worker_collection *workers = sched_ctx->workers;
  254. struct starpu_sched_ctx_iterator it;
  255. if(workers->init_iterator)
  256. workers->init_iterator(workers, &it);
  257. while(workers->has_next(workers, &it))
  258. {
  259. worker = workers->get_next(workers, &it);
  260. if (starpu_worker_can_execute_task(worker, task, 0) && starpu_sched_ctx_is_ctxs_turn(worker, sched_ctx->id))
  261. nworkers++;
  262. }
  263. return nworkers;
  264. }
/* the generic interface that call the proper underlying implementation */

/* Push a job that just became ready: mark it READY, park it in the context's
 * empty-ctx list when the context has no usable workers, short-circuit
 * codelet-less control tasks, and otherwise hand the task to
 * _starpu_push_task_to_workers(). Returns 0 on success. */
int _starpu_push_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;

	_STARPU_LOG_IN();
	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
	_starpu_increment_nready_tasks();
	task->status = STARPU_TASK_READY;

#ifdef HAVE_AYUDAME_H
	/* Notify the Ayudame debugger that the task entered a queue. */
	if (AYU_event)
	{
		int id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif

	/* if the context does not have any workers save the tasks in a temp list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if there are workers in the ctx that are not able to execute tasks
		   we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
			return 0;
		}
	}

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}
  312. int _starpu_push_task_to_workers(struct starpu_task *task)
  313. {
  314. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  315. unsigned nworkers = 0;
  316. /* if the contexts still does not have workers put the task back to its place in
  317. the empty ctx list */
  318. if(!sched_ctx->is_initial_sched)
  319. {
  320. /*if there are workers in the ctx that are not able to execute tasks
  321. we consider the ctx empty */
  322. nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
  323. if (nworkers == 0)
  324. {
  325. STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
  326. starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
  327. STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
  328. return -EAGAIN;
  329. }
  330. }
  331. _starpu_profiling_set_task_push_start_time(task);
  332. int ret;
  333. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  334. {
  335. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  336. }
  337. else
  338. {
  339. STARPU_ASSERT(sched_ctx->sched_policy->push_task);
  340. /* check out if there are any workers in the context */
  341. starpu_pthread_mutex_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
  342. STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
  343. nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
  344. ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task);
  345. STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
  346. if(ret == -1)
  347. {
  348. fprintf(stderr, "repush task \n");
  349. _STARPU_TRACE_JOB_POP(task, task->priority > 0);
  350. ret = _starpu_push_task_to_workers(task);
  351. }
  352. }
  353. /* Note: from here, the task might have been destroyed already! */
  354. _STARPU_LOG_OUT();
  355. return ret;
  356. }
/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */

/* Record the push-end timestamp for profiling and mark the task as scheduled.
 * Always returns 0. */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}
  366. /*
  367. * Given a handle that needs to be converted in order to be used on the given
  368. * node, returns a task that takes care of the conversion.
  369. */
  370. struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
  371. unsigned int node)
  372. {
  373. return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
  374. }
/* Build (but do not submit) a task that converts a multiformat handle to the
 * layout expected by the given architecture kind.  The proper conversion
 * codelet is selected from the handle's multiformat ops according to the
 * direction (device -> CPU when targeting a CPU-like node, CPU -> device
 * otherwise).  The handle's refcnt and busy_count are raised to keep it
 * alive until the conversion runs. */
struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							    enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, 0);
#endif

	/* Pin the handle so it cannot vanish before the conversion executes. */
	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch(node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		/* Converting towards a CPU-like node: pick the device-to-CPU
		 * codelet matching where the data currently lives (mf_node). */
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			/* CPU-to-CPU needs no conversion: reaching here is a logic error. */
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops : %u\n", handle->mf_node);
		}
		break;
	/* Converting towards a device node: pick the CPU-to-device codelet. */
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	STARPU_CODELET_SET_MODE(conversion_task->cl, STARPU_RW, 0);
	return conversion_task;
}
/* Choose the next scheduling context this worker should pop a task from.
 * A context the worker was removed from is returned immediately so the
 * removal can be finalized; otherwise the context with the smallest
 * per-worker pop counter (below nsched_ctxs) wins, which round-robins the
 * worker's attention across its contexts.  When no context qualifies, all
 * counters are reset and the scan restarts, so this always returns non-NULL
 * for a worker that belongs to at least one context. */
struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	while(1)
	{
		struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL;
		/* Upper bound on the counters we are willing to accept this round. */
		unsigned smallest_counter = worker->nsched_ctxs;
		unsigned i;
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		{
			sched_ctx = worker->sched_ctx[i];

			/* A pending removal takes priority over fairness. */
			if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS && worker->removed_from_ctx[sched_ctx->id])
				return sched_ctx;

			if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
			   sched_ctx->pop_counter[worker->workerid] < worker->nsched_ctxs &&
			   smallest_counter > sched_ctx->pop_counter[worker->workerid])
			{
				good_sched_ctx = sched_ctx;
				smallest_counter = sched_ctx->pop_counter[worker->workerid];
			}
		}

		if(good_sched_ctx == NULL)
		{
			/* Every context exhausted its quota: reset the counters
			 * and try again. */
			for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			{
				sched_ctx = worker->sched_ctx[i];
				if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
					sched_ctx->pop_counter[worker->workerid] = 0;
			}

			continue;
		}
		return good_sched_ctx;
	}
}
/* Pop the next task for the given worker: local tasks first, then the
 * scheduling policies of the contexts the worker belongs to.  For tasks using
 * multiformat handles, the required conversion tasks are created and
 * submitted before the task itself is handed out (the task is requeued
 * locally with mf_skip set so it is picked up after the conversions). */
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	/* get tasks from the stacks of the strategy */
	if(!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
		//unsigned lucky_ctx = STARPU_NMAX_SCHED_CTXS;
		/* been_here[] remembers contexts already visited, so we stop
		 * after one full fruitless round instead of spinning forever. */
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while(!task)
		{
			/* A worker in a single context always polls the initial one. */
			if(worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
				sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

			if(sched_ctx != NULL && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
					//lucky_ctx = sched_ctx->id;
				}
			}

			/* No task from a context we were removed from: finalize
			 * the removal now. */
			if(!task && worker->removed_from_ctx[sched_ctx->id])
			{
				_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
				worker->removed_from_ctx[sched_ctx->id] = 0;
			}

			/* Stop after a full fruitless round over the contexts,
			 * or immediately when there is only one context. */
			if((!task && sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
				break;

			been_here[sched_ctx->id] = 1;

			sched_ctx->pop_counter[worker->workerid]++;
		}
	}

	if (!task)
		return NULL;

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	worker_id = starpu_worker_get_id();
	/* NOTE(review): when this worker cannot execute the task it is returned
	 * without any conversion handling — presumably the caller copes with
	 * that; confirm against the drivers. */
	if (!starpu_worker_can_execute_task(worker_id, task, 0))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	for (i = 0; i < task->cl->nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;
		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	/* The conversions are submitted: requeue the task locally and pick
	 * again; mf_skip makes the next pass bypass the multiformat path. */
	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
				&pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	return task;
}
  605. struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
  606. {
  607. STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
  608. /* TODO set profiling info */
  609. if(sched_ctx->sched_policy->pop_every_task)
  610. return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
  611. return NULL;
  612. }
  613. void _starpu_sched_pre_exec_hook(struct starpu_task *task)
  614. {
  615. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  616. if (sched_ctx->sched_policy->pre_exec_hook)
  617. sched_ctx->sched_policy->pre_exec_hook(task);
  618. }
  619. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  620. {
  621. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  622. #ifdef STARPU_USE_SC_HYPERVISOR
  623. if(task->hypervisor_tag > 0 && sched_ctx != NULL &&
  624. sched_ctx->id != 0 && sched_ctx->perf_counters != NULL)
  625. sched_ctx->perf_counters->notify_post_exec_hook(sched_ctx->id, task->hypervisor_tag);
  626. #endif //STARPU_USE_SC_HYPERVISOR
  627. if (sched_ctx->sched_policy->post_exec_hook)
  628. sched_ctx->sched_policy->post_exec_hook(task);
  629. }
/* Block the calling worker until a scheduling event occurs.  Pending data
 * requests for the worker's memory node are served first; then, unless
 * non-blocking drivers are in use, the worker sleeps on its scheduling
 * condition variable under its scheduling mutex. */
void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	/* Flush outstanding data requests before possibly going to sleep. */
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}

	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}
/* The scheduling policy may put tasks directly into a worker's local queue so
 * that it is not always necessary to create its own queue when the local queue
 * is sufficient. If "prio" is not null, the task is put at the back of the queue
 * where the worker will pop tasks first. Setting "prio" to 0 therefore ensures
 * a FIFO ordering. */
  649. int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
  650. {
  651. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  652. return _starpu_push_local_task(worker, task, prio);
  653. }