sched_policy.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010 Université de Bordeaux 1
  4. * Copyright (C) 2010 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <pthread.h>
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/sched_policy.h>
  22. #include <profiling/profiling.h>
  23. static struct starpu_sched_policy_s policy;
  24. static int use_prefetch = 0;
  25. int starpu_get_prefetch_flag(void)
  26. {
  27. return use_prefetch;
  28. }
  /*
   * Predefined policies
   *
   * These policy structures are defined outside of this file; only their
   * addresses are needed here to build the lookup table below.
   */
  extern struct starpu_sched_policy_s _starpu_sched_ws_policy;
  extern struct starpu_sched_policy_s _starpu_sched_prio_policy;
  extern struct starpu_sched_policy_s _starpu_sched_no_prio_policy;
  extern struct starpu_sched_policy_s _starpu_sched_random_policy;
  extern struct starpu_sched_policy_s _starpu_sched_dm_policy;
  extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
  extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy;
  extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy;
  extern struct starpu_sched_policy_s _starpu_sched_eager_policy;
  extern struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy;
  extern struct starpu_sched_policy_s _starpu_sched_pgreedy_policy;

  /* Table of the built-in policies that can be selected by name. The array is
   * sized by its initializer, so adding or removing an entry can neither
   * leave a NULL slot at the end of the table nor hide a policy from the
   * loops that walk it. The order is the one used by the help message. */
  static struct starpu_sched_policy_s *predefined_policies[] = {
      &_starpu_sched_ws_policy,
      &_starpu_sched_prio_policy,
      &_starpu_sched_no_prio_policy,
      &_starpu_sched_dm_policy,
      &_starpu_sched_dmda_policy,
      &_starpu_sched_dmda_ready_policy,
      &_starpu_sched_dmda_sorted_policy,
      &_starpu_sched_random_policy,
      &_starpu_sched_eager_policy,
      &_starpu_sched_parallel_heft_policy,
      &_starpu_sched_pgreedy_policy
  };

  /* Number of entries in predefined_policies, computed from the table itself
   * rather than maintained by hand. Cast to unsigned to match the loop
   * counters it is compared against. */
  #define NPREDEFINED_POLICIES \
      ((unsigned)(sizeof(predefined_policies) / sizeof(predefined_policies[0])))
  57. struct starpu_sched_policy_s *_starpu_get_sched_policy(void)
  58. {
  59. return &policy;
  60. }
  61. /*
  62. * Methods to initialize the scheduling policy
  63. */
  64. static void load_sched_policy(struct starpu_sched_policy_s *sched_policy)
  65. {
  66. STARPU_ASSERT(sched_policy);
  67. #ifdef STARPU_VERBOSE
  68. if (sched_policy->policy_name)
  69. {
  70. if (sched_policy->policy_description)
  71. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  72. else
  73. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  74. }
  75. #endif
  76. policy.init_sched = sched_policy->init_sched;
  77. policy.deinit_sched = sched_policy->deinit_sched;
  78. policy.push_task = sched_policy->push_task;
  79. policy.push_prio_task = sched_policy->push_prio_task;
  80. policy.pop_task = sched_policy->pop_task;
  81. policy.post_exec_hook = sched_policy->post_exec_hook;
  82. policy.pop_every_task = sched_policy->pop_every_task;
  83. }
  84. static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
  85. {
  86. if (!policy_name)
  87. return NULL;
  88. unsigned i;
  89. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  90. {
  91. struct starpu_sched_policy_s *p;
  92. p = predefined_policies[i];
  93. if (p->policy_name)
  94. {
  95. if (strcmp(policy_name, p->policy_name) == 0) {
  96. /* we found a policy with the requested name */
  97. return p;
  98. }
  99. }
  100. }
  101. /* nothing was found */
  102. return NULL;
  103. }
  104. static void display_sched_help_message(void)
  105. {
  106. const char *sched_env = getenv("STARPU_SCHED");
  107. if (sched_env && (strcmp(sched_env, "help") == 0)) {
  108. fprintf(stderr, "STARPU_SCHED can be either of\n");
  109. /* display the description of all predefined policies */
  110. unsigned i;
  111. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  112. {
  113. struct starpu_sched_policy_s *p;
  114. p = predefined_policies[i];
  115. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  116. }
  117. }
  118. }
  119. static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config)
  120. {
  121. struct starpu_sched_policy_s *selected_policy = NULL;
  122. struct starpu_conf *user_conf = config->user_conf;
  123. /* First, we check whether the application explicitely gave a scheduling policy or not */
  124. if (user_conf && (user_conf->sched_policy))
  125. return user_conf->sched_policy;
  126. /* Otherwise, we look if the application specified the name of a policy to load */
  127. const char *sched_pol_name;
  128. if (user_conf && (user_conf->sched_policy_name))
  129. {
  130. sched_pol_name = user_conf->sched_policy_name;
  131. }
  132. else {
  133. sched_pol_name = getenv("STARPU_SCHED");
  134. }
  135. if (sched_pol_name)
  136. selected_policy = find_sched_policy_from_name(sched_pol_name);
  137. /* Perhaps there was no policy that matched the name */
  138. if (selected_policy)
  139. return selected_policy;
  140. /* If no policy was specified, we use the greedy policy as a default */
  141. return &_starpu_sched_eager_policy;
  142. }
  143. void _starpu_init_sched_policy(struct starpu_machine_config_s *config)
  144. {
  145. /* Perhaps we have to display some help */
  146. display_sched_help_message();
  147. /* Prefetch is activated by default */
  148. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  149. if (use_prefetch == -1)
  150. use_prefetch = 1;
  151. /* By default, we don't calibrate */
  152. unsigned do_calibrate = 0;
  153. if (config->user_conf && (config->user_conf->calibrate != -1))
  154. {
  155. do_calibrate = config->user_conf->calibrate;
  156. }
  157. else {
  158. int res = starpu_get_env_number("STARPU_CALIBRATE");
  159. do_calibrate = (res < 0)?0:(unsigned)res;
  160. }
  161. _starpu_set_calibrate_flag(do_calibrate);
  162. struct starpu_sched_policy_s *selected_policy;
  163. selected_policy = select_sched_policy(config);
  164. load_sched_policy(selected_policy);
  165. policy.init_sched(&config->topology, &policy);
  166. }
  167. void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config)
  168. {
  169. if (policy.deinit_sched)
  170. policy.deinit_sched(&config->topology, &policy);
  171. }
  172. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  173. * case workerid identifies a combined worker, a task will be enqueued into
  174. * each worker of the combination. */
  175. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  176. {
  177. int nbasic_workers = (int)starpu_worker_get_count();
  178. /* Is this a basic worker or a combined worker ? */
  179. int is_basic_worker = (workerid < nbasic_workers);
  180. unsigned memory_node;
  181. struct starpu_worker_s *worker;
  182. struct starpu_combined_worker_s *combined_worker;
  183. if (is_basic_worker)
  184. {
  185. worker = _starpu_get_worker_struct(workerid);
  186. memory_node = worker->memory_node;
  187. }
  188. else
  189. {
  190. combined_worker = _starpu_get_combined_worker_struct(workerid);
  191. memory_node = combined_worker->memory_node;
  192. }
  193. if (use_prefetch)
  194. starpu_prefetch_task_input_on_node(task, memory_node);
  195. if (is_basic_worker)
  196. {
  197. return _starpu_push_local_task(worker, task);
  198. }
  199. else {
  200. /* This is a combined worker so we create task aliases */
  201. int worker_size = combined_worker->worker_size;
  202. int *combined_workerid = combined_worker->combined_workerid;
  203. int ret = 0;
  204. int i;
  205. starpu_job_t j = _starpu_get_job_associated_to_task(task);
  206. j->task_size = worker_size;
  207. j->combined_workerid = workerid;
  208. j->active_task_alias_count = 0;
  209. PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
  210. PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
  211. for (i = 0; i < worker_size; i++)
  212. {
  213. struct starpu_task *alias = _starpu_create_task_alias(task);
  214. worker = _starpu_get_worker_struct(combined_workerid[i]);
  215. ret |= _starpu_push_local_task(worker, alias);
  216. }
  217. return ret;
  218. }
  219. }
  220. /* the generic interface that call the proper underlying implementation */
  221. int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
  222. {
  223. struct starpu_task *task = j->task;
  224. _STARPU_LOG_IN();
  225. task->status = STARPU_TASK_READY;
  226. _starpu_profiling_set_task_push_start_time(task);
  227. /* in case there is no codelet associated to the task (that's a control
  228. * task), we directly execute its callback and enforce the
  229. * corresponding dependencies */
  230. if (task->cl == NULL)
  231. {
  232. _starpu_handle_job_termination(j, job_is_already_locked);
  233. _STARPU_LOG_OUT_TAG("handle_job_termination");
  234. return 0;
  235. }
  236. int ret;
  237. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  238. {
  239. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  240. }
  241. else {
  242. STARPU_ASSERT(policy.push_task);
  243. ret = policy.push_task(task);
  244. }
  245. _starpu_profiling_set_task_push_end_time(task);
  246. _STARPU_LOG_OUT();
  247. return ret;
  248. }
  249. struct starpu_task *_starpu_pop_task(void)
  250. {
  251. struct starpu_task *task;
  252. /* We can't tell in advance which task will be picked up, so we measure
  253. * a timestamp, and will attribute it afterwards to the task. */
  254. int profiling = starpu_profiling_status_get();
  255. struct timespec pop_start_time;
  256. if (profiling)
  257. starpu_clock_gettime(&pop_start_time);
  258. task = policy.pop_task();
  259. /* Note that we may get a NULL task in case the scheduler was unlocked
  260. * for some reason. */
  261. if (profiling && task)
  262. {
  263. struct starpu_task_profiling_info *profiling_info;
  264. profiling_info = task->profiling_info;
  265. /* The task may have been created before profiling was enabled,
  266. * so we check if the profiling_info structure is available
  267. * even though we already tested if profiling is enabled. */
  268. if (profiling_info)
  269. {
  270. memcpy(&profiling_info->pop_start_time,
  271. &pop_start_time, sizeof(struct timespec));
  272. starpu_clock_gettime(&profiling_info->pop_end_time);
  273. }
  274. }
  275. return task;
  276. }
  277. struct starpu_task *_starpu_pop_every_task(void)
  278. {
  279. STARPU_ASSERT(policy.pop_every_task);
  280. /* TODO set profiling info */
  281. return policy.pop_every_task();
  282. }
  283. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  284. {
  285. if (policy.post_exec_hook)
  286. policy.post_exec_hook(task);
  287. }
  288. void _starpu_wait_on_sched_event(void)
  289. {
  290. struct starpu_worker_s *worker = _starpu_get_local_worker_key();
  291. PTHREAD_MUTEX_LOCK(worker->sched_mutex);
  292. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  293. if (_starpu_machine_is_running())
  294. {
  295. #ifndef STARPU_NON_BLOCKING_DRIVERS
  296. pthread_cond_wait(worker->sched_cond, worker->sched_mutex);
  297. #endif
  298. }
  299. PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
  300. }