sched_policy.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2011 Université de Bordeaux 1
  4. * Copyright (C) 2010-2011 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <pthread.h>
  19. #include <starpu.h>
  20. #include <common/config.h>
  21. #include <common/utils.h>
  22. #include <core/sched_policy.h>
  23. #include <profiling/profiling.h>
  24. #include <common/barrier.h>
  25. static int use_prefetch = 0;
  26. int starpu_get_prefetch_flag(void)
  27. {
  28. return use_prefetch;
  29. }
  30. /*
  31. * Predefined policies
  32. */
  33. /* extern struct starpu_sched_policy_s _starpu_sched_ws_policy; */
  34. /* extern struct starpu_sched_policy_s _starpu_sched_prio_policy; */
  35. /* extern struct starpu_sched_policy_s _starpu_sched_random_policy; */
  36. /* extern struct starpu_sched_policy_s _starpu_sched_dm_policy; */
  37. /* extern struct starpu_sched_policy_s _starpu_sched_dmda_policy; */
  38. /* extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy; */
  39. /* extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy; */
  40. /* extern struct starpu_sched_policy_s _starpu_sched_eager_policy; */
  41. /* extern struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy; */
  42. /* extern struct starpu_sched_policy_s _starpu_sched_pgreedy_policy; */
  43. extern struct starpu_sched_policy_s heft_policy;
  44. static struct starpu_sched_policy_s *predefined_policies[] = {
  45. /* &_starpu_sched_ws_policy, */
  46. /* &_starpu_sched_prio_policy, */
  47. /* &_starpu_sched_dm_policy, */
  48. /* &_starpu_sched_dmda_policy, */
  49. &heft_policy
  50. /* &_starpu_sched_dmda_ready_policy, */
  51. /* &_starpu_sched_dmda_sorted_policy, */
  52. /* &_starpu_sched_random_policy, */
  53. /* &_starpu_sched_eager_policy, */
  54. /* &_starpu_sched_parallel_heft_policy, */
  55. /* &_starpu_sched_pgreedy_policy */
  56. };
  57. struct starpu_sched_policy_s *_starpu_get_sched_policy(struct starpu_sched_ctx *sched_ctx)
  58. {
  59. return sched_ctx->sched_policy;
  60. }
  61. /*
  62. * Methods to initialize the scheduling policy
  63. */
  64. static void load_sched_policy(struct starpu_sched_policy_s *sched_policy, struct starpu_sched_ctx *sched_ctx)
  65. {
  66. STARPU_ASSERT(sched_policy);
  67. #ifdef STARPU_VERBOSE
  68. if (sched_policy->policy_name)
  69. {
  70. if (sched_policy->policy_description)
  71. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  72. else
  73. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  74. }
  75. #endif
  76. struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
  77. policy->init_sched = sched_policy->init_sched;
  78. policy->deinit_sched = sched_policy->deinit_sched;
  79. policy->push_task = sched_policy->push_task;
  80. policy->pop_task = sched_policy->pop_task;
  81. policy->post_exec_hook = sched_policy->post_exec_hook;
  82. policy->pop_every_task = sched_policy->pop_every_task;
  83. policy->push_task_notify = sched_policy->push_task_notify;
  84. policy->policy_name = sched_policy->policy_name;
  85. policy->add_workers = sched_policy->add_workers;
  86. policy->remove_workers = sched_policy->remove_workers;
  87. }
  88. static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
  89. {
  90. if (!policy_name)
  91. return NULL;
  92. unsigned i;
  93. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  94. {
  95. struct starpu_sched_policy_s *p;
  96. p = predefined_policies[i];
  97. if (p->policy_name)
  98. {
  99. if (strcmp(policy_name, p->policy_name) == 0) {
  100. /* we found a policy with the requested name */
  101. return p;
  102. }
  103. }
  104. }
  105. fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);
  106. /* nothing was found */
  107. return NULL;
  108. }
  109. static void display_sched_help_message(void)
  110. {
  111. const char *sched_env = getenv("STARPU_SCHED");
  112. if (sched_env && (strcmp(sched_env, "help") == 0)) {
  113. fprintf(stderr, "STARPU_SCHED can be either of\n");
  114. /* display the description of all predefined policies */
  115. unsigned i;
  116. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  117. {
  118. struct starpu_sched_policy_s *p;
  119. p = predefined_policies[i];
  120. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  121. }
  122. }
  123. }
  124. static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config, const char *policy_name)
  125. {
  126. struct starpu_sched_policy_s *selected_policy = NULL;
  127. struct starpu_conf *user_conf = config->user_conf;
  128. /* First, we check whether the application explicitely gave a scheduling policy or not */
  129. if (user_conf && (user_conf->sched_policy))
  130. return user_conf->sched_policy;
  131. /* Otherwise, we look if the application specified the name of a policy to load */
  132. const char *sched_pol_name;
  133. if (user_conf && (user_conf->sched_policy_name))
  134. {
  135. sched_pol_name = user_conf->sched_policy_name;
  136. }
  137. else {
  138. sched_pol_name = getenv("STARPU_SCHED");
  139. }
  140. if (sched_pol_name)
  141. selected_policy = find_sched_policy_from_name(sched_pol_name);
  142. else
  143. if(policy_name)
  144. selected_policy = find_sched_policy_from_name(policy_name);
  145. /* Perhaps there was no policy that matched the name */
  146. if (selected_policy)
  147. return selected_policy;
  148. /* If no policy was specified, we use the greedy policy as a default */
  149. // return &_starpu_sched_eager_policy;
  150. return &heft_policy;
  151. }
  152. void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx, const char *policy_name)
  153. {
  154. /* Perhaps we have to display some help */
  155. display_sched_help_message();
  156. /* Prefetch is activated by default */
  157. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  158. if (use_prefetch == -1)
  159. use_prefetch = 1;
  160. /* By default, we don't calibrate */
  161. unsigned do_calibrate = 0;
  162. if (config->user_conf && (config->user_conf->calibrate != -1))
  163. {
  164. do_calibrate = config->user_conf->calibrate;
  165. }
  166. else {
  167. int res = starpu_get_env_number("STARPU_CALIBRATE");
  168. do_calibrate = (res < 0)?0:(unsigned)res;
  169. }
  170. _starpu_set_calibrate_flag(do_calibrate);
  171. struct starpu_sched_policy_s *selected_policy;
  172. selected_policy = select_sched_policy(config, policy_name);
  173. load_sched_policy(selected_policy, sched_ctx);
  174. sched_ctx->sched_policy->init_sched(sched_ctx->id);
  175. }
  176. void _starpu_deinit_sched_policy(struct starpu_sched_ctx *sched_ctx)
  177. {
  178. struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
  179. if (policy->deinit_sched)
  180. policy->deinit_sched(sched_ctx->id);
  181. }
  182. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  183. * case workerid identifies a combined worker, a task will be enqueued into
  184. * each worker of the combination. */
  185. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  186. {
  187. _starpu_increment_nsubmitted_tasks_of_worker(task->workerid);
  188. int nbasic_workers = (int)starpu_worker_get_count();
  189. /* Is this a basic worker or a combined worker ? */
  190. int is_basic_worker = (workerid < nbasic_workers);
  191. unsigned memory_node;
  192. struct starpu_worker_s *worker = NULL;
  193. struct starpu_combined_worker_s *combined_worker = NULL;
  194. if (is_basic_worker)
  195. {
  196. worker = _starpu_get_worker_struct(workerid);
  197. memory_node = worker->memory_node;
  198. }
  199. else
  200. {
  201. combined_worker = _starpu_get_combined_worker_struct(workerid);
  202. memory_node = combined_worker->memory_node;
  203. }
  204. if (use_prefetch)
  205. starpu_prefetch_task_input_on_node(task, memory_node);
  206. /* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
  207. unsigned i;
  208. struct starpu_sched_ctx *sched_ctx;
  209. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  210. {
  211. sched_ctx = worker->sched_ctx[i];
  212. if (sched_ctx != NULL && sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
  213. sched_ctx->sched_policy->push_task_notify(task, workerid);
  214. }
  215. if (is_basic_worker)
  216. {
  217. return _starpu_push_local_task(worker, task, 0);
  218. }
  219. else {
  220. /* This is a combined worker so we create task aliases */
  221. int worker_size = combined_worker->worker_size;
  222. int *combined_workerid = combined_worker->combined_workerid;
  223. int ret = 0;
  224. int i;
  225. starpu_job_t j = _starpu_get_job_associated_to_task(task);
  226. j->task_size = worker_size;
  227. j->combined_workerid = workerid;
  228. j->active_task_alias_count = 0;
  229. PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
  230. PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
  231. for (i = 0; i < worker_size; i++)
  232. {
  233. struct starpu_task *alias = _starpu_create_task_alias(task);
  234. worker = _starpu_get_worker_struct(combined_workerid[i]);
  235. ret |= _starpu_push_local_task(worker, alias, 0);
  236. }
  237. return ret;
  238. }
  239. }
  240. /* the generic interface that call the proper underlying implementation */
  241. int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
  242. {
  243. struct starpu_task *task = j->task;
  244. _STARPU_LOG_IN();
  245. task->status = STARPU_TASK_READY;
  246. _starpu_profiling_set_task_push_start_time(task);
  247. /* in case there is no codelet associated to the task (that's a control
  248. * task), we directly execute its callback and enforce the
  249. * corresponding dependencies */
  250. if (task->cl == NULL)
  251. {
  252. _starpu_handle_job_termination(j, job_is_already_locked, -1);
  253. _STARPU_LOG_OUT_TAG("handle_job_termination");
  254. return 0;
  255. }
  256. int ret;
  257. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  258. {
  259. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  260. }
  261. else
  262. {
  263. struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  264. STARPU_ASSERT(sched_ctx->sched_policy->push_task);
  265. ret = sched_ctx->sched_policy->push_task(task);
  266. }
  267. _starpu_profiling_set_task_push_end_time(task);
  268. _STARPU_LOG_OUT();
  269. return ret;
  270. }
  271. struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
  272. {
  273. struct starpu_task *task;
  274. /* We can't tell in advance which task will be picked up, so we measure
  275. * a timestamp, and will attribute it afterwards to the task. */
  276. int profiling = starpu_profiling_status_get();
  277. struct timespec pop_start_time;
  278. if (profiling)
  279. starpu_clock_gettime(&pop_start_time);
  280. PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  281. /* perhaps there is some local task to be executed first */
  282. task = _starpu_pop_local_task(worker);
  283. PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  284. /* get tasks from the stacks of the strategy */
  285. if(!task)
  286. {
  287. struct starpu_sched_ctx *sched_ctx;
  288. pthread_mutex_t *sched_ctx_mutex;
  289. unsigned i;
  290. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  291. {
  292. sched_ctx = worker->sched_ctx[i];
  293. if(sched_ctx != NULL)
  294. {
  295. sched_ctx_mutex = _starpu_get_sched_mutex(sched_ctx, worker->workerid);
  296. if(sched_ctx_mutex != NULL)
  297. {
  298. PTHREAD_MUTEX_LOCK(sched_ctx_mutex);
  299. if (sched_ctx->sched_policy->pop_task)
  300. {
  301. task = sched_ctx->sched_policy->pop_task();
  302. PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
  303. break;
  304. }
  305. PTHREAD_MUTEX_UNLOCK(sched_ctx_mutex);
  306. }
  307. }
  308. }
  309. }
  310. /* Note that we may get a NULL task in case the scheduler was unlocked
  311. * for some reason. */
  312. if (profiling && task)
  313. {
  314. struct starpu_task_profiling_info *profiling_info;
  315. profiling_info = task->profiling_info;
  316. /* The task may have been created before profiling was enabled,
  317. * so we check if the profiling_info structure is available
  318. * even though we already tested if profiling is enabled. */
  319. if (profiling_info)
  320. {
  321. memcpy(&profiling_info->pop_start_time,
  322. &pop_start_time, sizeof(struct timespec));
  323. starpu_clock_gettime(&profiling_info->pop_end_time);
  324. }
  325. }
  326. #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
  327. /* if task is NULL, the work is idle for this round
  328. therefore we let the sched_ctx_manager know in order
  329. to decide a possible resize */
  330. if(!task)
  331. {
  332. unsigned i;
  333. struct starpu_sched_ctx *sched_ctx = NULL;
  334. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  335. {
  336. sched_ctx = worker->sched_ctx[i];
  337. if(sched_ctx != NULL && sched_ctx->id != 0)
  338. {
  339. if(sched_ctx != NULL && sched_ctx->criteria != NULL)
  340. {
  341. sched_ctx->criteria->idle_time_cb(sched_ctx->id, worker->workerid, 1.0);
  342. }
  343. }
  344. }
  345. }
  346. #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
  347. return task;
  348. }
  349. struct starpu_task *_starpu_pop_every_task(struct starpu_sched_ctx *sched_ctx)
  350. {
  351. STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
  352. /* TODO set profiling info */
  353. return sched_ctx->sched_policy->pop_every_task();
  354. }
  355. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  356. {
  357. struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  358. if (sched_ctx->sched_policy->post_exec_hook)
  359. sched_ctx->sched_policy->post_exec_hook(task);
  360. }
  361. void _starpu_wait_on_sched_event(void)
  362. {
  363. struct starpu_worker_s *worker = _starpu_get_local_worker_key();
  364. PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  365. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  366. if (_starpu_machine_is_running())
  367. {
  368. #ifndef STARPU_NON_BLOCKING_DRIVERS
  369. pthread_cond_wait(&worker->sched_cond, &worker->sched_mutex);
  370. #endif
  371. }
  372. PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  373. }
  374. /* The scheduling policy may put tasks directly into a worker's local queue so
  375. * that it is not always necessary to create its own queue when the local queue
  376. * is sufficient. If "back" not null, the task is put at the back of the queue
  377. * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
  378. * a FIFO ordering. */
  379. int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
  380. {
  381. struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
  382. return _starpu_push_local_task(worker, task, back);
  383. }