sched_policy.c 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <pthread.h>
  17. #include <starpu.h>
  18. #include <common/config.h>
  19. #include <common/utils.h>
  20. #include <core/mechanisms/queues.h>
  21. #include <core/policies/sched_policy.h>
  22. #include <core/policies/no_prio_policy.h>
  23. #include <core/policies/eager_central_policy.h>
  24. #include <core/policies/eager_central_priority_policy.h>
  25. #include <core/policies/work_stealing_policy.h>
  26. #include <core/policies/deque_modeling_policy.h>
  27. #include <core/policies/random_policy.h>
  28. #include <core/policies/deque_modeling_policy_data_aware.h>
  29. static struct starpu_sched_policy_s policy;
  30. static int use_prefetch = 0;
  31. int _starpu_get_prefetch_flag(void)
  32. {
  33. return use_prefetch;
  34. }
  35. /*
  36. * Predefined policies
  37. */
  38. #define NPREDEFINED_POLICIES 7
  39. static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] = {
  40. &_starpu_sched_ws_policy,
  41. &_starpu_sched_prio_policy,
  42. &_starpu_sched_no_prio_policy,
  43. &_starpu_sched_dm_policy,
  44. &_starpu_sched_dmda_policy,
  45. &_starpu_sched_random_policy,
  46. &_starpu_sched_eager_policy
  47. };
  48. struct starpu_sched_policy_s *_starpu_get_sched_policy(void)
  49. {
  50. return &policy;
  51. }
  52. /*
  53. * Methods to initialize the scheduling policy
  54. */
  55. static void load_sched_policy(struct starpu_sched_policy_s *sched_policy)
  56. {
  57. STARPU_ASSERT(sched_policy);
  58. #ifdef STARPU_VERBOSE
  59. if (sched_policy->policy_name)
  60. {
  61. fprintf(stderr, "Use %s scheduler", sched_policy->policy_name);
  62. if (sched_policy->policy_description)
  63. {
  64. fprintf(stderr, " (%s)", sched_policy->policy_description);
  65. }
  66. fprintf(stderr, "\n");
  67. }
  68. #endif
  69. policy.init_sched = sched_policy->init_sched;
  70. policy.deinit_sched = sched_policy->deinit_sched;
  71. policy.starpu_get_local_queue = sched_policy->starpu_get_local_queue;
  72. PTHREAD_COND_INIT(&policy.sched_activity_cond, NULL);
  73. PTHREAD_MUTEX_INIT(&policy.sched_activity_mutex, NULL);
  74. pthread_key_create(&policy.local_queue_key, NULL);
  75. }
  76. static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
  77. {
  78. if (!policy_name)
  79. return NULL;
  80. unsigned i;
  81. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  82. {
  83. struct starpu_sched_policy_s *p;
  84. p = predefined_policies[i];
  85. if (p->policy_name)
  86. {
  87. if (strcmp(policy_name, p->policy_name) == 0) {
  88. /* we found a policy with the requested name */
  89. return p;
  90. }
  91. }
  92. }
  93. /* nothing was found */
  94. return NULL;
  95. }
  96. static void display_sched_help_message(void)
  97. {
  98. const char *sched_env = getenv("STARPU_SCHED");
  99. if (sched_env && (strcmp(sched_env, "help") == 0)) {
  100. fprintf(stderr, "STARPU_SCHED can be either of\n");
  101. /* display the description of all predefined policies */
  102. unsigned i;
  103. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  104. {
  105. struct starpu_sched_policy_s *p;
  106. p = predefined_policies[i];
  107. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  108. }
  109. }
  110. }
  111. static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config)
  112. {
  113. struct starpu_sched_policy_s *selected_policy = NULL;
  114. struct starpu_conf *user_conf = config->user_conf;
  115. /* First, we check whether the application explicitely gave a scheduling policy or not */
  116. if (user_conf && (user_conf->sched_policy))
  117. return user_conf->sched_policy;
  118. /* Otherwise, we look if the application specified the name of a policy to load */
  119. const char *sched_pol_name;
  120. if (user_conf && (user_conf->sched_policy_name))
  121. {
  122. sched_pol_name = user_conf->sched_policy_name;
  123. }
  124. else {
  125. sched_pol_name = getenv("STARPU_SCHED");
  126. }
  127. if (sched_pol_name)
  128. selected_policy = find_sched_policy_from_name(sched_pol_name);
  129. /* Perhaps there was no policy that matched the name */
  130. if (selected_policy)
  131. return selected_policy;
  132. /* If no policy was specified, we use the greedy policy as a default */
  133. return &_starpu_sched_eager_policy;
  134. }
  135. void _starpu_init_sched_policy(struct starpu_machine_config_s *config)
  136. {
  137. /* Perhaps we have to display some help */
  138. display_sched_help_message();
  139. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  140. if (use_prefetch == -1)
  141. use_prefetch = 0;
  142. /* By default, we don't calibrate */
  143. unsigned do_calibrate = 0;
  144. if (config->user_conf && (config->user_conf->calibrate != -1))
  145. {
  146. do_calibrate = config->user_conf->calibrate;
  147. }
  148. else {
  149. int res = starpu_get_env_number("STARPU_CALIBRATE");
  150. do_calibrate = (res < 0)?0:(unsigned)res;
  151. }
  152. _starpu_set_calibrate_flag(do_calibrate);
  153. struct starpu_sched_policy_s *selected_policy;
  154. selected_policy = select_sched_policy(config);
  155. load_sched_policy(selected_policy);
  156. policy.init_sched(config, &policy);
  157. }
  158. void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config)
  159. {
  160. if (policy.deinit_sched)
  161. policy.deinit_sched(config, &policy);
  162. pthread_key_delete(policy.local_queue_key);
  163. PTHREAD_MUTEX_DESTROY(&policy.sched_activity_mutex);
  164. PTHREAD_COND_DESTROY(&policy.sched_activity_cond);
  165. }
  166. /* the generic interface that call the proper underlying implementation */
  167. int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
  168. {
  169. struct starpu_jobq_s *queue = policy.starpu_get_local_queue(&policy);
  170. struct starpu_task *task = j->task;
  171. task->status = STARPU_TASK_READY;
  172. /* in case there is no codelet associated to the task (that's a control
  173. * task), we directly execute its callback and enforce the
  174. * corresponding dependencies */
  175. if (task->cl == NULL)
  176. {
  177. _starpu_handle_job_termination(j, job_is_already_locked);
  178. return 0;
  179. }
  180. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  181. {
  182. unsigned workerid = task->workerid;
  183. struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
  184. if (use_prefetch)
  185. {
  186. uint32_t memory_node = starpu_worker_get_memory_node(workerid);
  187. _starpu_prefetch_task_input_on_node(task, memory_node);
  188. }
  189. return _starpu_push_local_task(worker, j);
  190. }
  191. else {
  192. STARPU_ASSERT(queue->push_task);
  193. return queue->push_task(queue, j);
  194. }
  195. }
  196. struct starpu_job_s * _starpu_pop_task_from_queue(struct starpu_jobq_s *queue)
  197. {
  198. STARPU_ASSERT(queue->pop_task);
  199. struct starpu_job_s *j = queue->pop_task(queue);
  200. return j;
  201. }
  202. struct starpu_job_s * _starpu_pop_task(void)
  203. {
  204. struct starpu_jobq_s *queue = policy.starpu_get_local_queue(&policy);
  205. return _starpu_pop_task_from_queue(queue);
  206. }
  207. struct starpu_job_list_s * _starpu_pop_every_task_from_queue(struct starpu_jobq_s *queue, uint32_t where)
  208. {
  209. STARPU_ASSERT(queue->pop_every_task);
  210. struct starpu_job_list_s *list = queue->pop_every_task(queue, where);
  211. return list;
  212. }
  213. /* pop every task that can be executed on "where" (eg. GORDON) */
  214. struct starpu_job_list_s *_starpu_pop_every_task(uint32_t where)
  215. {
  216. struct starpu_jobq_s *queue = policy.starpu_get_local_queue(&policy);
  217. return _starpu_pop_every_task_from_queue(queue, where);
  218. }
  219. void _starpu_wait_on_sched_event(void)
  220. {
  221. struct starpu_jobq_s *q = policy.starpu_get_local_queue(&policy);
  222. PTHREAD_MUTEX_LOCK(&q->activity_mutex);
  223. _starpu_handle_all_pending_node_data_requests(_starpu_get_local_memory_node());
  224. if (_starpu_machine_is_running())
  225. {
  226. #ifndef STARPU_NON_BLOCKING_DRIVERS
  227. pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
  228. #endif
  229. }
  230. PTHREAD_MUTEX_UNLOCK(&q->activity_mutex);
  231. }