deque-modeling-policy-data-aware.c 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <core/policies/deque-modeling-policy-data-aware.h>
  17. #include <core/perfmodel/perfmodel.h>
  18. static unsigned nworkers;
  19. static struct jobq_s *queue_array[STARPU_NMAXWORKERS];
  20. static int use_prefetch = 0;
  21. static double alpha = 1.0;
  22. static double beta = 1.0;
  23. static job_t dmda_pop_task(struct jobq_s *q)
  24. {
  25. struct job_s *j;
  26. j = fifo_pop_task(q);
  27. if (j) {
  28. struct fifo_jobq_s *fifo = q->queue;
  29. double model = j->predicted;
  30. fifo->exp_len -= model;
  31. fifo->exp_start = timing_now() + model;
  32. fifo->exp_end = fifo->exp_start + fifo->exp_len;
  33. }
  34. return j;
  35. }
  36. static void update_data_requests(struct jobq_s *q, struct starpu_task *task)
  37. {
  38. uint32_t memory_node = q->memory_node;
  39. unsigned nbuffers = task->cl->nbuffers;
  40. unsigned buffer;
  41. for (buffer = 0; buffer < nbuffers; buffer++)
  42. {
  43. starpu_data_handle handle = task->buffers[buffer].handle;
  44. starpu_set_data_requested_flag_if_needed(handle, memory_node);
  45. }
  46. }
  47. static int _dmda_push_task(struct jobq_s *q __attribute__ ((unused)) , job_t j, unsigned prio)
  48. {
  49. /* find the queue */
  50. struct fifo_jobq_s *fifo;
  51. unsigned worker;
  52. int best = -1;
  53. /* this flag is set if the corresponding worker is selected because
  54. there is no performance prediction available yet */
  55. int forced_best = -1;
  56. double local_task_length[nworkers];
  57. double local_data_penalty[nworkers];
  58. double exp_end[nworkers];
  59. double fitness[nworkers];
  60. double best_exp_end = 10e240;
  61. double model_best = 0.0;
  62. double penality_best = 0.0;
  63. struct starpu_task *task = j->task;
  64. for (worker = 0; worker < nworkers; worker++)
  65. {
  66. fifo = queue_array[worker]->queue;
  67. fifo->exp_start = STARPU_MAX(fifo->exp_start, timing_now());
  68. fifo->exp_end = STARPU_MAX(fifo->exp_end, timing_now());
  69. if ((queue_array[worker]->who & task->cl->where) == 0)
  70. {
  71. /* no one on that queue may execute this task */
  72. continue;
  73. }
  74. local_task_length[worker] = job_expected_length(queue_array[worker]->who,
  75. j, queue_array[worker]->arch);
  76. //local_data_penalty[worker] = 0;
  77. local_data_penalty[worker] = data_expected_penalty(queue_array[worker], task);
  78. if (local_task_length[worker] == -1.0)
  79. {
  80. forced_best = worker;
  81. break;
  82. }
  83. exp_end[worker] = fifo->exp_start + fifo->exp_len + local_task_length[worker];
  84. if (exp_end[worker] < best_exp_end)
  85. {
  86. /* a better solution was found */
  87. best_exp_end = exp_end[worker];
  88. }
  89. }
  90. double best_fitness = -1;
  91. if (forced_best == -1)
  92. {
  93. for (worker = 0; worker < nworkers; worker++)
  94. {
  95. fifo = queue_array[worker]->queue;
  96. if ((queue_array[worker]->who & task->cl->where) == 0)
  97. {
  98. /* no one on that queue may execute this task */
  99. continue;
  100. }
  101. fitness[worker] = alpha*(exp_end[worker] - best_exp_end)
  102. + beta*(local_data_penalty[worker]);
  103. if (best == -1 || fitness[worker] < best_fitness)
  104. {
  105. /* we found a better solution */
  106. best_fitness = fitness[worker];
  107. best = worker;
  108. // fprintf(stderr, "best fitness (worker %d) %le = alpha*(%le) + beta(%le) \n", worker, best_fitness, exp_end[worker] - best_exp_end, local_data_penalty[worker]);
  109. }
  110. }
  111. }
  112. STARPU_ASSERT(forced_best != -1 || best != -1);
  113. if (forced_best != -1)
  114. {
  115. /* there is no prediction available for that task
  116. * with that arch we want to speed-up calibration time
  117. * so we force this measurement */
  118. best = worker;
  119. model_best = 0.0;
  120. penality_best = 0.0;
  121. }
  122. else
  123. {
  124. model_best = local_task_length[best];
  125. penality_best = local_data_penalty[best];
  126. }
  127. /* we should now have the best worker in variable "best" */
  128. fifo = queue_array[best]->queue;
  129. fifo->exp_end += model_best;
  130. fifo->exp_len += model_best;
  131. j->predicted = model_best;
  132. j->penality = penality_best;
  133. update_data_requests(queue_array[best], task);
  134. if (use_prefetch)
  135. starpu_prefetch_task_input_on_node(task, queue_array[best]->memory_node);
  136. if (prio) {
  137. return fifo_push_prio_task(queue_array[best], j);
  138. } else {
  139. return fifo_push_task(queue_array[best], j);
  140. }
  141. }
  142. static int dmda_push_prio_task(struct jobq_s *q, job_t j)
  143. {
  144. return _dmda_push_task(q, j, 1);
  145. }
  146. static int dmda_push_task(struct jobq_s *q, job_t j)
  147. {
  148. if (j->task->priority == STARPU_MAX_PRIO)
  149. return _dmda_push_task(q, j, 1);
  150. return _dmda_push_task(q, j, 0);
  151. }
  152. static struct jobq_s *init_dmda_fifo(void)
  153. {
  154. struct jobq_s *q;
  155. q = create_fifo();
  156. q->push_task = dmda_push_task;
  157. q->push_prio_task = dmda_push_prio_task;
  158. q->pop_task = dmda_pop_task;
  159. q->who = 0;
  160. queue_array[nworkers++] = q;
  161. return q;
  162. }
  163. static void initialize_dmda_policy(struct machine_config_s *config,
  164. __attribute__ ((unused)) struct sched_policy_s *_policy)
  165. {
  166. nworkers = 0;
  167. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  168. if (use_prefetch == -1)
  169. use_prefetch = 0;
  170. const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
  171. if (strval_alpha)
  172. beta = atof(strval_alpha);
  173. const char *strval_beta = getenv("STARPU_SCHED_BETA");
  174. if (strval_beta)
  175. beta = atof(strval_beta);
  176. setup_queues(init_fifo_queues_mechanisms, init_dmda_fifo, config);
  177. }
  178. static struct jobq_s *get_local_queue_dmda(struct sched_policy_s *policy __attribute__ ((unused)))
  179. {
  180. struct jobq_s *queue;
  181. queue = pthread_getspecific(policy->local_queue_key);
  182. if (!queue)
  183. {
  184. /* take one randomly as this *must* be for a push anyway XXX */
  185. queue = queue_array[0];
  186. }
  187. return queue;
  188. }
  189. struct sched_policy_s sched_dmda_policy = {
  190. .init_sched = initialize_dmda_policy,
  191. .deinit_sched = NULL,
  192. .get_local_queue = get_local_queue_dmda,
  193. .policy_name = "dmda",
  194. .policy_description = "data-aware performance model"
  195. };