node_work_stealing.c 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #include "node_sched.h"
  2. #include "fifo_queues.h"
  3. #include <starpu_scheduler.h>
  4. #include <starpu.h>
  5. struct _starpu_work_stealing_data
  6. {
  7. /* keep track of the work performed from the beginning of the algorithm to make
  8. * better decisions about which queue to child when stealing or deferring work
  9. */
  10. unsigned performed_total;
  11. unsigned last_pop_child;
  12. unsigned last_push_child;
  13. struct _starpu_fifo_taskq ** fifos;
  14. starpu_pthread_mutex_t * mutexes;
  15. };
  16. /**
  17. * steal a task in a round robin way
  18. * return NULL if none available
  19. */
  20. static struct starpu_task * steal_task_round_robin(struct _starpu_sched_node *node, int workerid)
  21. {
  22. struct _starpu_work_stealing_data *wsd = node->data;
  23. unsigned i = wsd->last_pop_child;
  24. wsd->last_pop_child = (wsd->last_pop_child + 1) % node->nchilds;
  25. /* If the worker's queue have no suitable tasks, let's try
  26. * the next ones */
  27. struct starpu_task * task = NULL;
  28. while (1)
  29. {
  30. struct _starpu_fifo_taskq * fifo = wsd->fifos[i];
  31. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  32. task = _starpu_fifo_pop_task(fifo, workerid);
  33. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  34. if(task)
  35. {
  36. fifo->nprocessed--;
  37. break;
  38. }
  39. if (i == wsd->last_pop_child)
  40. {
  41. /* We got back to the first worker,
  42. * don't go in infinite loop */
  43. return NULL;
  44. }
  45. i = (i + 1) % node->nchilds;
  46. }
  47. return task;
  48. }
  49. /**
  50. * Return a worker to whom add a task.
  51. * Selecting a worker is done in a round-robin fashion.
  52. */
  53. static unsigned select_worker_round_robin(struct _starpu_sched_node * node)
  54. {
  55. struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)node->data;
  56. unsigned i = (ws->last_push_child + 1) % node->nchilds ;
  57. ws->last_push_child = i;
  58. return i;
  59. }
  60. /**
  61. * Return a worker from which a task can be stolen.
  62. * This is a phony function used to call the right
  63. * function depending on the value of USE_OVERLOAD.
  64. */
  65. static inline struct starpu_task * steal_task(struct _starpu_sched_node * node, int workerid)
  66. {
  67. return steal_task_round_robin(node, workerid);
  68. }
  69. /**
  70. * Return a worker from which a task can be stolen.
  71. * This is a phony function used to call the right
  72. * function depending on the value of USE_OVERLOAD.
  73. */
  74. static inline unsigned select_worker(struct _starpu_sched_node * node)
  75. {
  76. return select_worker_round_robin(node);
  77. }
  78. static int is_worker_of_node(struct _starpu_sched_node * node, int workerid)
  79. {
  80. return _starpu_bitmap_get(node->workers, workerid);
  81. }
  82. static struct starpu_task * pop_task(struct _starpu_sched_node * node, unsigned sched_ctx_id)
  83. {
  84. int workerid = starpu_worker_get_id();
  85. int i;
  86. for(i = 0; i < node->nchilds; i++)
  87. {
  88. if(is_worker_of_node(node->childs[i], workerid))
  89. break;
  90. }
  91. STARPU_ASSERT(i < node->nchilds);
  92. struct _starpu_work_stealing_data * wsd = node->data;
  93. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  94. struct starpu_task * task = _starpu_fifo_pop_local_task(wsd->fifos[i]);
  95. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  96. if(task)
  97. return task;
  98. task = steal_task(node, workerid);
  99. if(task)
  100. {
  101. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  102. wsd->fifos[i]->nprocessed++;
  103. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  104. return task;
  105. }
  106. if(node->fathers[sched_ctx_id])
  107. return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id],sched_ctx_id);
  108. else
  109. return NULL;
  110. }
  111. static int push_task(struct _starpu_sched_node * node, struct starpu_task * task)
  112. {
  113. struct _starpu_work_stealing_data * wsd = node->data;
  114. int ret = -1;
  115. int start = wsd->last_push_child;
  116. int i = start;
  117. do
  118. {
  119. i = (i+1)%node->nchilds;
  120. struct _starpu_sched_node * child = node->childs[i];
  121. if(_starpu_sched_node_can_execute_task(child,task))
  122. {
  123. ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
  124. break;
  125. }
  126. }
  127. while(i != start);
  128. wsd->last_push_child = (start + 1) % node->nchilds;
  129. node->childs[i]->available(node->childs[i]);
  130. return ret;
  131. }
  132. //this function is special, when a worker call it, we want to push the task in his fifo
  133. int _starpu_ws_push_task(struct starpu_task *task)
  134. {
  135. int workerid = starpu_worker_get_id();
  136. if(workerid == -1)
  137. return _starpu_tree_push_task(task);
  138. unsigned sched_ctx_id = task->sched_ctx;
  139. struct _starpu_sched_node * node =_starpu_sched_node_worker_get(workerid);
  140. while(node->fathers[sched_ctx_id] != NULL)
  141. {
  142. node = node->fathers[sched_ctx_id];
  143. if(_starpu_sched_node_is_work_stealing(node))
  144. {
  145. int i;
  146. for(i = 0; i < node->nchilds; i++)
  147. if(is_worker_of_node(node->childs[i], workerid))
  148. break;
  149. STARPU_ASSERT(i < node->nchilds);
  150. struct _starpu_work_stealing_data * wsd = node->data;
  151. STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes + i);
  152. int ret = _starpu_fifo_push_sorted_task(wsd->fifos[i], task);
  153. STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes + i);
  154. //we need to wake all workers
  155. int j;
  156. for(j = 0; j < node->nchilds; j++)
  157. {
  158. if(j == i)
  159. continue;
  160. node->childs[j]->available(node->childs[j]);
  161. }
  162. return ret;
  163. }
  164. }
  165. STARPU_ASSERT_MSG(0, "there were a problem here, dont know what to do");
  166. return _starpu_tree_push_task(task);
  167. }
  168. static void init_ws_data(struct _starpu_sched_node *node)
  169. {
  170. struct _starpu_work_stealing_data * wsd = malloc(sizeof(*wsd));
  171. memset(wsd, 0, sizeof(*wsd));
  172. node->data = wsd;
  173. int i;
  174. for(i = 0; i < node->nchilds; i++){
  175. STARPU_ASSERT(node->childs[i] != node);
  176. STARPU_ASSERT(node->childs[i] != NULL);
  177. }
  178. int size = node->nchilds;
  179. wsd->fifos = malloc(sizeof(struct _starpu_fifo_taskq*) * size);
  180. wsd->mutexes = malloc(sizeof(starpu_pthread_rwlock_t) * size);
  181. for(i = 0; i < size; i++)
  182. {
  183. wsd->fifos[i] = _starpu_create_fifo();
  184. STARPU_PTHREAD_MUTEX_INIT(wsd->mutexes + i, NULL);
  185. }
  186. }
  187. static void deinit_ws_data(struct _starpu_sched_node *node)
  188. {
  189. struct _starpu_work_stealing_data * wsd = node->data;
  190. int i;
  191. for(i = 0; i < node->nchilds; i++)
  192. {
  193. STARPU_PTHREAD_MUTEX_DESTROY(wsd->mutexes + i);
  194. _starpu_destroy_fifo(wsd->fifos[i]);
  195. }
  196. free(wsd->mutexes);
  197. free(wsd->fifos);
  198. free(wsd);
  199. node->data = NULL;
  200. }
  201. struct _starpu_sched_node * _starpu_sched_node_work_stealing_create(void)
  202. {
  203. struct _starpu_sched_node * node = _starpu_sched_node_create();
  204. node->pop_task = pop_task;
  205. node->init_data = init_ws_data;
  206. node->deinit_data = deinit_ws_data;
  207. node->push_task = push_task;
  208. return node;
  209. }
  210. int _starpu_sched_node_is_work_stealing(struct _starpu_sched_node * node)
  211. {
  212. return node->init_data == init_ws_data;
  213. }
  214. static void initialize_ws_center_policy(unsigned sched_ctx_id)
  215. {
  216. starpu_sched_ctx_create_worker_collection(sched_ctx_id, STARPU_WORKER_LIST);
  217. struct _starpu_sched_tree *data = malloc(sizeof(struct _starpu_sched_tree));
  218. STARPU_PTHREAD_RWLOCK_INIT(&data->lock,NULL);
  219. struct _starpu_sched_node * ws;
  220. data->root = ws = _starpu_sched_node_work_stealing_create();
  221. data->workers = _starpu_bitmap_create();
  222. unsigned i;
  223. for(i = 0; i < starpu_worker_get_count(); i++)
  224. {
  225. struct _starpu_sched_node * node = _starpu_sched_node_worker_get(i);
  226. if(!node)
  227. continue;
  228. node->fathers[sched_ctx_id] = ws;
  229. _starpu_sched_node_add_child(ws, node);
  230. }
  231. _starpu_set_workers_bitmaps();
  232. _starpu_tree_call_init_data(data);
  233. starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
  234. }
  235. static void deinitialize_ws_center_policy(unsigned sched_ctx_id)
  236. {
  237. struct _starpu_sched_tree *t = (struct _starpu_sched_tree*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
  238. _starpu_bitmap_destroy(t->workers);
  239. _starpu_tree_destroy(t, sched_ctx_id);
  240. starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
  241. }
  242. struct starpu_sched_policy _starpu_sched_tree_ws_policy =
  243. {
  244. .init_sched = initialize_ws_center_policy,
  245. .deinit_sched = deinitialize_ws_center_policy,
  246. .add_workers = _starpu_tree_add_workers,
  247. .remove_workers = _starpu_tree_remove_workers,
  248. .push_task = _starpu_ws_push_task,
  249. .pop_task = _starpu_tree_pop_task,
  250. .pre_exec_hook = NULL,
  251. .post_exec_hook = NULL,
  252. .pop_every_task = NULL,
  253. .policy_name = "tree-ws",
  254. .policy_description = "work stealing tree policy"
  255. };