/* component_work_stealing.c */
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2013 INRIA
 * Copyright (C) 2013 CNRS
 * Copyright (C) 2013 Simon Archipoff
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
  18. #include <float.h>
  19. #include <starpu_sched_component.h>
  20. #include <starpu_scheduler.h>
  21. #include <starpu.h>
  22. #include <core/workers.h>
  23. #include "prio_deque.h"
  24. struct _starpu_work_stealing_data
  25. {
  26. /* keep track of the work performed from the beginning of the algorithm to make
  27. * better decisions about which queue to child when stealing or deferring work
  28. */
  29. unsigned performed_total, last_pop_child, last_push_child;
  30. struct _starpu_prio_deque ** fifos;
  31. starpu_pthread_mutex_t ** mutexes;
  32. int size;
  33. };
  34. /**
  35. * steal a task in a round robin way
  36. * return NULL if none available
  37. */
  38. static struct starpu_task * steal_task_round_robin(struct starpu_sched_component *component, int workerid)
  39. {
  40. struct _starpu_work_stealing_data *wsd = component->data;
  41. unsigned i = wsd->last_pop_child;
  42. wsd->last_pop_child = (wsd->last_pop_child + 1) % component->nchildren;
  43. /* If the worker's queue have no suitable tasks, let's try
  44. * the next ones */
  45. struct starpu_task * task = NULL;
  46. while (1)
  47. {
  48. struct _starpu_prio_deque * fifo = wsd->fifos[i];
  49. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
  50. task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid);
  51. if(task && !isnan(task->predicted))
  52. {
  53. fifo->exp_len -= task->predicted;
  54. fifo->nprocessed--;
  55. }
  56. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
  57. if(task)
  58. break;
  59. if (i == wsd->last_pop_child)
  60. {
  61. /* We got back to the first worker,
  62. * don't go in infinite loop */
  63. return NULL;
  64. }
  65. i = (i + 1) % component->nchildren;
  66. }
  67. return task;
  68. }
  69. /**
  70. * Return a worker to whom add a task.
  71. * Selecting a worker is done in a round-robin fashion.
  72. */
  73. static unsigned select_worker_round_robin(struct starpu_sched_component * component)
  74. {
  75. struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)component->data;
  76. unsigned i = (ws->last_push_child + 1) % component->nchildren ;
  77. ws->last_push_child = i;
  78. return i;
  79. }
  80. /**
  81. * Return a worker from which a task can be stolen.
  82. * This is a phony function used to call the right
  83. * function depending on the value of USE_OVERLOAD.
  84. */
  85. static inline struct starpu_task * steal_task(struct starpu_sched_component * component, int workerid)
  86. {
  87. return steal_task_round_robin(component, workerid);
  88. }
  89. /**
  90. * Return a worker from which a task can be stolen.
  91. * This is a phony function used to call the right
  92. * function depending on the value of USE_OVERLOAD.
  93. */
  94. static inline unsigned select_worker(struct starpu_sched_component * component)
  95. {
  96. return select_worker_round_robin(component);
  97. }
  98. static int is_worker_of_component(struct starpu_sched_component * component, int workerid)
  99. {
  100. return starpu_bitmap_get(component->workers, workerid);
  101. }
  102. static struct starpu_task * pull_task(struct starpu_sched_component * component)
  103. {
  104. unsigned workerid = starpu_worker_get_id_check();
  105. int i;
  106. for(i = 0; i < component->nchildren; i++)
  107. {
  108. if(is_worker_of_component(component->children[i], workerid))
  109. break;
  110. }
  111. STARPU_ASSERT(i < component->nchildren);
  112. struct _starpu_work_stealing_data * wsd = component->data;
  113. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
  114. struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
  115. if(task)
  116. {
  117. if(!isnan(task->predicted))
  118. {
  119. wsd->fifos[i]->exp_len -= task->predicted;
  120. wsd->fifos[i]->exp_start = starpu_timing_now() + task->predicted;
  121. }
  122. }
  123. else
  124. wsd->fifos[i]->exp_len = 0.0;
  125. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
  126. if(task)
  127. {
  128. return task;
  129. }
  130. task = steal_task(component, workerid);
  131. if(task)
  132. {
  133. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
  134. wsd->fifos[i]->nprocessed++;
  135. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
  136. return task;
  137. }
  138. for(i=0; i < component->nparents; i++)
  139. {
  140. if(component->parents[i] == NULL)
  141. continue;
  142. else
  143. {
  144. task = starpu_sched_component_pull_task(component->parents[i],component);
  145. if(task)
  146. break;
  147. }
  148. }
  149. if(task)
  150. return task;
  151. else
  152. return NULL;
  153. }
  154. double _ws_estimated_end(struct starpu_sched_component * component)
  155. {
  156. STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
  157. struct _starpu_work_stealing_data * wsd = component->data;
  158. double sum_len = 0.0;
  159. double sum_start = 0.0;
  160. int i;
  161. for(i = 0; i < component->nchildren; i++)
  162. {
  163. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
  164. sum_len += wsd->fifos[i]->exp_len;
  165. wsd->fifos[i]->exp_start = STARPU_MAX(starpu_timing_now(), wsd->fifos[i]->exp_start);
  166. sum_start += wsd->fifos[i]->exp_start;
  167. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
  168. }
  169. int nb_workers = starpu_bitmap_cardinal(component->workers_in_ctx);
  170. return (sum_start + sum_len) / nb_workers;
  171. }
  172. double _ws_estimated_load(struct starpu_sched_component * component)
  173. {
  174. STARPU_ASSERT(starpu_sched_component_is_work_stealing(component));
  175. struct _starpu_work_stealing_data * wsd = component->data;
  176. int ntasks = 0;
  177. int i;
  178. for(i = 0; i < component->nchildren; i++)
  179. {
  180. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
  181. ntasks += wsd->fifos[i]->ntasks;
  182. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
  183. }
  184. double speedup = 0.0;
  185. int workerid;
  186. for(workerid = starpu_bitmap_first(component->workers_in_ctx);
  187. -1 != workerid;
  188. workerid = starpu_bitmap_next(component->workers_in_ctx, workerid))
  189. {
  190. speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(workerid, component->tree->sched_ctx_id));
  191. }
  192. return ntasks / speedup;
  193. }
  194. static int push_task(struct starpu_sched_component * component, struct starpu_task * task)
  195. {
  196. struct _starpu_work_stealing_data * wsd = component->data;
  197. int ret = -1;
  198. int i = wsd->last_push_child;
  199. i = (i+1)%component->nchildren;
  200. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
  201. ret = _starpu_prio_deque_push_task(wsd->fifos[i], task);
  202. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
  203. wsd->last_push_child = i;
  204. component->can_pull(component);
  205. return ret;
  206. }
  207. //this function is special, when a worker call it, we want to push the task in his fifo
  208. int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
  209. {
  210. int workerid = starpu_worker_get_id();
  211. if(workerid == -1)
  212. return starpu_sched_tree_push_task(task);
  213. unsigned sched_ctx_id = task->sched_ctx;
  214. struct starpu_sched_component * component =starpu_sched_component_worker_get(sched_ctx_id, workerid);
  215. while(component->parents[sched_ctx_id] != NULL)
  216. {
  217. component = component->parents[sched_ctx_id];
  218. if(starpu_sched_component_is_work_stealing(component))
  219. {
  220. if(!starpu_sched_component_can_execute_task(component, task))
  221. return starpu_sched_tree_push_task(task);
  222. int i;
  223. for(i = 0; i < component->nchildren; i++)
  224. if(is_worker_of_component(component->children[i], workerid))
  225. break;
  226. STARPU_ASSERT(i < component->nchildren);
  227. struct _starpu_work_stealing_data * wsd = component->data;
  228. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
  229. int ret = _starpu_prio_deque_push_task(wsd->fifos[i] , task);
  230. if(ret == 0 && !isnan(task->predicted))
  231. wsd->fifos[i]->exp_len += task->predicted;
  232. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
  233. //we need to wake all workers
  234. component->can_pull(component);
  235. return ret;
  236. }
  237. }
  238. /* this should not be reached */
  239. return starpu_sched_tree_push_task(task);
  240. }
  241. void _ws_add_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
  242. {
  243. struct _starpu_work_stealing_data * wsd = component->data;
  244. starpu_sched_component_add_child(component, child);
  245. if(wsd->size < component->nchildren)
  246. {
  247. STARPU_ASSERT(wsd->size == component->nchildren - 1);
  248. wsd->fifos = realloc(wsd->fifos, component->nchildren * sizeof(*wsd->fifos));
  249. wsd->mutexes = realloc(wsd->mutexes, component->nchildren * sizeof(*wsd->mutexes));
  250. wsd->size = component->nchildren;
  251. }
  252. struct _starpu_prio_deque * fifo = malloc(sizeof(*fifo));
  253. _starpu_prio_deque_init(fifo);
  254. wsd->fifos[component->nchildren - 1] = fifo;
  255. starpu_pthread_mutex_t * mutex = malloc(sizeof(*mutex));
  256. STARPU_PTHREAD_MUTEX_INIT(mutex,NULL);
  257. wsd->mutexes[component->nchildren - 1] = mutex;
  258. }
  259. void _ws_remove_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
  260. {
  261. struct _starpu_work_stealing_data * wsd = component->data;
  262. STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes[component->nchildren - 1]);
  263. free(wsd->mutexes[component->nchildren - 1]);
  264. int i_component;
  265. for(i_component = 0; i_component < component->nchildren; i_component++)
  266. {
  267. if(component->children[i_component] == child)
  268. break;
  269. }
  270. STARPU_ASSERT(i_component != component->nchildren);
  271. struct _starpu_prio_deque * tmp_fifo = wsd->fifos[i_component];
  272. wsd->fifos[i_component] = wsd->fifos[component->nchildren - 1];
  273. component->children[i_component] = component->children[component->nchildren - 1];
  274. component->nchildren--;
  275. struct starpu_task * task;
  276. while((task = _starpu_prio_deque_pop_task(tmp_fifo)))
  277. {
  278. starpu_sched_component_push_task(NULL, component, task);
  279. }
  280. _starpu_prio_deque_destroy(tmp_fifo);
  281. free(tmp_fifo);
  282. }
  283. void _work_stealing_component_deinit_data(struct starpu_sched_component * component)
  284. {
  285. struct _starpu_work_stealing_data * wsd = component->data;
  286. free(wsd->fifos);
  287. free(wsd->mutexes);
  288. free(wsd);
  289. }
  290. int starpu_sched_component_is_work_stealing(struct starpu_sched_component * component)
  291. {
  292. return component->push_task == push_task;
  293. }
  294. struct starpu_sched_component * starpu_sched_component_work_stealing_create(struct starpu_sched_tree *tree, void * arg STARPU_ATTRIBUTE_UNUSED)
  295. {
  296. struct starpu_sched_component * component = starpu_sched_component_create(tree, "work_stealing");
  297. struct _starpu_work_stealing_data * wsd = malloc(sizeof(*wsd));
  298. memset(wsd, 0, sizeof(*wsd));
  299. component->pull_task = pull_task;
  300. component->push_task = push_task;
  301. component->add_child = _ws_add_child;
  302. component->remove_child = _ws_remove_child;
  303. component->estimated_end = _ws_estimated_end;
  304. component->estimated_load = _ws_estimated_load;
  305. component->deinit_data = _work_stealing_component_deinit_data;
  306. component->data = wsd;
  307. return component;
  308. }