sched_policy.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010 Université de Bordeaux 1
  4. * Copyright (C) 2010 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <pthread.h>
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/sched_policy.h>
  22. #include <profiling/profiling.h>
  23. //static struct starpu_sched_policy_s policy;
  24. static int use_prefetch = 0;
  25. int starpu_get_prefetch_flag(void)
  26. {
  27. return use_prefetch;
  28. }
  29. /*
  30. * Predefined policies
  31. */
  32. extern struct starpu_sched_policy_s _starpu_sched_ws_policy;
  33. extern struct starpu_sched_policy_s _starpu_sched_prio_policy;
  34. extern struct starpu_sched_policy_s _starpu_sched_no_prio_policy;
  35. extern struct starpu_sched_policy_s _starpu_sched_random_policy;
  36. extern struct starpu_sched_policy_s _starpu_sched_dm_policy;
  37. extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
  38. extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy;
  39. extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy;
  40. extern struct starpu_sched_policy_s _starpu_sched_eager_policy;
  41. extern struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy;
  42. extern struct starpu_sched_policy_s _starpu_sched_pgreedy_policy;
  43. extern struct starpu_sched_policy_s heft_policy;
  44. #define NPREDEFINED_POLICIES 12
  45. static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] = {
  46. &_starpu_sched_ws_policy,
  47. &_starpu_sched_prio_policy,
  48. &_starpu_sched_no_prio_policy,
  49. &_starpu_sched_dm_policy,
  50. &_starpu_sched_dmda_policy,
  51. &heft_policy,
  52. &_starpu_sched_dmda_ready_policy,
  53. &_starpu_sched_dmda_sorted_policy,
  54. &_starpu_sched_random_policy,
  55. &_starpu_sched_eager_policy,
  56. &_starpu_sched_parallel_heft_policy,
  57. &_starpu_sched_pgreedy_policy
  58. };
  59. struct starpu_sched_policy_s *_starpu_get_sched_policy(struct starpu_sched_ctx *sched_ctx)
  60. {
  61. return sched_ctx->sched_policy;
  62. }
  63. /*
  64. * Methods to initialize the scheduling policy
  65. */
  66. static void load_sched_policy(struct starpu_sched_policy_s *sched_policy, struct starpu_sched_ctx *sched_ctx)
  67. {
  68. STARPU_ASSERT(sched_policy);
  69. #ifdef STARPU_VERBOSE
  70. if (sched_policy->policy_name)
  71. {
  72. if (sched_policy->policy_description)
  73. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  74. else
  75. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  76. }
  77. #endif
  78. struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
  79. policy->init_sched = sched_policy->init_sched;
  80. policy->deinit_sched = sched_policy->deinit_sched;
  81. policy->push_task = sched_policy->push_task;
  82. policy->push_prio_task = sched_policy->push_prio_task;
  83. policy->pop_task = sched_policy->pop_task;
  84. policy->post_exec_hook = sched_policy->post_exec_hook;
  85. policy->pop_every_task = sched_policy->pop_every_task;
  86. policy->push_task_notify = sched_policy->push_task_notify;
  87. policy->policy_name = sched_policy->policy_name;
  88. }
  89. static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
  90. {
  91. if (!policy_name)
  92. return NULL;
  93. unsigned i;
  94. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  95. {
  96. struct starpu_sched_policy_s *p;
  97. p = predefined_policies[i];
  98. if (p->policy_name)
  99. {
  100. if (strcmp(policy_name, p->policy_name) == 0) {
  101. /* we found a policy with the requested name */
  102. return p;
  103. }
  104. }
  105. }
  106. /* nothing was found */
  107. return NULL;
  108. }
  109. static void display_sched_help_message(void)
  110. {
  111. const char *sched_env = getenv("STARPU_SCHED");
  112. if (sched_env && (strcmp(sched_env, "help") == 0)) {
  113. fprintf(stderr, "STARPU_SCHED can be either of\n");
  114. /* display the description of all predefined policies */
  115. unsigned i;
  116. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  117. {
  118. struct starpu_sched_policy_s *p;
  119. p = predefined_policies[i];
  120. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  121. }
  122. }
  123. }
  124. static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config, const char *policy_name)
  125. {
  126. struct starpu_sched_policy_s *selected_policy = NULL;
  127. struct starpu_conf *user_conf = config->user_conf;
  128. /* First, we check whether the application explicitely gave a scheduling policy or not */
  129. if (user_conf && (user_conf->sched_policy))
  130. return user_conf->sched_policy;
  131. /* Otherwise, we look if the application specified the name of a policy to load */
  132. const char *sched_pol_name;
  133. if (user_conf && (user_conf->sched_policy_name))
  134. {
  135. sched_pol_name = user_conf->sched_policy_name;
  136. }
  137. else {
  138. sched_pol_name = getenv("STARPU_SCHED");
  139. }
  140. if (sched_pol_name)
  141. selected_policy = find_sched_policy_from_name(sched_pol_name);
  142. else
  143. if(policy_name)
  144. selected_policy = find_sched_policy_from_name(policy_name);
  145. /* Perhaps there was no policy that matched the name */
  146. if (selected_policy)
  147. return selected_policy;
  148. /* If no policy was specified, we use the greedy policy as a default */
  149. // return &_starpu_sched_eager_policy;
  150. return &heft_policy;
  151. }
  152. void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx, const char *policy_name)
  153. {
  154. /* Perhaps we have to display some help */
  155. display_sched_help_message();
  156. /* Prefetch is activated by default */
  157. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  158. if (use_prefetch == -1)
  159. use_prefetch = 1;
  160. /* By default, we don't calibrate */
  161. unsigned do_calibrate = 0;
  162. if (config->user_conf && (config->user_conf->calibrate != -1))
  163. {
  164. do_calibrate = config->user_conf->calibrate;
  165. }
  166. else {
  167. int res = starpu_get_env_number("STARPU_CALIBRATE");
  168. do_calibrate = (res < 0)?0:(unsigned)res;
  169. }
  170. _starpu_set_calibrate_flag(do_calibrate);
  171. struct starpu_sched_policy_s *selected_policy;
  172. selected_policy = select_sched_policy(config, policy_name);
  173. load_sched_policy(selected_policy, sched_ctx);
  174. sched_ctx->sched_policy->init_sched(sched_ctx);
  175. }
  176. void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx)
  177. {
  178. struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
  179. if (policy->deinit_sched)
  180. policy->deinit_sched(sched_ctx);
  181. }
  182. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  183. * case workerid identifies a combined worker, a task will be enqueued into
  184. * each worker of the combination. */
  185. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  186. {
  187. int nbasic_workers = (int)starpu_worker_get_count();
  188. /* Is this a basic worker or a combined worker ? */
  189. int is_basic_worker = (workerid < nbasic_workers);
  190. unsigned memory_node;
  191. struct starpu_worker_s *worker;
  192. struct starpu_combined_worker_s *combined_worker;
  193. if (is_basic_worker)
  194. {
  195. worker = _starpu_get_worker_struct(workerid);
  196. memory_node = worker->memory_node;
  197. }
  198. else
  199. {
  200. combined_worker = _starpu_get_combined_worker_struct(workerid);
  201. memory_node = combined_worker->memory_node;
  202. }
  203. if (use_prefetch)
  204. starpu_prefetch_task_input_on_node(task, memory_node);
  205. unsigned i;
  206. for(i = 0; i < worker->nctxs; i++){
  207. if (worker->sched_ctx[i]->sched_policy->push_task_notify){
  208. worker->sched_ctx[i]->sched_policy->push_task_notify(task, workerid);
  209. }
  210. }
  211. if (is_basic_worker)
  212. {
  213. return _starpu_push_local_task(worker, task, 0);
  214. }
  215. else {
  216. /* This is a combined worker so we create task aliases */
  217. int worker_size = combined_worker->worker_size;
  218. int *combined_workerid = combined_worker->combined_workerid;
  219. int ret = 0;
  220. int i;
  221. starpu_job_t j = _starpu_get_job_associated_to_task(task);
  222. j->task_size = worker_size;
  223. j->combined_workerid = workerid;
  224. j->active_task_alias_count = 0;
  225. PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
  226. PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
  227. for (i = 0; i < worker_size; i++)
  228. {
  229. struct starpu_task *alias = _starpu_create_task_alias(task);
  230. worker = _starpu_get_worker_struct(combined_workerid[i]);
  231. ret |= _starpu_push_local_task(worker, alias, 0);
  232. }
  233. return ret;
  234. }
  235. }
  236. /* the generic interface that call the proper underlying implementation */
  237. int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
  238. {
  239. struct starpu_task *task = j->task;
  240. _STARPU_LOG_IN();
  241. task->status = STARPU_TASK_READY;
  242. _starpu_profiling_set_task_push_start_time(task);
  243. /* in case there is no codelet associated to the task (that's a control
  244. * task), we directly execute its callback and enforce the
  245. * corresponding dependencies */
  246. if (task->cl == NULL)
  247. {
  248. _starpu_handle_job_termination(j, job_is_already_locked);
  249. _STARPU_LOG_OUT_TAG("handle_job_termination");
  250. return 0;
  251. }
  252. int ret;
  253. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  254. {
  255. _starpu_increment_nsubmitted_tasks_of_worker(task->workerid);
  256. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  257. }
  258. else {
  259. struct starpu_sched_ctx *sched_ctx = task->sched_ctx;
  260. STARPU_ASSERT(sched_ctx->sched_policy->push_task);
  261. ret = sched_ctx->sched_policy->push_task(task, sched_ctx);
  262. }
  263. _starpu_profiling_set_task_push_end_time(task);
  264. _STARPU_LOG_OUT();
  265. return ret;
  266. }
  267. struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
  268. {
  269. struct starpu_task *task;
  270. /* We can't tell in advance which task will be picked up, so we measure
  271. * a timestamp, and will attribute it afterwards to the task. */
  272. int profiling = starpu_profiling_status_get();
  273. struct timespec pop_start_time;
  274. if (profiling)
  275. starpu_clock_gettime(&pop_start_time);
  276. /* perhaps there is some local task to be executed first */
  277. task = _starpu_pop_local_task(worker);
  278. if(!task)
  279. {
  280. struct starpu_sched_ctx *sched_ctx;
  281. unsigned i;
  282. for(i = 0; i < worker->nctxs; i++)
  283. {
  284. sched_ctx = worker->sched_ctx[i];
  285. if (sched_ctx->sched_policy->pop_task)
  286. {
  287. task = sched_ctx->sched_policy->pop_task();
  288. break;
  289. }
  290. }
  291. }
  292. /* Note that we may get a NULL task in case the scheduler was unlocked
  293. * for some reason. */
  294. if (profiling && task)
  295. {
  296. struct starpu_task_profiling_info *profiling_info;
  297. profiling_info = task->profiling_info;
  298. /* The task may have been created before profiling was enabled,
  299. * so we check if the profiling_info structure is available
  300. * even though we already tested if profiling is enabled. */
  301. if (profiling_info)
  302. {
  303. memcpy(&profiling_info->pop_start_time,
  304. &pop_start_time, sizeof(struct timespec));
  305. starpu_clock_gettime(&profiling_info->pop_end_time);
  306. }
  307. }
  308. return task;
  309. }
  310. struct starpu_task *_starpu_pop_every_task(struct starpu_sched_ctx *sched_ctx)
  311. {
  312. STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
  313. /* TODO set profiling info */
  314. return sched_ctx->sched_policy->pop_every_task();
  315. }
  316. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  317. {
  318. if (task->sched_ctx->sched_policy->post_exec_hook)
  319. task->sched_ctx->sched_policy->post_exec_hook(task);
  320. }
  321. void _starpu_wait_on_sched_event(void)
  322. {
  323. struct starpu_worker_s *worker = _starpu_get_local_worker_key();
  324. PTHREAD_MUTEX_LOCK(worker->sched_mutex);
  325. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  326. if (_starpu_machine_is_running())
  327. {
  328. #ifndef STARPU_NON_BLOCKING_DRIVERS
  329. pthread_cond_wait(worker->sched_cond, worker->sched_mutex);
  330. #endif
  331. }
  332. PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
  333. }
  334. /* The scheduling policy may put tasks directly into a worker's local queue so
  335. * that it is not always necessary to create its own queue when the local queue
  336. * is sufficient. If "back" not null, the task is put at the back of the queue
  337. * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
  338. * a FIFO ordering. */
  339. int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
  340. {
  341. struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
  342. return _starpu_push_local_task(worker, task, back);
  343. }