parallel_heft.c

/*
 * StarPU
 * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Distributed queues using performance modeling to assign tasks */

#include <float.h>
#include <limits.h>
#include <stdlib.h>	/* getenv(), atof() */
#include <string.h>	/* memset() */
#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <core/perfmodel/perfmodel.h>

static pthread_mutex_t big_lock;

static unsigned nworkers, ncombinedworkers;

static enum starpu_perf_archtype applicable_perf_archtypes[STARPU_NARCH_VARIATIONS];
static unsigned napplicable_perf_archtypes = 0;

static struct starpu_fifo_taskq_s *queue_array[STARPU_NMAXWORKERS];

static pthread_cond_t sched_cond[STARPU_NMAXWORKERS];
static pthread_mutex_t sched_mutex[STARPU_NMAXWORKERS];

/* Tunable weights of the fitness function (see STARPU_SCHED_ALPHA/BETA below) */
static double alpha = 1.0;
static double beta = 1.0;

static struct starpu_task *parallel_heft_pop_task(void)
{
	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	struct starpu_fifo_taskq_s *fifo = queue_array[workerid];

	task = _starpu_fifo_pop_task(fifo, -1);
	if (task) {
		/* Update the expected-time bookkeeping of this worker's queue */
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = _starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;
	}

	return task;
}
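
/*
 * Note on the bookkeeping used throughout this file: each per-worker FIFO
 * tracks exp_start (expected start date of the next task), exp_len (expected
 * duration of everything still queued) and exp_end = exp_start + exp_len.
 * Popping a task of predicted length `model` therefore removes `model` from
 * exp_len and moves exp_start forward to "now + model".
 */
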
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio)
{
	/* make sure someone could execute that task! */
	STARPU_ASSERT(best_workerid != -1);

	/* Is this a basic worker or a combined worker? */
	int nbasic_workers = (int)starpu_worker_get_count();
	int is_basic_worker = (best_workerid < nbasic_workers);

	unsigned memory_node;
	memory_node = starpu_worker_get_memory_node(best_workerid);

	if (_starpu_get_prefetch_flag())
		_starpu_prefetch_task_input_on_node(task, memory_node);

	if (is_basic_worker)
	{
		PTHREAD_MUTEX_LOCK(&big_lock);

		struct starpu_fifo_taskq_s *fifo;
		fifo = queue_array[best_workerid];

		fifo->exp_end += predicted;
		fifo->exp_len += predicted;

		task->predicted = predicted;

		int ret;
		if (prio)
		{
			ret = _starpu_fifo_push_prio_task(queue_array[best_workerid],
				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
		}
		else {
			ret = _starpu_fifo_push_task(queue_array[best_workerid],
				&sched_mutex[best_workerid], &sched_cond[best_workerid], task);
		}

		PTHREAD_MUTEX_UNLOCK(&big_lock);

		return ret;
	}
	else {
		/* This is a combined worker so we create task aliases */
		struct starpu_combined_worker_s *combined_worker;
		combined_worker = _starpu_get_combined_worker_struct(best_workerid);

		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;
		int i;

		task->predicted = predicted;

		starpu_job_t j = _starpu_get_job_associated_to_task(task);
		j->task_size = worker_size;
		j->combined_workerid = best_workerid;
		j->active_task_alias_count = 0;

		PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
		PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);

		PTHREAD_MUTEX_LOCK(&big_lock);

		for (i = 0; i < worker_size; i++)
		{
			struct starpu_task *alias = _starpu_create_task_alias(task);
			int local_worker = combined_workerid[i];

			struct starpu_fifo_taskq_s *fifo;
			fifo = queue_array[local_worker];

			fifo->exp_end += predicted;
			fifo->exp_len += predicted;

			alias->predicted = predicted;

			if (prio)
			{
				ret |= _starpu_fifo_push_prio_task(queue_array[local_worker],
					&sched_mutex[local_worker], &sched_cond[local_worker], alias);
			}
			else {
				ret |= _starpu_fifo_push_task(queue_array[local_worker],
					&sched_mutex[local_worker], &sched_cond[local_worker], alias);
			}
		}

		PTHREAD_MUTEX_UNLOCK(&big_lock);

		return ret;
	}
}
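
/*
 * Note that pushing one task to a combined worker of size N charges the
 * predicted length to all N member queues, since every member is busy for
 * the whole duration of the parallel task.
 */
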
static double compute_expected_end(int workerid, double length)
{
	if (workerid < (int)nworkers)
	{
		/* This is a basic worker */
		struct starpu_fifo_taskq_s *fifo;
		fifo = queue_array[workerid];
		return (fifo->exp_start + fifo->exp_len + length);
	}
	else {
		/* This is a combined worker, the expected end is the end for the latest worker */
		int worker_size;
		int *combined_workerid;
		starpu_combined_worker_get_description(workerid, &worker_size, &combined_workerid);

		double exp_end = DBL_MIN;

		int i;
		for (i = 0; i < worker_size; i++)
		{
			struct starpu_fifo_taskq_s *fifo;
			fifo = queue_array[combined_workerid[i]];
			double local_exp_end = (fifo->exp_start + fifo->exp_len + length);
			exp_end = STARPU_MAX(exp_end, local_exp_end);
		}

		return exp_end;
	}
}
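
/*
 * For instance (illustrative numbers only): if a combined worker groups two
 * workers whose (exp_start + exp_len) are 100us and 150us, a task of length
 * 50us gets exp_end = max(100 + 50, 150 + 50) = 200us, because the parallel
 * task only completes once its slowest member does.
 */
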
static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio)
{
	/* find the queue */
	struct starpu_fifo_taskq_s *fifo;
	unsigned worker;
	int best = -1;

	/* this flag is set if the corresponding worker is selected because
	   there is no performance prediction available yet */
	int forced_best = -1;

	double local_task_length[nworkers + ncombinedworkers];
	double local_data_penalty[nworkers + ncombinedworkers];
	double exp_end[nworkers + ncombinedworkers];
	double fitness[nworkers + ncombinedworkers];
	int skip_worker[nworkers + ncombinedworkers];

	double best_exp_end = DBL_MAX;
	double model_best = 0.0;
	double penalty_best = 0.0;

	/* bring every queue's expected dates up to date before estimating */
	for (worker = 0; worker < nworkers; worker++)
	{
		fifo = queue_array[worker];
		fifo->exp_start = STARPU_MAX(fifo->exp_start, _starpu_timing_now());
		fifo->exp_end = STARPU_MAX(fifo->exp_end, _starpu_timing_now());
	}

	for (worker = 0; worker < (nworkers + ncombinedworkers); worker++)
	{
		if (!_starpu_combined_worker_may_execute_task(worker, task))
		{
			/* no one on that queue may execute this task */
			skip_worker[worker] = 1;
			continue;
		}
		else {
			skip_worker[worker] = 0;
		}

		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
		local_task_length[worker] = _starpu_task_expected_length(task, perf_arch);

		unsigned memory_node = starpu_worker_get_memory_node(worker);
		local_data_penalty[worker] = _starpu_data_expected_penalty(memory_node, task);

		if (local_task_length[worker] == -1.0)
		{
			/* no prediction yet for that arch: force this worker
			 * so that the performance model gets calibrated */
			forced_best = worker;
			break;
		}

		exp_end[worker] = compute_expected_end(worker, local_task_length[worker]);

		if (exp_end[worker] < best_exp_end)
		{
			/* a better solution was found */
			best_exp_end = exp_end[worker];
		}
	}

	double best_fitness = -1;

	if (forced_best == -1)
	{
		for (worker = 0; worker < nworkers + ncombinedworkers; worker++)
		{
			if (skip_worker[worker])
			{
				/* no one on that queue may execute this task */
				continue;
			}

			/* weighted sum of the expected delay and the data
			 * transfer penalty: the lower the better */
			fitness[worker] = alpha * (exp_end[worker] - best_exp_end)
					+ beta * local_data_penalty[worker];

			if (best == -1 || fitness[worker] < best_fitness)
			{
				/* we found a better solution */
				best_fitness = fitness[worker];
				best = worker;
			}
		}
	}

	STARPU_ASSERT(forced_best != -1 || best != -1);

	if (forced_best != -1)
	{
		/* there is no prediction available for that task with that
		 * arch, and we want to speed up calibration, so we force
		 * this measurement */
		best = forced_best;
		model_best = 0.0;
		penalty_best = 0.0;
	}
	else
	{
		model_best = local_task_length[best];
		penalty_best = local_data_penalty[best];
	}

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, prio);
}
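
/*
 * Illustration of the fitness trade-off above (with the default
 * alpha = beta = 1.0): a worker expected to finish 20us after the best
 * candidate but paying no data transfer scores 20, while a worker finishing
 * first but incurring a 30us transfer penalty scores 30, so the former wins.
 */
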
static int parallel_heft_push_prio_task(struct starpu_task *task)
{
	return _parallel_heft_push_task(task, 1);
}

static int parallel_heft_push_task(struct starpu_task *task)
{
	if (task->priority == STARPU_MAX_PRIO)
		return _parallel_heft_push_task(task, 1);

	return _parallel_heft_push_task(task, 0);
}

static void initialize_parallel_heft_policy(struct starpu_machine_topology_s *topology,
		__attribute__ ((unused)) struct starpu_sched_policy_s *_policy)
{
	nworkers = topology->nworkers;

	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
	if (strval_alpha)
		alpha = atof(strval_alpha);

	const char *strval_beta = getenv("STARPU_SCHED_BETA");
	if (strval_beta)
		beta = atof(strval_beta);

	_starpu_sched_find_worker_combinations(topology);
	ncombinedworkers = topology->ncombinedworkers;

	unsigned workerid;
	for (workerid = 0; workerid < nworkers; workerid++)
	{
		queue_array[workerid] = _starpu_create_fifo();

		PTHREAD_MUTEX_INIT(&sched_mutex[workerid], NULL);
		PTHREAD_COND_INIT(&sched_cond[workerid], NULL);
		starpu_worker_set_sched_condition(workerid, &sched_cond[workerid], &sched_mutex[workerid]);
	}

	PTHREAD_MUTEX_INIT(&big_lock, NULL);

	/* We pre-compute an array of all the perfmodel archs that are applicable */
	unsigned total_worker_count = nworkers + ncombinedworkers;

	unsigned used_perf_archtypes[STARPU_NARCH_VARIATIONS];
	memset(used_perf_archtypes, 0, sizeof(used_perf_archtypes));

	for (workerid = 0; workerid < total_worker_count; workerid++)
	{
		enum starpu_perf_archtype perf_archtype = starpu_worker_get_perf_archtype(workerid);
		used_perf_archtypes[perf_archtype] = 1;
	}

	napplicable_perf_archtypes = 0;

	int arch;
	for (arch = 0; arch < STARPU_NARCH_VARIATIONS; arch++)
	{
		if (used_perf_archtypes[arch])
			applicable_perf_archtypes[napplicable_perf_archtypes++] = arch;
	}
}
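
/*
 * A shell-level sketch (assuming the usual STARPU_SCHED selection variable):
 * the weights read above can be tuned at run time, e.g.
 *
 *   STARPU_SCHED=pheft STARPU_SCHED_ALPHA=2.0 STARPU_SCHED_BETA=1.5 ./app
 *
 * which gives more weight to expected completion time (alpha) relative to
 * the data transfer penalty (beta).
 */
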
static void deinitialize_parallel_heft_policy(struct starpu_machine_topology_s *topology,
		__attribute__ ((unused)) struct starpu_sched_policy_s *_policy)
{
	unsigned workerid;
	for (workerid = 0; workerid < topology->nworkers; workerid++)
		_starpu_destroy_fifo(queue_array[workerid]);
}

struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy = {
	.init_sched = initialize_parallel_heft_policy,
	.deinit_sched = deinitialize_parallel_heft_policy,
	.push_task = parallel_heft_push_task,
	.push_prio_task = parallel_heft_push_prio_task,
	.pop_task = parallel_heft_pop_task,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "pheft",
	.policy_description = "parallel HEFT"
};
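
/*
 * Usage sketch (an assumption based on the public StarPU API of later
 * releases, not part of this file): an application would typically select
 * this policy by the name declared in .policy_name above.
 *
 *   #include <starpu.h>
 *
 *   int main(void)
 *   {
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "pheft";
 *	if (starpu_init(&conf) != 0)
 *		return 1;
 *	// ... build and submit tasks ...
 *	starpu_shutdown();
 *	return 0;
 *   }
 *
 * or simply by running the application with STARPU_SCHED=pheft set in the
 * environment.
 */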