node_sched.c

#include <core/jobs.h>
#include <core/workers.h>
#include <math.h> /* for isnan() */
#include "node_sched.h"

/* Combine the current time, the predicted end of the worker's queue, the
 * predicted task length and the predicted transfer time into an expected
 * completion date. */
double _starpu_compute_expected_time(double now, double predicted_end, double predicted_length, double predicted_transfer)
{
	if (now + predicted_transfer < predicted_end)
	{
		/* We may hope that the transfer will be finished by
		 * the start of the task. */
		predicted_transfer = 0;
	}
	else
	{
		/* The transfer will not be finished by then, take the
		 * remainder into account. */
		predicted_transfer += now;
		predicted_transfer -= predicted_end;
	}
	if (!isnan(predicted_transfer))
	{
		predicted_end += predicted_transfer;
		predicted_length += predicted_transfer;
	}
	if (!isnan(predicted_length))
		predicted_end += predicted_length;
	return predicted_end;
}
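
/* Worked example (illustrative only, not part of the original code): with
 * now = 10, predicted_end = 12, predicted_length = 20 and
 * predicted_transfer = 5, the transfer cannot complete before the queue
 * drains (10 + 5 >= 12), so the remainder 10 + 5 - 12 = 3 is kept.  Both
 * predicted_end and predicted_length are shifted by that remainder, and the
 * adjusted length is then added, so the function returns
 * 12 + 3 + (20 + 3) = 38. */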

/* Default "available" hook: propagate the notification to every child (only
 * needed when drivers block waiting for work). */
static void available(struct _starpu_sched_node * node)
{
	(void)node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	int i;
	for(i = 0; i < node->nchilds; i++)
		node->childs[i]->available(node->childs[i]);
#endif
}

/* Default pop: ask the father node of this scheduling context for a task. */
static struct starpu_task * pop_task_node(struct _starpu_sched_node * node, unsigned sched_ctx_id)
{
	if(node->fathers[sched_ctx_id] == NULL)
		return NULL;
	else
		return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id], sched_ctx_id);
}

void _starpu_sched_node_set_father(struct _starpu_sched_node * node,
				   struct _starpu_sched_node * father_node,
				   unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	node->fathers[sched_ctx_id] = father_node;
}

struct starpu_task * pop_task(unsigned sched_ctx_id)
{
	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int workerid = starpu_worker_get_id();
	struct _starpu_sched_node * wn = _starpu_sched_node_worker_get(workerid);
	STARPU_PTHREAD_RWLOCK_RDLOCK(&t->lock);
	struct starpu_task * task = wn->pop_task(wn, sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
	return task;
}

int push_task(struct starpu_task * task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_RDLOCK(&t->lock);
	int ret = t->root->push_task(t->root, task);
	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
	return ret;
}

void _starpu_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
	unsigned i;
	for(i = 0; i < nworkers; i++)
		_starpu_bitmap_set(t->workers, workerids[i]);
	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
}

void _starpu_tree_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
	unsigned i;
	for(i = 0; i < nworkers; i++)
		_starpu_bitmap_unset(t->workers, workerids[i]);
	STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
}

void _starpu_node_destroy_rec(struct _starpu_sched_node * node, unsigned sched_ctx_id)
{
	if(node == NULL)
		return;
	struct _starpu_sched_node ** stack = NULL;
	int top = -1;
#define PUSH(n) do{							\
		stack = realloc(stack, sizeof(*stack) * (top + 2));	\
		stack[++top] = n;}while(0)
#define POP() stack[top--]
#define EMPTY() (top == -1)
	/* We want to delete all subtrees, except when a father pointer points
	 * into another tree, i.e. another context. */
	node->fathers[sched_ctx_id] = NULL;
	int shared = 0;
	{
		int i;
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			if(node->fathers[i] != NULL)
				shared = 1;
	}
	if(!shared)
		PUSH(node);
	while(!EMPTY())
	{
		struct _starpu_sched_node * n = POP();
		int i;
		for(i = 0; i < n->nchilds; i++)
		{
			struct _starpu_sched_node * child = n->childs[i];
			int j;
			shared = 0;
			STARPU_ASSERT(child->fathers[sched_ctx_id] == n);
			child->fathers[sched_ctx_id] = NULL;
			for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
			{
				if(child->fathers[j] != NULL) /* child is shared with another context */
					shared = 1;
			}
			if(!shared) /* not shared: destroy it and its children as well */
				PUSH(child);
		}
		_starpu_sched_node_destroy(n);
	}
	free(stack);
#undef PUSH
#undef POP
#undef EMPTY
}

void _starpu_tree_destroy(struct _starpu_sched_tree * tree, unsigned sched_ctx_id)
{
	_starpu_node_destroy_rec(tree->root, sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_DESTROY(&tree->lock);
	free(tree);
}

void _starpu_sched_node_add_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child)
{
	STARPU_ASSERT(!_starpu_sched_node_is_worker(node));
	int i;
	for(i = 0; i < node->nchilds; i++)
	{
		STARPU_ASSERT(node->childs[i] != node);
		STARPU_ASSERT(node->childs[i] != NULL);
	}
	node->childs = realloc(node->childs, sizeof(struct _starpu_sched_node *) * (node->nchilds + 1));
	node->childs[node->nchilds] = child;
	node->nchilds++;
}

void _starpu_sched_node_remove_child(struct _starpu_sched_node * node, struct _starpu_sched_node * child)
{
	int pos;
	for(pos = 0; pos < node->nchilds; pos++)
		if(node->childs[pos] == child)
			break;
	STARPU_ASSERT(pos != node->nchilds);
	node->childs[pos] = node->childs[--node->nchilds];
}
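
/* Note (illustrative, not from the original file): removal swaps the last
 * child into the freed slot, so child order is not preserved.  For example,
 * with childs = {A, B, C}, removing A leaves childs = {C, B} and
 * nchilds == 2. */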

struct _starpu_bitmap * _starpu_get_worker_mask(struct starpu_task * task)
{
	struct _starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(task->sched_ctx);
	return t->workers;
}

int _starpu_tree_push_task(struct starpu_task * task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_sched_tree * tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
	int ret_val = tree->root->push_task(tree->root, task);
	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
	return ret_val;
}

struct starpu_task * _starpu_tree_pop_task(unsigned sched_ctx_id)
{
	struct _starpu_sched_tree * tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
	int workerid = starpu_worker_get_id();
	struct _starpu_sched_node * node = _starpu_sched_node_worker_get(workerid);
	struct starpu_task * task = node->pop_task(node, sched_ctx_id);
	STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
	return task;
}

/*
static double estimated_finish_time(struct _starpu_sched_node * node)
{
	double sum = 0.0;
	int i;
	for(i = 0; i < node->nchilds; i++)
	{
		struct _starpu_sched_node * c = node->childs[i];
		double tmp = c->estimated_finish_time(c);
		if(tmp > sum)
			sum = tmp;
	}
	return sum;
}
*/

static double estimated_load(struct _starpu_sched_node * node)
{
	double sum = 0.0;
	int i;
	for(i = 0; i < node->nchilds; i++)
	{
		struct _starpu_sched_node * c = node->childs[i];
		sum += c->estimated_load(c);
	}
	return sum;
}

/* Aggregate the predictions of the children: if the node is homogeneous only
 * the first child needs to be queried, otherwise the PERF_MODEL predictions
 * are averaged. */
static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
{
	if(node->is_homogeneous)
		return node->childs[0]->estimated_execute_preds(node->childs[0], task);

	struct _starpu_task_execute_preds pred =
	{
		.state = CANNOT_EXECUTE,
		.expected_length = 0.0,
		.expected_finish_time = 0.0,
		.expected_transfer_length = 0.0,
		.expected_power = 0.0
	};
	int nb = 0;
	int i;
	for(i = 0; i < node->nchilds; i++)
	{
		struct _starpu_task_execute_preds tmp = node->childs[i]->estimated_execute_preds(node->childs[i], task);
		switch(tmp.state)
		{
		case CALIBRATING:
			return tmp;
			break;
		case NO_PERF_MODEL:
			if(pred.state == CANNOT_EXECUTE)
				pred = tmp;
			break;
		case PERF_MODEL:
			nb++;
			pred.expected_length += tmp.expected_length;
			pred.expected_finish_time += tmp.expected_finish_time;
			pred.expected_transfer_length += tmp.expected_transfer_length;
			pred.expected_power += tmp.expected_power;
			pred.state = PERF_MODEL;
			break;
		case CANNOT_EXECUTE:
			break;
		}
	}
	if(nb > 0)
	{
		/* Average over the children that provided a performance model,
		 * avoiding a 0/0 division when none of them did. */
		pred.expected_length /= nb;
		pred.expected_finish_time /= nb;
		pred.expected_transfer_length /= nb;
		pred.expected_power /= nb;
	}
	return pred;
}
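
/* Worked example (illustrative only): on a heterogeneous node with two
 * children returning PERF_MODEL predictions of expected_length 10 and 20,
 * the aggregated prediction has expected_length (10 + 20) / 2 = 15; the
 * other fields are averaged the same way.  A single CALIBRATING child
 * short-circuits the whole computation so that calibration runs get
 * scheduled first. */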

/*
static double estimated_transfer_length(struct _starpu_sched_node * node, struct starpu_task * task)
{
	double sum = 0.0;
	int nb = 0, i = 0;
	for(i = 0; i < node->nchilds; i++)
	{
		struct _starpu_sched_node * c = node->childs[i];
		if(_starpu_sched_node_can_execute_task(c, task))
		{
			sum += c->estimated_transfer_length(c, task);
			nb++;
		}
	}
	sum /= nb;
	return sum;
}
*/

int _starpu_sched_node_can_execute_task(struct _starpu_sched_node * node, struct starpu_task * task)
{
	unsigned nimpl;
	int worker;
	struct _starpu_bitmap * worker_mask = _starpu_get_worker_mask(task);
	STARPU_ASSERT(task);
	STARPU_ASSERT(node);
	for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		for(worker = _starpu_bitmap_first(node->workers);
		    -1 != worker;
		    worker = _starpu_bitmap_next(node->workers, worker))
			if (_starpu_bitmap_get(worker_mask, worker)
			    && starpu_worker_can_execute_task(worker, task, nimpl))
				return 1;
	return 0;
}

int _starpu_sched_node_can_execute_task_with_impl(struct _starpu_sched_node * node, struct starpu_task * task, unsigned nimpl)
{
	struct _starpu_bitmap * worker_mask = _starpu_get_worker_mask(task);
	int worker;
	STARPU_ASSERT(task);
	STARPU_ASSERT(nimpl < STARPU_MAXIMPLEMENTATIONS);
	for(worker = _starpu_bitmap_first(node->workers);
	    worker != -1;
	    worker = _starpu_bitmap_next(node->workers, worker))
		if (_starpu_bitmap_get(worker_mask, worker)
		    && starpu_worker_can_execute_task(worker, task, nimpl))
			return 1;
	return 0;
}

void take_node_and_does_nothing(struct _starpu_sched_node * node STARPU_ATTRIBUTE_UNUSED)
{
}

struct _starpu_sched_node * _starpu_sched_node_create(void)
{
	struct _starpu_sched_node * node = malloc(sizeof(*node));
	memset(node, 0, sizeof(*node));
	node->workers = _starpu_bitmap_create();
	node->available = available;
	node->init_data = take_node_and_does_nothing;
	node->deinit_data = take_node_and_does_nothing;
	node->pop_task = pop_task_node;
	node->estimated_load = estimated_load;
	node->estimated_execute_preds = estimated_execute_preds;
	return node;
}
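
/* Sketch (illustrative only, not part of the original file): a composed node
 * is typically created with _starpu_sched_node_create(), its hooks
 * overridden, and worker nodes attached as children.  Here "my_fifo_push",
 * "my_fifo_data", "workerid" and "sched_ctx_id" are hypothetical
 * policy-specific names.
 *
 *	struct _starpu_sched_node * n = _starpu_sched_node_create();
 *	n->push_task = my_fifo_push;   // policy-specific push hook
 *	n->data = my_fifo_data;        // policy-private state
 *	_starpu_sched_node_add_child(n, _starpu_sched_node_worker_get(workerid));
 *	_starpu_sched_node_set_father(_starpu_sched_node_worker_get(workerid), n, sched_ctx_id);
 */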

void _starpu_sched_node_destroy(struct _starpu_sched_node * node)
{
	node->deinit_data(node);
	int i, j;
	for(i = 0; i < node->nchilds; i++)
	{
		struct _starpu_sched_node * child = node->childs[i];
		/* Drop any remaining father pointer to this node. */
		for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
			if(child->fathers[j] == node)
				child->fathers[j] = NULL;
	}
	free(node->childs);
	_starpu_bitmap_destroy(node->workers);
	free(node);
}

static void set_is_homogeneous(struct _starpu_sched_node * node)
{
	STARPU_ASSERT(_starpu_bitmap_cardinal(node->workers) > 0);
	if(_starpu_bitmap_cardinal(node->workers) == 1)
	{
		node->is_homogeneous = 1;
		return;
	}
	int worker = _starpu_bitmap_first(node->workers);
	uint32_t last_worker = _starpu_get_worker_struct(worker)->worker_mask;
	for(;
	    worker != -1;
	    worker = _starpu_bitmap_next(node->workers, worker))
	{
		if(last_worker != _starpu_get_worker_struct(worker)->worker_mask)
		{
			node->is_homogeneous = 0;
			return;
		}
		last_worker = _starpu_get_worker_struct(worker)->worker_mask;
	}
	node->is_homogeneous = 1;
}

static void add_worker_bit(struct _starpu_sched_node * node, int worker)
{
	STARPU_ASSERT(node);
	_starpu_bitmap_set(node->workers, worker);
	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(node->fathers[i])
		{
			add_worker_bit(node->fathers[i], worker);
			set_is_homogeneous(node->fathers[i]);
		}
}

void _starpu_set_workers_bitmaps(void)
{
	unsigned worker;
	for(worker = 0; worker < starpu_worker_get_count(); worker++)
	{
		struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(worker);
		add_worker_bit(worker_node, worker);
	}
}

static void helper_starpu_call_init_data(struct _starpu_sched_node * node)
{
	int i;
	for(i = 0; i < node->nchilds; i++)
		helper_starpu_call_init_data(node->childs[i]);
	if(!node->data)
		node->init_data(node);
}

void _starpu_tree_call_init_data(struct _starpu_sched_tree * t)
{
	helper_starpu_call_init_data(t->root);
}

/* Walk up the tree from node and push the task to the first ancestor that is
 * able to execute it. */
static int push_task_to_first_suitable_parent(struct _starpu_sched_node * node, struct starpu_task * task, int sched_ctx_id)
{
	if(node == NULL || node->fathers[sched_ctx_id] == NULL)
		return 1;
	// struct _starpu_sched_tree *t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_sched_node * father = node->fathers[sched_ctx_id];
	if(_starpu_sched_node_can_execute_task(father, task))
		return father->push_task(father, task);
	else
		return push_task_to_first_suitable_parent(father, task, sched_ctx_id);
}

int _starpu_sched_node_push_tasks_to_firsts_suitable_parent(struct _starpu_sched_node * node, struct starpu_task_list * list, int sched_ctx_id)
{
	while(!starpu_task_list_empty(list))
	{
		struct starpu_task * task = starpu_task_list_pop_front(list);
		int res = push_task_to_first_suitable_parent(node, task, sched_ctx_id);
		if(res)
		{
			/* The task could not be pushed: put it back at the
			 * head of the list and report the error. */
			starpu_task_list_push_front(list, task);
			return res;
		}
	}
	return 0;
}
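
/* Sketch (illustrative only): the entry points above are the ones a
 * tree-based policy would typically expose through struct starpu_sched_policy.
 * "init_my_tree_policy" and "deinit_my_tree_policy" are hypothetical
 * policy-specific hooks, not functions from this file.
 *
 *	struct starpu_sched_policy my_tree_policy =
 *	{
 *		.init_sched = init_my_tree_policy,
 *		.deinit_sched = deinit_my_tree_policy,
 *		.add_workers = _starpu_tree_add_workers,
 *		.remove_workers = _starpu_tree_remove_workers,
 *		.push_task = push_task,
 *		.pop_task = pop_task,
 *		.policy_name = "my-tree",
 *		.policy_description = "example tree-based scheduling policy"
 *	};
 */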