deque-modeling-policy.c 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <common/config.h>
  17. #include <core/policies/deque-modeling-policy.h>
  18. #include <core/perfmodel/perfmodel.h>
  /* Number of queues registered so far. init_dm_fifo uses it as the next free
   * index in queue_array and initialize_dm_policy resets it to 0. */
  static unsigned nworkers;
  /* Queues created by init_dm_fifo, stored in creation order; scanned by
   * _dm_push_task to pick a destination. */
  static struct jobq_s *queue_array[STARPU_NMAXWORKERS];
  /* Non-zero when _dm_push_task should call prefetch_task_input_on_node for
   * the selected queue; set from starpu_get_env_number("PREFETCH"). */
  static int use_prefetch = 0;
  22. static job_t dm_pop_task(struct jobq_s *q)
  23. {
  24. struct job_s *j;
  25. j = fifo_pop_task(q);
  26. if (j) {
  27. struct fifo_jobq_s *fifo = q->queue;
  28. double model = j->predicted;
  29. fifo->exp_len -= model;
  30. fifo->exp_start = timing_now() + model;
  31. fifo->exp_end = fifo->exp_start + fifo->exp_len;
  32. }
  33. return j;
  34. }
  35. static struct job_list_s *dm_pop_every_task(struct jobq_s *q, uint32_t where)
  36. {
  37. struct job_list_s *new_list;
  38. new_list = fifo_pop_every_task(q, where);
  39. if (new_list) {
  40. job_itor_t i;
  41. for(i = job_list_begin(new_list);
  42. i != job_list_end(new_list);
  43. i = job_list_next(i))
  44. {
  45. struct fifo_jobq_s *fifo = q->queue;
  46. double model = i->predicted;
  47. fifo->exp_len -= model;
  48. fifo->exp_start = timing_now() + model;
  49. fifo->exp_end = fifo->exp_start + fifo->exp_len;
  50. }
  51. }
  52. return new_list;
  53. }
  54. static int _dm_push_task(struct jobq_s *q __attribute__ ((unused)), job_t j, unsigned prio)
  55. {
  56. /* find the queue */
  57. struct fifo_jobq_s *fifo;
  58. unsigned worker;
  59. int best = -1;
  60. double best_exp_end = 0.0;
  61. double model_best = 0.0;
  62. struct starpu_task *task = j->task;
  63. for (worker = 0; worker < nworkers; worker++)
  64. {
  65. double exp_end;
  66. fifo = queue_array[worker]->queue;
  67. fifo->exp_start = STARPU_MAX(fifo->exp_start, timing_now());
  68. fifo->exp_end = STARPU_MAX(fifo->exp_end, timing_now());
  69. if ((queue_array[worker]->who & task->cl->where) == 0)
  70. {
  71. /* no one on that queue may execute this task */
  72. continue;
  73. }
  74. double local_length = job_expected_length(queue_array[worker]->who, j, queue_array[worker]->arch);
  75. if (local_length == -1.0)
  76. {
  77. /* there is no prediction available for that task
  78. * with that arch we want to speed-up calibration time
  79. * so we force this measurement */
  80. /* XXX assert we are benchmarking ! */
  81. best = worker;
  82. model_best = 0.0;
  83. exp_end = fifo->exp_start + fifo->exp_len;
  84. break;
  85. }
  86. exp_end = fifo->exp_start + fifo->exp_len + local_length;
  87. if (best == -1 || exp_end < best_exp_end)
  88. {
  89. /* a better solution was found */
  90. best_exp_end = exp_end;
  91. best = worker;
  92. model_best = local_length;
  93. }
  94. }
  95. /* make sure someone coule execute that task ! */
  96. STARPU_ASSERT(best != -1);
  97. /* we should now have the best worker in variable "best" */
  98. fifo = queue_array[best]->queue;
  99. fifo->exp_end += model_best;
  100. fifo->exp_len += model_best;
  101. j->predicted = model_best;
  102. if (use_prefetch)
  103. prefetch_task_input_on_node(task, queue_array[best]->memory_node);
  104. if (prio) {
  105. return fifo_push_prio_task(queue_array[best], j);
  106. } else {
  107. return fifo_push_task(queue_array[best], j);
  108. }
  109. }
  110. static int dm_push_prio_task(struct jobq_s *q, job_t j)
  111. {
  112. return _dm_push_task(q, j, 1);
  113. }
  114. static int dm_push_task(struct jobq_s *q, job_t j)
  115. {
  116. if (j->task->priority == MAX_PRIO)
  117. return _dm_push_task(q, j, 1);
  118. return _dm_push_task(q, j, 0);
  119. }
  120. static struct jobq_s *init_dm_fifo(void)
  121. {
  122. struct jobq_s *q;
  123. q = create_fifo();
  124. q->push_task = dm_push_task;
  125. q->push_prio_task = dm_push_prio_task;
  126. q->pop_task = dm_pop_task;
  127. q->pop_every_task = dm_pop_every_task;
  128. q->who = 0;
  129. queue_array[nworkers++] = q;
  130. return q;
  131. }
  132. void initialize_dm_policy(struct machine_config_s *config,
  133. __attribute__ ((unused)) struct sched_policy_s *_policy)
  134. {
  135. nworkers = 0;
  136. use_prefetch = starpu_get_env_number("PREFETCH");
  137. if (use_prefetch == -1)
  138. use_prefetch = 0;
  139. #ifdef VERBOSE
  140. fprintf(stderr, "Using prefetch ? %s\n", use_prefetch?"yes":"no");
  141. #endif
  142. setup_queues(init_fifo_queues_mechanisms, init_dm_fifo, config);
  143. }
  144. struct jobq_s *get_local_queue_dm(struct sched_policy_s *policy __attribute__ ((unused)))
  145. {
  146. struct jobq_s *queue;
  147. queue = pthread_getspecific(policy->local_queue_key);
  148. if (!queue)
  149. {
  150. /* take one randomly as this *must* be for a push anyway XXX */
  151. queue = queue_array[0];
  152. }
  153. return queue;
  154. }