component_work_stealing.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013, 2017 Inria
  4. * Copyright (C) 2013, 2016, 2017 CNRS
  5. * Copyright (C) 2013 Simon Archipoff
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <float.h>
  19. #include <starpu_sched_component.h>
  20. #include <starpu_scheduler.h>
  21. #include <starpu.h>
  22. #include <core/workers.h>
  23. #include <core/sched_policy.h>
  24. #include <core/task.h>
  25. #include "prio_deque.h"
  26. struct _starpu_work_stealing_data
  27. {
  28. /* keep track of the work performed from the beginning of the algorithm to make
  29. * better decisions about which queue to child when stealing or deferring work
  30. */
  31. unsigned performed_total, last_pop_child, last_push_child;
  32. struct _starpu_prio_deque ** fifos;
  33. starpu_pthread_mutex_t ** mutexes;
  34. unsigned size;
  35. };
  36. /**
  37. * steal a task in a round robin way
  38. * return NULL if none available
  39. */
  40. static struct starpu_task * steal_task_round_robin(struct starpu_sched_component *component, int workerid)
  41. {
  42. struct _starpu_work_stealing_data *wsd = component->data;
  43. STARPU_HG_DISABLE_CHECKING(wsd->last_pop_child);
  44. unsigned i = wsd->last_pop_child;
  45. wsd->last_pop_child = (i + 1) % component->nchildren;
  46. STARPU_HG_ENABLE_CHECKING(wsd->last_pop_child);
  47. /* If the worker's queue have no suitable tasks, let's try
  48. * the next ones */
  49. struct starpu_task * task = NULL;
  50. while (1)
  51. {
  52. struct _starpu_prio_deque * fifo = wsd->fifos[i];
  53. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  54. task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid, NULL);
  55. if(task && !isnan(task->predicted))
  56. {
  57. fifo->exp_len -= task->predicted;
  58. fifo->nprocessed--;
  59. }
  60. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  61. if(task)
  62. {
  63. _STARPU_TASK_BREAK_ON(task, sched);
  64. break;
  65. }
  66. if (i == wsd->last_pop_child)
  67. {
  68. /* We got back to the first worker,
  69. * don't go in infinite loop */
  70. return NULL;
  71. }
  72. i = (i + 1) % component->nchildren;
  73. }
  74. return task;
  75. }
  76. /**
  77. * Return a worker to whom add a task.
  78. * Selecting a worker is done in a round-robin fashion.
  79. */
  80. static unsigned select_worker_round_robin(struct starpu_sched_component * component)
  81. {
  82. struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)component->data;
  83. unsigned i = (ws->last_push_child + 1) % component->nchildren ;
  84. ws->last_push_child = i;
  85. return i;
  86. }
  /**
   * Steal a task on behalf of worker `workerid`.
   * Thin indirection over the stealing strategy; only the round-robin
   * strategy exists in this file.
   * Returns the stolen task, or NULL if none could be stolen.
   */
  static inline struct starpu_task * steal_task(struct starpu_sched_component * component, int workerid)
  {
      struct starpu_task *stolen = steal_task_round_robin(component, workerid);

      return stolen;
  }
  /**
   * Return the index of the child to which a task should be added.
   * Thin indirection over the selection strategy; only the round-robin
   * strategy exists in this file.
   */
  static inline unsigned select_worker(struct starpu_sched_component * component)
  {
      const unsigned chosen = select_worker_round_robin(component);

      return chosen;
  }
  105. static int is_worker_of_component(struct starpu_sched_component * component, int workerid)
  106. {
  107. return starpu_bitmap_get(component->workers, workerid);
  108. }
  109. static struct starpu_task * pull_task(struct starpu_sched_component * component)
  110. {
  111. unsigned workerid = starpu_worker_get_id_check();
  112. unsigned i;
  113. for(i = 0; i < component->nchildren; i++)
  114. {
  115. if(is_worker_of_component(component->children[i], workerid))
  116. break;
  117. }
  118. STARPU_ASSERT(i < component->nchildren);
  119. struct _starpu_work_stealing_data * wsd = component->data;
  120. const double now = starpu_timing_now();
  121. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  122. struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
  123. if(task)
  124. {
  125. if(!isnan(task->predicted))
  126. {
  127. wsd->fifos[i]->exp_len -= task->predicted;
  128. wsd->fifos[i]->exp_start = now + task->predicted;
  129. }
  130. }
  131. else
  132. wsd->fifos[i]->exp_len = 0.0;
  133. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  134. if(task)
  135. {
  136. return task;
  137. }
  138. task = steal_task(component, workerid);
  139. if(task)
  140. {
  141. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  142. wsd->fifos[i]->nprocessed++;
  143. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  144. return task;
  145. }
  146. for(i=0; i < component->nparents; i++)
  147. {
  148. if(component->parents[i] == NULL)
  149. continue;
  150. else
  151. {
  152. task = starpu_sched_component_pull_task(component->parents[i],component);
  153. if(task)
  154. break;
  155. }
  156. }
  157. if(task)
  158. return task;
  159. else
  160. return NULL;
  161. }
  162. double _ws_estimated_end(struct starpu_sched_component * component)
  163. {
  164. STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
  165. struct _starpu_work_stealing_data * wsd = component->data;
  166. double sum_len = 0.0;
  167. double sum_start = 0.0;
  168. unsigned i;
  169. const double now = starpu_timing_now();
  170. for(i = 0; i < component->nchildren; i++)
  171. {
  172. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  173. sum_len += wsd->fifos[i]->exp_len;
  174. wsd->fifos[i]->exp_start = STARPU_MAX(now, wsd->fifos[i]->exp_start);
  175. sum_start += wsd->fifos[i]->exp_start;
  176. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  177. }
  178. int nb_workers = starpu_bitmap_cardinal(component->workers_in_ctx);
  179. return (sum_start + sum_len) / nb_workers;
  180. }
  181. double _ws_estimated_load(struct starpu_sched_component * component)
  182. {
  183. STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
  184. struct _starpu_work_stealing_data * wsd = component->data;
  185. int ntasks = 0;
  186. unsigned i;
  187. for(i = 0; i < component->nchildren; i++)
  188. {
  189. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  190. ntasks += wsd->fifos[i]->ntasks;
  191. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  192. }
  193. double speedup = 0.0;
  194. int workerid;
  195. for(workerid = starpu_bitmap_first(component->workers_in_ctx);
  196. -1 != workerid;
  197. workerid = starpu_bitmap_next(component->workers_in_ctx, workerid))
  198. {
  199. speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(workerid, component->tree->sched_ctx_id));
  200. }
  201. return ntasks / speedup;
  202. }
  203. static int push_task(struct starpu_sched_component * component, struct starpu_task * task)
  204. {
  205. struct _starpu_work_stealing_data * wsd = component->data;
  206. int ret;
  207. unsigned i = wsd->last_push_child;
  208. i = (i+1)%component->nchildren;
  209. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  210. _STARPU_TASK_BREAK_ON(task, sched);
  211. ret = _starpu_prio_deque_push_task(wsd->fifos[i], task);
  212. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  213. wsd->last_push_child = i;
  214. component->can_pull(component);
  215. return ret;
  216. }
  217. //this function is special, when a worker call it, we want to push the task in his fifo
  218. int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
  219. {
  220. int workerid = starpu_worker_get_id();
  221. if(workerid == -1)
  222. return starpu_sched_tree_push_task(task);
  223. unsigned sched_ctx_id = task->sched_ctx;
  224. struct starpu_sched_component * component =starpu_sched_component_worker_get(sched_ctx_id, workerid);
  225. while(sched_ctx_id < component->nparents && component->parents[sched_ctx_id] != NULL)
  226. {
  227. component = component->parents[sched_ctx_id];
  228. if(starpu_sched_component_is_work_stealing(component))
  229. {
  230. if(!starpu_sched_component_can_execute_task(component, task))
  231. return starpu_sched_tree_push_task(task);
  232. unsigned i;
  233. for(i = 0; i < component->nchildren; i++)
  234. if(is_worker_of_component(component->children[i], workerid))
  235. break;
  236. STARPU_ASSERT(i < component->nchildren);
  237. struct _starpu_work_stealing_data * wsd = component->data;
  238. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  239. int ret = _starpu_prio_deque_push_task(wsd->fifos[i] , task);
  240. if(ret == 0 && !isnan(task->predicted))
  241. wsd->fifos[i]->exp_len += task->predicted;
  242. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  243. component->can_pull(component);
  244. return ret;
  245. }
  246. }
  247. return starpu_sched_tree_push_task(task);
  248. }
  249. void _ws_add_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
  250. {
  251. struct _starpu_work_stealing_data * wsd = component->data;
  252. starpu_sched_component_add_child(component, child);
  253. if(wsd->size < component->nchildren)
  254. {
  255. STARPU_ASSERT(wsd->size == component->nchildren - 1);
  256. _STARPU_REALLOC(wsd->fifos, component->nchildren * sizeof(*wsd->fifos));
  257. _STARPU_REALLOC(wsd->mutexes, component->nchildren * sizeof(*wsd->mutexes));
  258. wsd->size = component->nchildren;
  259. }
  260. struct _starpu_prio_deque *fifo;
  261. _STARPU_MALLOC(fifo, sizeof(*fifo));
  262. _starpu_prio_deque_init(fifo);
  263. wsd->fifos[component->nchildren - 1] = fifo;
  264. starpu_pthread_mutex_t *mutex;
  265. _STARPU_MALLOC(mutex, sizeof(*mutex));
  266. STARPU_PTHREAD_MUTEX_INIT(mutex,NULL);
  267. wsd->mutexes[component->nchildren - 1] = mutex;
  268. }
  269. void _ws_remove_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
  270. {
  271. struct _starpu_work_stealing_data * wsd = component->data;
  272. STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes[component->nchildren - 1]);
  273. free(wsd->mutexes[component->nchildren - 1]);
  274. unsigned i_component;
  275. for(i_component = 0; i_component < component->nchildren; i_component++)
  276. {
  277. if(component->children[i_component] == child)
  278. break;
  279. }
  280. STARPU_ASSERT(i_component != component->nchildren);
  281. struct _starpu_prio_deque * tmp_fifo = wsd->fifos[i_component];
  282. wsd->fifos[i_component] = wsd->fifos[component->nchildren - 1];
  283. component->children[i_component] = component->children[component->nchildren - 1];
  284. component->nchildren--;
  285. struct starpu_task * task;
  286. while ((task = _starpu_prio_deque_pop_task(tmp_fifo)))
  287. {
  288. starpu_sched_component_push_task(NULL, component, task);
  289. }
  290. _starpu_prio_deque_destroy(tmp_fifo);
  291. free(tmp_fifo);
  292. }
  293. void _work_stealing_component_deinit_data(struct starpu_sched_component * component)
  294. {
  295. struct _starpu_work_stealing_data * wsd = component->data;
  296. free(wsd->fifos);
  297. free(wsd->mutexes);
  298. free(wsd);
  299. }
  300. int starpu_sched_component_is_work_stealing(struct starpu_sched_component * component)
  301. {
  302. return component->push_task == push_task;
  303. }
  304. struct starpu_sched_component * starpu_sched_component_work_stealing_create(struct starpu_sched_tree *tree, void *arg)
  305. {
  306. (void)arg;
  307. struct starpu_sched_component *component = starpu_sched_component_create(tree, "work_stealing");
  308. struct _starpu_work_stealing_data *wsd;
  309. _STARPU_CALLOC(wsd, 1, sizeof(*wsd));
  310. component->pull_task = pull_task;
  311. component->push_task = push_task;
  312. component->add_child = _ws_add_child;
  313. component->remove_child = _ws_remove_child;
  314. component->estimated_end = _ws_estimated_end;
  315. component->estimated_load = _ws_estimated_load;
  316. component->deinit_data = _work_stealing_component_deinit_data;
  317. component->data = wsd;
  318. return component;
  319. }