sched_policy.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <pthread.h>
  17. #include <starpu.h>
  18. #include <common/config.h>
  19. #include <core/mechanisms/queues.h>
  20. #include <core/policies/sched_policy.h>
  21. #include <core/policies/no-prio-policy.h>
  22. #include <core/policies/eager-central-policy.h>
  23. #include <core/policies/eager-central-priority-policy.h>
  24. #include <core/policies/work-stealing-policy.h>
  25. #include <core/policies/deque-modeling-policy.h>
  26. #include <core/policies/random-policy.h>
  27. #include <core/policies/deque-modeling-policy-data-aware.h>
  28. static struct sched_policy_s policy;
  29. static int use_prefetch = 0;
  30. /*
  31. * Predefined policies
  32. */
  33. #define NPREDEFINED_POLICIES 7
  34. struct sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] = {
  35. &sched_ws_policy,
  36. &sched_prio_policy,
  37. &sched_no_prio_policy,
  38. &sched_dm_policy,
  39. &sched_dmda_policy,
  40. &sched_random_policy,
  41. &sched_eager_policy
  42. };
  43. struct sched_policy_s *get_sched_policy(void)
  44. {
  45. return &policy;
  46. }
  47. /*
  48. * Methods to initialize the scheduling policy
  49. */
  50. static void load_sched_policy(struct sched_policy_s *sched_policy)
  51. {
  52. STARPU_ASSERT(sched_policy);
  53. #ifdef STARPU_VERBOSE
  54. if (sched_policy->policy_name)
  55. {
  56. fprintf(stderr, "Use %s scheduler", sched_policy->policy_name);
  57. if (sched_policy->policy_description)
  58. {
  59. fprintf(stderr, " (%s)", sched_policy->policy_description);
  60. }
  61. fprintf(stderr, "\n");
  62. }
  63. #endif
  64. policy.init_sched = sched_policy->init_sched;
  65. policy.deinit_sched = sched_policy->deinit_sched;
  66. policy.get_local_queue = sched_policy->get_local_queue;
  67. pthread_cond_init(&policy.sched_activity_cond, NULL);
  68. pthread_mutex_init(&policy.sched_activity_mutex, NULL);
  69. pthread_key_create(&policy.local_queue_key, NULL);
  70. }
  71. static struct sched_policy_s *find_sched_policy_from_name(const char *policy_name)
  72. {
  73. if (!policy_name)
  74. return NULL;
  75. unsigned i;
  76. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  77. {
  78. struct sched_policy_s *p;
  79. p = predefined_policies[i];
  80. if (p->policy_name)
  81. {
  82. if (strcmp(policy_name, p->policy_name) == 0) {
  83. /* we found a policy with the requested name */
  84. return p;
  85. }
  86. }
  87. }
  88. /* nothing was found */
  89. return NULL;
  90. }
  91. static void display_sched_help_message(void)
  92. {
  93. const char *sched_env = getenv("STARPU_SCHED");
  94. if (sched_env && (strcmp(sched_env, "help") == 0)) {
  95. fprintf(stderr, "STARPU_SCHED can be either of\n");
  96. /* display the description of all predefined policies */
  97. unsigned i;
  98. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  99. {
  100. struct sched_policy_s *p;
  101. p = predefined_policies[i];
  102. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  103. }
  104. }
  105. }
  106. static struct sched_policy_s *select_sched_policy(struct machine_config_s *config)
  107. {
  108. struct sched_policy_s *selected_policy = NULL;
  109. struct starpu_conf *user_conf = config->user_conf;
  110. /* First, we check whether the application explicitely gave a scheduling policy or not */
  111. if (user_conf && (user_conf->sched_policy))
  112. return user_conf->sched_policy;
  113. /* Otherwise, we look if the application specified the name of a policy to load */
  114. const char *sched_pol_name;
  115. if (user_conf && (user_conf->sched_policy_name))
  116. {
  117. sched_pol_name = user_conf->sched_policy_name;
  118. }
  119. else {
  120. sched_pol_name = getenv("STARPU_SCHED");
  121. }
  122. if (sched_pol_name)
  123. selected_policy = find_sched_policy_from_name(sched_pol_name);
  124. /* Perhaps there was no policy that matched the name */
  125. if (selected_policy)
  126. return selected_policy;
  127. /* If no policy was specified, we use the greedy policy as a default */
  128. return &sched_eager_policy;
  129. }
  130. void init_sched_policy(struct machine_config_s *config)
  131. {
  132. /* Perhaps we have to display some help */
  133. display_sched_help_message();
  134. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  135. if (use_prefetch == -1)
  136. use_prefetch = 0;
  137. struct sched_policy_s *selected_policy;
  138. selected_policy = select_sched_policy(config);
  139. load_sched_policy(selected_policy);
  140. policy.init_sched(config, &policy);
  141. }
  142. void deinit_sched_policy(struct machine_config_s *config)
  143. {
  144. if (policy.deinit_sched)
  145. policy.deinit_sched(config, &policy);
  146. pthread_key_delete(policy.local_queue_key);
  147. pthread_mutex_destroy(&policy.sched_activity_mutex);
  148. pthread_cond_destroy(&policy.sched_activity_cond);
  149. }
  /* The generic interface that calls the proper underlying implementation. */
  151. int push_task(starpu_job_t j)
  152. {
  153. struct jobq_s *queue = policy.get_local_queue(&policy);
  154. /* in case there is no codelet associated to the task (that's a control
  155. * task), we directly execute its callback and enforce the
  156. * corresponding dependencies */
  157. if (j->task->cl == NULL)
  158. {
  159. _starpu_handle_job_termination(j);
  160. return 0;
  161. }
  162. if (STARPU_UNLIKELY(j->task->execute_on_a_specific_worker))
  163. {
  164. struct starpu_task *task = j->task;
  165. unsigned workerid = task->workerid;
  166. struct worker_s *worker = _starpu_get_worker_struct(workerid);
  167. if (use_prefetch)
  168. {
  169. uint32_t memory_node = starpu_get_worker_memory_node(workerid);
  170. starpu_prefetch_task_input_on_node(task, memory_node);
  171. }
  172. return _starpu_push_local_task(worker, j);
  173. }
  174. else {
  175. STARPU_ASSERT(queue->push_task);
  176. return queue->push_task(queue, j);
  177. }
  178. }
  179. struct starpu_job_s * pop_task_from_queue(struct jobq_s *queue)
  180. {
  181. STARPU_ASSERT(queue->pop_task);
  182. struct starpu_job_s *j = queue->pop_task(queue);
  183. return j;
  184. }
  185. struct starpu_job_s * pop_task(void)
  186. {
  187. struct jobq_s *queue = policy.get_local_queue(&policy);
  188. return pop_task_from_queue(queue);
  189. }
  190. struct starpu_job_list_s * pop_every_task_from_queue(struct jobq_s *queue, uint32_t where)
  191. {
  192. STARPU_ASSERT(queue->pop_every_task);
  193. struct starpu_job_list_s *list = queue->pop_every_task(queue, where);
  194. return list;
  195. }
  196. /* pop every task that can be executed on "where" (eg. GORDON) */
  197. struct starpu_job_list_s *pop_every_task(uint32_t where)
  198. {
  199. struct jobq_s *queue = policy.get_local_queue(&policy);
  200. return pop_every_task_from_queue(queue, where);
  201. }
  202. void wait_on_sched_event(void)
  203. {
  204. struct jobq_s *q = policy.get_local_queue(&policy);
  205. pthread_mutex_lock(&q->activity_mutex);
  206. starpu_handle_all_pending_node_data_requests(starpu_get_local_memory_node());
  207. if (_starpu_machine_is_running())
  208. {
  209. #ifndef STARPU_NON_BLOCKING_DRIVERS
  210. pthread_cond_wait(&q->activity_cond, &q->activity_mutex);
  211. #endif
  212. }
  213. pthread_mutex_unlock(&q->activity_mutex);
  214. }