node_work_stealing.c 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. #include "node_sched.h"
  2. #include "fifo_queues.h"
  3. #include <starpu_scheduler.h>
  4. #include <starpu.h>
  5. struct _starpu_work_stealing_data
  6. {
  7. /* keep track of the work performed from the beginning of the algorithm to make
  8. * better decisions about which queue to child when stealing or deferring work
  9. */
  10. unsigned performed_total;
  11. unsigned last_pop_child;
  12. unsigned last_push_child;
  13. struct _starpu_fifo_taskq ** fifos;
  14. starpu_pthread_mutex_t * mutexes;
  15. };
  16. /**
  17. * steal a task in a round robin way
  18. * return NULL if none available
  19. */
  20. static struct starpu_task * steal_task_round_robin(struct _starpu_sched_node *node, int workerid)
  21. {
  22. struct _starpu_work_stealing_data *wsd = node->data;
  23. unsigned i = wsd->last_pop_child;
  24. wsd->last_pop_child = (wsd->last_pop_child + 1) % node->nchilds;
  25. /* If the worker's queue have no suitable tasks, let's try
  26. * the next ones */
  27. struct starpu_task * task = NULL;
  28. while (1)
  29. {
  30. struct _starpu_fifo_taskq * fifo = wsd->fifos[i];
  31. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  32. task = _starpu_fifo_pop_task(fifo, workerid);
  33. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  34. if(task)
  35. {
  36. fifo->nprocessed--;
  37. break;
  38. }
  39. if (i == wsd->last_pop_child)
  40. {
  41. /* We got back to the first worker,
  42. * don't go in infinite loop */
  43. return NULL;
  44. }
  45. i = (i + 1) % node->nchilds;
  46. }
  47. return task;
  48. }
  49. /**
  50. * Return a worker to whom add a task.
  51. * Selecting a worker is done in a round-robin fashion.
  52. */
  53. static unsigned select_worker_round_robin(struct _starpu_sched_node * node)
  54. {
  55. struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)node->data;
  56. unsigned i = (ws->last_push_child + 1) % node->nchilds ;
  57. ws->last_push_child = i;
  58. return i;
  59. }
  /**
   * Steal a task from one of node's queues on behalf of workerid.
   * This is a thin wrapper that forwards to the stealing strategy in use,
   * which is the round-robin one.
   * Returns the stolen task, or NULL when nothing could be stolen.
   */
  static inline struct starpu_task * steal_task(struct _starpu_sched_node * node, int workerid)
  {
      struct starpu_task *stolen = steal_task_round_robin(node, workerid);

      return stolen;
  }
  /**
   * Return the index of the child to which a task should be added.
   * This is a thin wrapper that forwards to the selection strategy in use,
   * which is the round-robin one.
   */
  static inline unsigned select_worker(struct _starpu_sched_node * node)
  {
      unsigned target = select_worker_round_robin(node);

      return target;
  }
  78. static int is_worker_of_node(struct _starpu_sched_node * node, int workerid)
  79. {
  80. return _starpu_bitmap_get(node->workers, workerid);
  81. }
  82. static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
  83. {
  84. int workerid = starpu_worker_get_id();
  85. int i;
  86. for(i = 0; i < node->nchilds; i++)
  87. {
  88. if(is_worker_of_node(node->childs[i], workerid))
  89. break;
  90. }
  91. STARPU_ASSERT(i < node->nchilds);
  92. struct _starpu_work_stealing_data * wsd = node->data;
  93. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  94. struct starpu_task * task = _starpu_fifo_pop_local_task(wsd->fifos[i]);
  95. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  96. if(task)
  97. return task;
  98. task = steal_task(node, workerid);
  99. if(task)
  100. {
  101. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  102. wsd->fifos[i]->nprocessed++;
  103. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  104. return task;
  105. }
  106. if(node->fathers[sched_ctx_id])
  107. return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id],sched_ctx_id);
  108. else
  109. return NULL;
  110. }
  111. static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
  112. {
  113. struct _starpu_work_stealing_data * wsd = node->data;
  114. int ret = -1;
  115. int start = wsd->last_push_child;
  116. int i;
  117. for(i = (start+1)%node->nchilds; i != start; i = (i+1)%node->nchilds)
  118. {
  119. struct _starpu_sched_node * child = node->childs[i];
  120. if(_starpu_sched_node_can_execute_task(child,task))
  121. {
  122. ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
  123. break;
  124. }
  125. }
  126. wsd->last_push_child = (wsd->last_push_child + 1) % node->nchilds;
  127. node->childs[i]->available(node->childs[i]);
  128. return ret;
  129. }
  130. //this function is special, when a worker call it, we want to push the task in his fifo
  131. int _starpu_ws_push_task(struct starpu_task *task)
  132. {
  133. int workerid = starpu_worker_get_id();
  134. if(workerid == -1)
  135. return _starpu_tree_push_task(task);
  136. unsigned sched_ctx_id = task->sched_ctx;
  137. struct _starpu_sched_node * node =_starpu_sched_node_worker_get(workerid);
  138. while(node->fathers[sched_ctx_id] != NULL)
  139. {
  140. node = node->fathers[sched_ctx_id];
  141. if(_starpu_sched_node_is_work_stealing(node))
  142. {
  143. int i;
  144. for(i = 0; i < node->nchilds; i++)
  145. if(is_worker_of_node(node->childs[i], workerid))
  146. break;
  147. STARPU_ASSERT(i < node->nchilds);
  148. struct _starpu_work_stealing_data * wsd = node->data;
  149. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  150. int ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
  151. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  152. //we need to wake all workers
  153. int j;
  154. for(j = 0; j < node->nchilds; j++)
  155. {
  156. if(j == i)
  157. continue;
  158. node->childs[j]->available(node->childs[j]);
  159. }
  160. return ret;
  161. }
  162. }
  163. STARPU_ASSERT_MSG(0, "there were a problem here, dont know what to do");
  164. return _starpu_tree_push_task(task);
  165. }
  166. static void init_ws_data(struct _starpu_sched_node *node)
  167. {
  168. struct _starpu_work_stealing_data * wsd = malloc(sizeof(*wsd));
  169. memset(wsd, 0, sizeof(*wsd));
  170. node->data = wsd;
  171. int i;
  172. for(i = 0; i < node->nchilds; i++){
  173. STARPU_ASSERT(node->childs[i] != node);
  174. STARPU_ASSERT(node->childs[i] != NULL);
  175. }
  176. int size = node->nchilds;
  177. wsd->fifos = malloc(sizeof(struct _starpu_fifo_taskq*) * size);
  178. wsd->mutexes = malloc(sizeof(starpu_pthread_rwlock_t) * size);
  179. for(i = 0; i < size; i++)
  180. {
  181. wsd->fifos[i] = _starpu_create_fifo();
  182. STARPU_PTHREAD_MUTEX_INIT(wsd->mutexes + i, NULL);
  183. }
  184. }
  185. static void deinit_ws_data(struct _starpu_sched_node *node)
  186. {
  187. struct _starpu_work_stealing_data * wsd = node->data;
  188. int i;
  189. for(i = 0; i < node->nchilds; i++)
  190. {
  191. STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes + i);
  192. _starpu_destroy_fifo(wsd->fifos[i]);
  193. }
  194. free(wsd->mutexes);
  195. free(wsd->fifos);
  196. free(wsd);
  197. node->data = NULL;
  198. }
  199. struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void)
  200. {
  201. struct _starpu_sched_node * node = _starpu_sched_node_create();
  202. node->pop_task = pop_task;
  203. node->init_data = init_ws_data;
  204. node->deinit_data = deinit_ws_data;
  205. node->push_task = push_task;
  206. return node;
  207. }
  208. int _starpu_sched_node_is_work_stealing(struct _starpu_sched_node * node)
  209. {
  210. return node->init_data == init_ws_data;
  211. }
  212. static void initialize_ws_center_policy(unsigned sched_ctx_id)
  213. {
  214. starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
  215. struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
  216. STARPU_PTHREAD_RWLOCK_INIT(&data->lock,NULL);
  217. struct _starpu_sched_node * ws;
  218. data->root = ws = _starpu_sched_node_work_stealing_create();
  219. data->workers = _starpu_bitmap_create();
  220. unsigned i;
  221. for(i = 0; i < starpu_worker_get_count(); i++)
  222. {
  223. struct _starpu_sched_node * node = _starpu_sched_node_worker_get(i);
  224. if(!node)
  225. continue;
  226. node->fathers[sched_ctx_id] = ws;
  227. _starpu_sched_node_add_child(ws, node);
  228. }
  229. _starpu_set_workers_bitmaps();
  230. _starpu_tree_call_init_data(data);
  231. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  232. }
  233. static void deinitialize_ws_center_policy(unsigned sched_ctx_id)
  234. {
  235. struct _starpu_sched_tree *t = (struct _starpu_sched_tree*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  236. _starpu_bitmap_destroy(t->workers);
  237. _starpu_tree_destroy(t, sched_ctx_id);
  238. starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
  239. }
  240. struct starpu_sched_policy _starpu_sched_tree_ws_policy =
  241. {
  242. .init_sched = initialize_ws_center_policy,
  243. .deinit_sched = deinitialize_ws_center_policy,
  244. .add_workers = _starpu_tree_add_workers,
  245. .remove_workers = _starpu_tree_remove_workers,
  246. .push_task = _starpu_ws_push_task,
  247. .pop_task = _starpu_tree_pop_task,
  248. .pre_exec_hook = NULL,
  249. .post_exec_hook = NULL,
  250. .pop_every_task = NULL,
  251. .policy_name = "tree-ws",
  252. .policy_description = "work stealing tree policy"
  253. };