component_work_stealing.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013,2017 Inria
  4. * Copyright (C) 2013-2014,2016-2017 CNRS
  5. * Copyright (C) 2014-2018 Université de Bordeaux
  6. * Copyright (C) 2013 Simon Archipoff
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <float.h>
  20. #include <starpu_sched_component.h>
  21. #include <starpu_scheduler.h>
  22. #include <starpu.h>
  23. #include <core/workers.h>
  24. #include <core/sched_policy.h>
  25. #include <core/task.h>
  26. #include "prio_deque.h"
  27. #ifdef STARPU_DEVEL
  28. #warning TODO: locality work-stealing
  29. #endif
  30. struct _starpu_work_stealing_data
  31. {
  32. /* keep track of the work performed from the beginning of the algorithm to make
  33. * better decisions about which queue to child when stealing or deferring work
  34. */
  35. unsigned performed_total, last_pop_child, last_push_child;
  36. struct _starpu_prio_deque ** fifos;
  37. starpu_pthread_mutex_t ** mutexes;
  38. unsigned size;
  39. };
  40. /**
  41. * steal a task in a round robin way
  42. * return NULL if none available
  43. */
  44. static struct starpu_task * steal_task_round_robin(struct starpu_sched_component *component, int workerid)
  45. {
  46. struct _starpu_work_stealing_data *wsd = component->data;
  47. STARPU_HG_DISABLE_CHECKING(wsd->last_pop_child);
  48. unsigned i = wsd->last_pop_child;
  49. wsd->last_pop_child = (i + 1) % component->nchildren;
  50. STARPU_HG_ENABLE_CHECKING(wsd->last_pop_child);
  51. /* If the worker's queue have no suitable tasks, let's try
  52. * the next ones */
  53. struct starpu_task * task = NULL;
  54. while (1)
  55. {
  56. struct _starpu_prio_deque * fifo = wsd->fifos[i];
  57. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  58. task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid, NULL);
  59. if(task && !isnan(task->predicted))
  60. {
  61. fifo->exp_len -= task->predicted;
  62. fifo->nprocessed--;
  63. }
  64. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  65. if(task)
  66. {
  67. starpu_sched_task_break(task);
  68. break;
  69. }
  70. if (i == wsd->last_pop_child)
  71. {
  72. /* We got back to the first worker,
  73. * don't go in infinite loop */
  74. return NULL;
  75. }
  76. i = (i + 1) % component->nchildren;
  77. }
  78. return task;
  79. }
  80. /**
  81. * Return a worker to whom add a task.
  82. * Selecting a worker is done in a round-robin fashion.
  83. */
  84. static unsigned select_worker_round_robin(struct starpu_sched_component * component)
  85. {
  86. struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)component->data;
  87. unsigned i = (ws->last_push_child + 1) % component->nchildren ;
  88. ws->last_push_child = i;
  89. return i;
  90. }
  /**
   * Steal a task for worker `workerid` from one of the component's queues.
   *
   * Thin dispatch wrapper: the only stealing strategy currently wired in is
   * the round-robin one.  Returns the stolen task, or NULL if there is none.
   */
  static inline struct starpu_task * steal_task(struct starpu_sched_component * component, int workerid)
  {
      struct starpu_task *stolen = steal_task_round_robin(component, workerid);

      return stolen;
  }
  /**
   * Return the index of the child to which the next task should be pushed.
   *
   * Thin dispatch wrapper: the only selection strategy currently wired in is
   * the round-robin one.
   */
  static inline unsigned select_worker(struct starpu_sched_component * component)
  {
      const unsigned target = select_worker_round_robin(component);

      return target;
  }
  109. static int is_worker_of_component(struct starpu_sched_component * component, int workerid)
  110. {
  111. return starpu_bitmap_get(component->workers, workerid);
  112. }
  113. static struct starpu_task * pull_task(struct starpu_sched_component * component, struct starpu_sched_component * to STARPU_ATTRIBUTE_UNUSED)
  114. {
  115. unsigned workerid = starpu_worker_get_id_check();
  116. unsigned i;
  117. for(i = 0; i < component->nchildren; i++)
  118. {
  119. if(is_worker_of_component(component->children[i], workerid))
  120. break;
  121. }
  122. STARPU_ASSERT(i < component->nchildren);
  123. struct _starpu_work_stealing_data * wsd = component->data;
  124. const double now = starpu_timing_now();
  125. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  126. struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
  127. if(task)
  128. {
  129. if(!isnan(task->predicted))
  130. {
  131. wsd->fifos[i]->exp_len -= task->predicted;
  132. wsd->fifos[i]->exp_start = now + task->predicted;
  133. }
  134. }
  135. else
  136. wsd->fifos[i]->exp_len = 0.0;
  137. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  138. if(task)
  139. {
  140. return task;
  141. }
  142. task = steal_task(component, workerid);
  143. if(task)
  144. {
  145. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  146. wsd->fifos[i]->nprocessed++;
  147. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  148. return task;
  149. }
  150. for(i=0; i < component->nparents; i++)
  151. {
  152. if(component->parents[i] == NULL)
  153. continue;
  154. else
  155. {
  156. task = starpu_sched_component_pull_task(component->parents[i],component);
  157. if(task)
  158. break;
  159. }
  160. }
  161. if(task)
  162. return task;
  163. else
  164. return NULL;
  165. }
  166. double _ws_estimated_end(struct starpu_sched_component * component)
  167. {
  168. STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
  169. struct _starpu_work_stealing_data * wsd = component->data;
  170. double sum_len = 0.0;
  171. double sum_start = 0.0;
  172. unsigned i;
  173. const double now = starpu_timing_now();
  174. for(i = 0; i < component->nchildren; i++)
  175. {
  176. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  177. sum_len += wsd->fifos[i]->exp_len;
  178. wsd->fifos[i]->exp_start = STARPU_MAX(now, wsd->fifos[i]->exp_start);
  179. sum_start += wsd->fifos[i]->exp_start;
  180. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  181. }
  182. int nb_workers = starpu_bitmap_cardinal(component->workers_in_ctx);
  183. return (sum_start + sum_len) / nb_workers;
  184. }
  185. double _ws_estimated_load(struct starpu_sched_component * component)
  186. {
  187. STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
  188. struct _starpu_work_stealing_data * wsd = component->data;
  189. int ntasks = 0;
  190. unsigned i;
  191. for(i = 0; i < component->nchildren; i++)
  192. {
  193. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  194. ntasks += wsd->fifos[i]->ntasks;
  195. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  196. }
  197. double speedup = 0.0;
  198. int workerid;
  199. for(workerid = starpu_bitmap_first(component->workers_in_ctx);
  200. -1 != workerid;
  201. workerid = starpu_bitmap_next(component->workers_in_ctx, workerid))
  202. {
  203. speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(workerid, component->tree->sched_ctx_id));
  204. }
  205. return ntasks / speedup;
  206. }
  207. static int push_task(struct starpu_sched_component * component, struct starpu_task * task)
  208. {
  209. struct _starpu_work_stealing_data * wsd = component->data;
  210. int ret;
  211. unsigned i = wsd->last_push_child;
  212. i = (i+1)%component->nchildren;
  213. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  214. starpu_sched_task_break(task);
  215. ret = _starpu_prio_deque_push_front_task(wsd->fifos[i], task);
  216. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  217. wsd->last_push_child = i;
  218. component->can_pull(component);
  219. return ret;
  220. }
  221. //this function is special, when a worker call it, we want to push the task in his fifo
  222. int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
  223. {
  224. int workerid = starpu_worker_get_id();
  225. if(workerid == -1)
  226. return starpu_sched_tree_push_task(task);
  227. unsigned sched_ctx_id = task->sched_ctx;
  228. struct starpu_sched_component * component =starpu_sched_component_worker_get(sched_ctx_id, workerid);
  229. while(sched_ctx_id < component->nparents && component->parents[sched_ctx_id] != NULL)
  230. {
  231. component = component->parents[sched_ctx_id];
  232. if(starpu_sched_component_is_work_stealing(component))
  233. {
  234. if(!starpu_sched_component_can_execute_task(component, task))
  235. return starpu_sched_tree_push_task(task);
  236. unsigned i;
  237. for(i = 0; i < component->nchildren; i++)
  238. if(is_worker_of_component(component->children[i], workerid))
  239. break;
  240. STARPU_ASSERT(i < component->nchildren);
  241. struct _starpu_work_stealing_data * wsd = component->data;
  242. STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
  243. int ret = _starpu_prio_deque_push_front_task(wsd->fifos[i] , task);
  244. if(ret == 0 && !isnan(task->predicted))
  245. wsd->fifos[i]->exp_len += task->predicted;
  246. STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
  247. component->can_pull(component);
  248. return ret;
  249. }
  250. }
  251. return starpu_sched_tree_push_task(task);
  252. }
  253. void _ws_add_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
  254. {
  255. struct _starpu_work_stealing_data * wsd = component->data;
  256. starpu_sched_component_add_child(component, child);
  257. if(wsd->size < component->nchildren)
  258. {
  259. STARPU_ASSERT(wsd->size == component->nchildren - 1);
  260. _STARPU_REALLOC(wsd->fifos, component->nchildren * sizeof(*wsd->fifos));
  261. _STARPU_REALLOC(wsd->mutexes, component->nchildren * sizeof(*wsd->mutexes));
  262. wsd->size = component->nchildren;
  263. }
  264. struct _starpu_prio_deque *fifo;
  265. _STARPU_MALLOC(fifo, sizeof(*fifo));
  266. _starpu_prio_deque_init(fifo);
  267. wsd->fifos[component->nchildren - 1] = fifo;
  268. starpu_pthread_mutex_t *mutex;
  269. _STARPU_MALLOC(mutex, sizeof(*mutex));
  270. STARPU_PTHREAD_MUTEX_INIT(mutex,NULL);
  271. wsd->mutexes[component->nchildren - 1] = mutex;
  272. }
  273. void _ws_remove_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
  274. {
  275. struct _starpu_work_stealing_data * wsd = component->data;
  276. STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes[component->nchildren - 1]);
  277. free(wsd->mutexes[component->nchildren - 1]);
  278. unsigned i_component;
  279. for(i_component = 0; i_component < component->nchildren; i_component++)
  280. {
  281. if(component->children[i_component] == child)
  282. break;
  283. }
  284. STARPU_ASSERT(i_component != component->nchildren);
  285. struct _starpu_prio_deque * tmp_fifo = wsd->fifos[i_component];
  286. wsd->fifos[i_component] = wsd->fifos[component->nchildren - 1];
  287. component->children[i_component] = component->children[component->nchildren - 1];
  288. component->nchildren--;
  289. struct starpu_task * task;
  290. while ((task = _starpu_prio_deque_pop_task(tmp_fifo)))
  291. {
  292. starpu_sched_component_push_task(NULL, component, task);
  293. }
  294. _starpu_prio_deque_destroy(tmp_fifo);
  295. free(tmp_fifo);
  296. }
  297. void _work_stealing_component_deinit_data(struct starpu_sched_component * component)
  298. {
  299. struct _starpu_work_stealing_data * wsd = component->data;
  300. free(wsd->fifos);
  301. free(wsd->mutexes);
  302. free(wsd);
  303. }
  304. int starpu_sched_component_is_work_stealing(struct starpu_sched_component * component)
  305. {
  306. return component->push_task == push_task;
  307. }
  308. struct starpu_sched_component * starpu_sched_component_work_stealing_create(struct starpu_sched_tree *tree, void *arg)
  309. {
  310. (void)arg;
  311. struct starpu_sched_component *component = starpu_sched_component_create(tree, "work_stealing");
  312. struct _starpu_work_stealing_data *wsd;
  313. _STARPU_CALLOC(wsd, 1, sizeof(*wsd));
  314. component->pull_task = pull_task;
  315. component->push_task = push_task;
  316. component->add_child = _ws_add_child;
  317. component->remove_child = _ws_remove_child;
  318. component->estimated_end = _ws_estimated_end;
  319. component->estimated_load = _ws_estimated_load;
  320. component->deinit_data = _work_stealing_component_deinit_data;
  321. component->data = wsd;
  322. return component;
  323. }