work_stealing_policy.c 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2011 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. /* Work stealing policy */
  18. #include <core/workers.h>
  19. #include <sched_policies/deque_queues.h>
  20. static unsigned nworkers;
  21. static unsigned rr_worker;
  22. static struct _starpu_deque_jobq *queue_array[STARPU_NMAXWORKERS];
  23. static pthread_mutex_t global_sched_mutex;
  24. static pthread_cond_t global_sched_cond;
  25. /* keep track of the work performed from the beginning of the algorithm to make
  26. * better decisions about which queue to select when stealing or deferring work
  27. */
  28. static unsigned performed_total = 0;
  29. #ifdef USE_OVERLOAD
  30. static float overload_metric(unsigned id)
  31. {
  32. float execution_ratio = 0.0f;
  33. if (performed_total > 0)
  34. {
  35. execution_ratio = _starpu_get_deque_nprocessed(queue_array[id])/performed_total;
  36. }
  37. unsigned performed_queue;
  38. performed_queue = _starpu_get_deque_nprocessed(queue_array[id]);
  39. float current_ratio = 0.0f;
  40. if (performed_queue > 0)
  41. {
  42. current_ratio = _starpu_get_deque_njobs(queue_array[id])/performed_queue;
  43. }
  44. return (current_ratio - execution_ratio);
  45. }
  46. /* who to steal work to ? */
  47. static struct _starpu_deque_jobq *select_victimq(void)
  48. {
  49. struct _starpu_deque_jobq *q;
  50. unsigned attempts = nworkers;
  51. unsigned worker = rr_worker;
  52. do
  53. {
  54. if (overload_metric(worker) > 0.0f)
  55. {
  56. q = queue_array[worker];
  57. return q;
  58. }
  59. else
  60. {
  61. worker = (worker + 1)%nworkers;
  62. }
  63. }
  64. while(attempts-- > 0);
  65. /* take one anyway ... */
  66. q = queue_array[rr_worker];
  67. rr_worker = (rr_worker + 1 )%nworkers;
  68. return q;
  69. }
  70. static struct _starpu_deque_jobq *select_workerq(void)
  71. {
  72. struct _starpu_deque_jobq *q;
  73. unsigned attempts = nworkers;
  74. unsigned worker = rr_worker;
  75. do
  76. {
  77. if (overload_metric(worker) < 0.0f)
  78. {
  79. q = queue_array[worker];
  80. return q;
  81. }
  82. else
  83. {
  84. worker = (worker + 1)%nworkers;
  85. }
  86. }
  87. while(attempts-- > 0);
  88. /* take one anyway ... */
  89. q = queue_array[rr_worker];
  90. rr_worker = (rr_worker + 1 )%nworkers;
  91. return q;
  92. }
  93. #else
  94. /* who to steal work from ? */
  95. static struct _starpu_deque_jobq *select_victimq(void)
  96. {
  97. struct _starpu_deque_jobq *q;
  98. q = queue_array[rr_worker];
  99. rr_worker = (rr_worker + 1 )%nworkers;
  100. return q;
  101. }
  102. /* when anonymous threads submit tasks,
  103. * we need to select a queue where to dispose them */
  104. static struct _starpu_deque_jobq *select_workerq(void)
  105. {
  106. struct _starpu_deque_jobq *q;
  107. q = queue_array[rr_worker];
  108. rr_worker = (rr_worker + 1 )%nworkers;
  109. return q;
  110. }
  111. #endif
  112. #ifdef STARPU_DEVEL
  113. #warning TODO rewrite ... this will not scale at all now
  114. #endif
  115. static struct starpu_task *ws_pop_task(void)
  116. {
  117. struct starpu_task *task;
  118. int workerid = starpu_worker_get_id();
  119. struct _starpu_deque_jobq *q;
  120. q = queue_array[workerid];
  121. _STARPU_PTHREAD_MUTEX_LOCK(&global_sched_mutex);
  122. task = _starpu_deque_pop_task(q, -1);
  123. if (task)
  124. {
  125. /* there was a local task */
  126. performed_total++;
  127. _STARPU_PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
  128. return task;
  129. }
  130. /* we need to steal someone's job */
  131. struct _starpu_deque_jobq *victimq;
  132. victimq = select_victimq();
  133. task = _starpu_deque_pop_task(victimq, workerid);
  134. if (task)
  135. {
  136. _STARPU_TRACE_WORK_STEALING(q, victimq);
  137. performed_total++;
  138. }
  139. _STARPU_PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
  140. return task;
  141. }
  142. static int ws_push_task(struct starpu_task *task)
  143. {
  144. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  145. int workerid = starpu_worker_get_id();
  146. struct _starpu_deque_jobq *deque_queue;
  147. deque_queue = queue_array[workerid];
  148. _STARPU_PTHREAD_MUTEX_LOCK(&global_sched_mutex);
  149. // XXX reuse ?
  150. //total_number_of_jobs++;
  151. _STARPU_TRACE_JOB_PUSH(task, 0);
  152. _starpu_job_list_push_front(deque_queue->jobq, j);
  153. deque_queue->njobs++;
  154. deque_queue->nprocessed++;
  155. _STARPU_PTHREAD_COND_SIGNAL(&global_sched_cond);
  156. _STARPU_PTHREAD_MUTEX_UNLOCK(&global_sched_mutex);
  157. return 0;
  158. }
  159. static void initialize_ws_policy(struct starpu_machine_topology *topology,
  160. __attribute__ ((unused)) struct starpu_sched_policy *_policy)
  161. {
  162. nworkers = topology->nworkers;
  163. rr_worker = 0;
  164. _STARPU_PTHREAD_MUTEX_INIT(&global_sched_mutex, NULL);
  165. _STARPU_PTHREAD_COND_INIT(&global_sched_cond, NULL);
  166. unsigned workerid;
  167. for (workerid = 0; workerid < nworkers; workerid++)
  168. {
  169. queue_array[workerid] = _starpu_create_deque();
  170. starpu_worker_set_sched_condition(workerid, &global_sched_cond, &global_sched_mutex);
  171. }
  172. }
  173. struct starpu_sched_policy _starpu_sched_ws_policy =
  174. {
  175. .init_sched = initialize_ws_policy,
  176. .deinit_sched = NULL,
  177. .push_task = ws_push_task,
  178. .pop_task = ws_pop_task,
  179. .pre_exec_hook = NULL,
  180. .post_exec_hook = NULL,
  181. .pop_every_task = NULL,
  182. .policy_name = "ws",
  183. .policy_description = "work stealing"
  184. };