node_sched.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. #include <core/jobs.h>
  2. #include <core/workers.h>
  3. #include <starpu_sched_node.h>
  4. #include <starpu_thread_util.h>
  5. #include "sched_node.h"
  6. #include <float.h>
  /*
   * Estimate the completion time of a task on a worker.
   *
   * now                : current time.
   * predicted_end      : time at which the worker is expected to be able to
   *                      start the task.
   * predicted_length   : expected execution length of the task (may be NaN).
   * predicted_transfer : expected duration of the task's data transfers,
   *                      assumed to start at NOW (may be NaN).
   *
   * If the transfers finish before PREDICTED_END they are fully overlapped
   * and cost nothing. Otherwise the task start is pushed back by the part
   * that is not overlapped: now + predicted_transfer - predicted_end.
   * NaN components are ignored.
   *
   * Returns predicted_end + non-overlapped transfer + predicted_length.
   */
  double starpu_sched_compute_expected_time(double now, double predicted_end, double predicted_length, double predicted_transfer)
  {
      if (now + predicted_transfer < predicted_end)
      {
          /* We may hope that the transfer will be finished by
           * the start of the task. */
          predicted_transfer = 0;
      }
      else
      {
          /* The transfer will not be finished by then, take the
           * remainder into account */
          predicted_transfer += now;
          predicted_transfer -= predicted_end;
      }
      /* Delay the start by the non-overlapped transfer exactly once. It must
       * not also be folded into predicted_length: that value is added to the
       * end time just below, so the transfer would be charged twice. */
      if(!isnan(predicted_transfer))
          predicted_end += predicted_transfer;
      if(!isnan(predicted_length))
          predicted_end += predicted_length;
      return predicted_end;
  }
  31. static void available(struct starpu_sched_node * node)
  32. {
  33. (void)node;
  34. #ifndef STARPU_NON_BLOCKING_DRIVERS
  35. int i;
  36. for(i = 0; i < node->nchilds; i++)
  37. node->childs[i]->available(node->childs[i]);
  38. #endif
  39. }
  40. static struct starpu_task * pop_task_node(struct starpu_sched_node * node, unsigned sched_ctx_id)
  41. {
  42. if(node->fathers[sched_ctx_id] == NULL)
  43. return NULL;
  44. else
  45. return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id], sched_ctx_id);
  46. }
  47. void starpu_sched_node_set_father(struct starpu_sched_node *node,
  48. struct starpu_sched_node *father_node,
  49. unsigned sched_ctx_id)
  50. {
  51. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  52. node->fathers[sched_ctx_id] = father_node;
  53. }
  54. struct starpu_task * pop_task(unsigned sched_ctx_id)
  55. {
  56. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  57. int workerid = starpu_worker_get_id();
  58. struct starpu_sched_node * wn = starpu_sched_node_worker_get(workerid);
  59. STARPU_PTHREAD_RWLOCK_RDLOCK(&t->lock);
  60. struct starpu_task * task = wn->pop_task(wn, sched_ctx_id);
  61. STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
  62. return task;
  63. }
  64. int push_task(struct starpu_task * task)
  65. {
  66. unsigned sched_ctx_id = task->sched_ctx;
  67. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  68. STARPU_PTHREAD_RWLOCK_RDLOCK(&t->lock);
  69. int ret = t->root->push_task(t->root, task);
  70. STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
  71. return ret;
  72. }
  73. void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  74. {
  75. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  76. STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
  77. _starpu_sched_node_lock_all_workers();
  78. unsigned i;
  79. for(i = 0; i < nworkers; i++)
  80. starpu_bitmap_set(t->workers, workerids[i]);
  81. starpu_sched_tree_update_workers_in_ctx(t);
  82. _starpu_sched_node_unlock_all_workers();
  83. STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
  84. }
  85. void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  86. {
  87. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  88. STARPU_PTHREAD_RWLOCK_WRLOCK(&t->lock);
  89. _starpu_sched_node_lock_all_workers();
  90. unsigned i;
  91. for(i = 0; i < nworkers; i++)
  92. starpu_bitmap_unset(t->workers, workerids[i]);
  93. starpu_sched_tree_update_workers_in_ctx(t);
  94. _starpu_sched_node_unlock_all_workers();
  95. STARPU_PTHREAD_RWLOCK_UNLOCK(&t->lock);
  96. }
  97. void starpu_sched_node_destroy_rec(struct starpu_sched_node * node, unsigned sched_ctx_id)
  98. {
  99. if(node == NULL)
  100. return;
  101. struct starpu_sched_node ** stack = NULL;
  102. int top = -1;
  103. #define PUSH(n) do{ \
  104. stack = realloc(stack, sizeof(*stack) * (top + 2)); \
  105. stack[++top] = n;}while(0)
  106. #define POP() stack[top--]
  107. #define EMPTY() (top == -1)
  108. //we want to delete all subtrees exept if a pointer in fathers point in an other tree
  109. //ie an other context
  110. node->fathers[sched_ctx_id] = NULL;
  111. int shared = 0;
  112. {
  113. int i;
  114. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  115. if(node->fathers[i] != NULL)
  116. shared = 1;
  117. }
  118. if(!shared)
  119. PUSH(node);
  120. while(!EMPTY())
  121. {
  122. struct starpu_sched_node * n = POP();
  123. int i;
  124. for(i = 0; i < n->nchilds; i++)
  125. {
  126. struct starpu_sched_node * child = n->childs[i];
  127. int j;
  128. shared = 0;
  129. STARPU_ASSERT(child->fathers[sched_ctx_id] == n);
  130. child->fathers[sched_ctx_id] = NULL;
  131. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  132. {
  133. if(child->fathers[j] != NULL)//child is shared
  134. shared = 1;
  135. }
  136. if(!shared)//if not shared we want to destroy it and his childs
  137. PUSH(child);
  138. }
  139. starpu_sched_node_destroy(n);
  140. }
  141. free(stack);
  142. }
  143. struct starpu_sched_tree * starpu_sched_tree_create(unsigned sched_ctx_id)
  144. {
  145. struct starpu_sched_tree * t = malloc(sizeof(*t));
  146. memset(t, 0, sizeof(*t));
  147. t->sched_ctx_id = sched_ctx_id;
  148. t->workers = starpu_bitmap_create();
  149. STARPU_PTHREAD_RWLOCK_INIT(&t->lock,NULL);
  150. return t;
  151. }
  152. void starpu_sched_tree_destroy(struct starpu_sched_tree * tree, unsigned sched_ctx_id)
  153. {
  154. if(tree->root)
  155. starpu_sched_node_destroy_rec(tree->root, sched_ctx_id);
  156. starpu_bitmap_destroy(tree->workers);
  157. STARPU_PTHREAD_RWLOCK_DESTROY(&tree->lock);
  158. free(tree);
  159. }
  160. void starpu_sched_node_add_child(struct starpu_sched_node* node, struct starpu_sched_node * child)
  161. {
  162. STARPU_ASSERT(!starpu_sched_node_is_worker(node));
  163. int i;
  164. for(i = 0; i < node->nchilds; i++){
  165. STARPU_ASSERT(node->childs[i] != node);
  166. STARPU_ASSERT(node->childs[i] != NULL);
  167. }
  168. node->childs = realloc(node->childs, sizeof(struct starpu_sched_node *) * (node->nchilds + 1));
  169. node->childs[node->nchilds] = child;
  170. node->nchilds++;
  171. }
  172. void starpu_sched_node_remove_child(struct starpu_sched_node * node, struct starpu_sched_node * child)
  173. {
  174. int pos;
  175. for(pos = 0; pos < node->nchilds; pos++)
  176. if(node->childs[pos] == child)
  177. break;
  178. STARPU_ASSERT(pos != node->nchilds);
  179. node->childs[pos] = node->childs[--node->nchilds];
  180. }
  181. struct starpu_bitmap * _starpu_get_worker_mask(unsigned sched_ctx_id)
  182. {
  183. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  184. return t->workers;
  185. }
  186. void _starpu_sched_node_block_worker(int workerid);
  187. void _starpu_sched_node_unblock_worker(int workerid);
  188. int starpu_sched_tree_push_task(struct starpu_task * task)
  189. {
  190. unsigned sched_ctx_id = task->sched_ctx;
  191. struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  192. int workerid = starpu_worker_get_id();
  193. if(-1 == workerid)
  194. STARPU_PTHREAD_RWLOCK_RDLOCK(&tree->lock);
  195. else
  196. _starpu_sched_node_block_worker(workerid);
  197. int ret_val = tree->root->push_task(tree->root,task);
  198. if(-1 == workerid)
  199. STARPU_PTHREAD_RWLOCK_UNLOCK(&tree->lock);
  200. else
  201. _starpu_sched_node_unblock_worker(workerid);
  202. return ret_val;
  203. }
  204. struct starpu_task * starpu_sched_tree_pop_task(unsigned sched_ctx_id)
  205. {
  206. struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  207. int workerid = starpu_worker_get_id();
  208. struct starpu_sched_node * node = starpu_sched_node_worker_get(workerid);
  209. struct starpu_task * task = node->pop_task(node, sched_ctx_id);
  210. return task;
  211. }
  212. /*
  213. static double estimated_finish_time(struct starpu_sched_node * node)
  214. {
  215. double sum = 0.0;
  216. int i;
  217. for(i = 0; i < node->nchilds; i++)
  218. {
  219. struct starpu_sched_node * c = node->childs[i];
  220. double tmp = c->estimated_finish_time(c);
  221. if( tmp > sum)
  222. sum = tmp;
  223. }
  224. return sum;
  225. }
  226. */
  227. static double estimated_load(struct starpu_sched_node * node)
  228. {
  229. double sum = 0.0;
  230. int i;
  231. for( i = 0; i < node->nchilds; i++)
  232. {
  233. struct starpu_sched_node * c = node->childs[i];
  234. sum += c->estimated_load(c);
  235. }
  236. return sum;
  237. }
  238. static double _starpu_sched_node_estimated_end_min(struct starpu_sched_node * node)
  239. {
  240. double min = DBL_MAX;
  241. int i;
  242. for(i = 0; i < node->nchilds; i++)
  243. {
  244. double tmp = node->childs[i]->estimated_end(node->childs[i]);
  245. if(tmp < min)
  246. min = tmp;
  247. }
  248. return min;
  249. }
  250. int STARPU_WARN_UNUSED_RESULT starpu_sched_node_execute_preds(struct starpu_sched_node * node, struct starpu_task * task, double * length)
  251. {
  252. int can_execute = 0;
  253. starpu_task_bundle_t bundle = task->bundle;
  254. double len = DBL_MAX;
  255. int workerid;
  256. for(workerid = starpu_bitmap_first(node->workers_in_ctx);
  257. workerid != -1;
  258. workerid = starpu_bitmap_next(node->workers_in_ctx, workerid))
  259. {
  260. enum starpu_perfmodel_archtype archtype = starpu_worker_get_perf_archtype(workerid);
  261. int nimpl;
  262. for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  263. {
  264. if(starpu_worker_can_execute_task(workerid,task,nimpl)
  265. || starpu_combined_worker_can_execute_task(workerid, task, nimpl))
  266. {
  267. double d;
  268. can_execute = 1;
  269. if(bundle)
  270. d = starpu_task_bundle_expected_length(bundle, archtype, nimpl);
  271. else
  272. d = starpu_task_expected_length(task, archtype, nimpl);
  273. if(isnan(d))
  274. {
  275. *length = d;
  276. return can_execute;
  277. }
  278. if(_STARPU_IS_ZERO(d) && !can_execute)
  279. {
  280. can_execute = 1;
  281. continue;
  282. }
  283. if(d < len)
  284. {
  285. len = d;
  286. }
  287. }
  288. }
  289. if(node->is_homogeneous)
  290. break;
  291. }
  292. if(len == DBL_MAX) /* we dont have perf model */
  293. len = 0.0;
  294. if(length)
  295. *length = len;
  296. return can_execute;
  297. }
  298. double starpu_sched_node_transfer_length(struct starpu_sched_node * node, struct starpu_task * task)
  299. {
  300. int nworkers = starpu_bitmap_cardinal(node->workers_in_ctx);
  301. double sum = 0.0;
  302. int worker;
  303. for(worker = starpu_bitmap_first(node->workers_in_ctx);
  304. worker != -1;
  305. worker = starpu_bitmap_next(node->workers_in_ctx, worker))
  306. {
  307. unsigned memory_node = starpu_worker_get_memory_node(worker);
  308. if(task->bundle)
  309. {
  310. sum += starpu_task_bundle_expected_data_transfer_time(task->bundle,memory_node);
  311. }
  312. else
  313. {
  314. sum += starpu_task_expected_data_transfer_time(memory_node, task);
  315. //sum += starpu_task_expected_conversion_time(task, starpu_worker_get_perf_archtype(worker), impl ?)
  316. }
  317. }
  318. return sum / nworkers;
  319. }
  320. /*
  321. static double estimated_transfer_length(struct starpu_sched_node * node, struct starpu_task * task)
  322. {
  323. double sum = 0.0;
  324. int nb = 0, i = 0;
  325. for(i = 0; i < node->nchilds; i++)
  326. { struct starpu_sched_node * c = node->childs[i];
  327. if(starpu_sched_node_can_execute_task(c, task))
  328. {
  329. sum += c->estimated_transfer_length(c, task);
  330. nb++;
  331. }
  332. }
  333. sum /= nb;
  334. return sum;
  335. }
  336. */
  337. int starpu_sched_node_can_execute_task(struct starpu_sched_node * node, struct starpu_task * task)
  338. {
  339. unsigned nimpl;
  340. int worker;
  341. STARPU_ASSERT(task);
  342. STARPU_ASSERT(node);
  343. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  344. for(worker = starpu_bitmap_first(node->workers_in_ctx);
  345. -1 != worker;
  346. worker = starpu_bitmap_next(node->workers_in_ctx, worker))
  347. if (starpu_worker_can_execute_task(worker, task, nimpl)
  348. || starpu_combined_worker_can_execute_task(worker, task, nimpl))
  349. return 1;
  350. return 0;
  351. }
  352. void take_node_and_does_nothing(struct starpu_sched_node * node STARPU_ATTRIBUTE_UNUSED)
  353. {
  354. }
  355. struct starpu_sched_node * starpu_sched_node_create(void)
  356. {
  357. struct starpu_sched_node * node = malloc(sizeof(*node));
  358. memset(node,0,sizeof(*node));
  359. node->workers = starpu_bitmap_create();
  360. node->workers_in_ctx = starpu_bitmap_create();
  361. node->available = available;
  362. node->add_child = starpu_sched_node_add_child;
  363. node->remove_child = starpu_sched_node_remove_child;
  364. node->pop_task = pop_task_node;
  365. node->estimated_load = estimated_load;
  366. node->estimated_end = _starpu_sched_node_estimated_end_min;
  367. return node;
  368. }
  369. void starpu_sched_node_destroy(struct starpu_sched_node *node)
  370. {
  371. if(starpu_sched_node_is_worker(node))
  372. return;
  373. int i,j;
  374. for(i = 0; i < node->nchilds; i++)
  375. {
  376. struct starpu_sched_node * child = node->childs[i];
  377. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  378. if(child->fathers[i] == node)
  379. child->fathers[i] = NULL;
  380. }
  381. free(node->childs);
  382. starpu_bitmap_destroy(node->workers);
  383. starpu_bitmap_destroy(node->workers_in_ctx);
  384. free(node);
  385. }
  386. static void set_is_homogeneous(struct starpu_sched_node * node)
  387. {
  388. STARPU_ASSERT(starpu_bitmap_cardinal(node->workers_in_ctx) > 0);
  389. if(starpu_bitmap_cardinal(node->workers_in_ctx) == 1)
  390. node->is_homogeneous = 1;
  391. int worker = starpu_bitmap_first(node->workers_in_ctx);
  392. uint32_t last_worker = _starpu_get_worker_struct(worker)->worker_mask;
  393. do
  394. {
  395. if(last_worker != _starpu_get_worker_struct(worker)->worker_mask)
  396. {
  397. node->is_homogeneous = 0;
  398. return;
  399. }
  400. last_worker = _starpu_get_worker_struct(worker)->worker_mask;
  401. worker = starpu_bitmap_next(node->workers_in_ctx, worker);
  402. }while(worker != -1);
  403. node->is_homogeneous = 1;
  404. }
  405. void _starpu_sched_node_update_workers(struct starpu_sched_node * node)
  406. {
  407. if(starpu_sched_node_is_worker(node))
  408. return;
  409. starpu_bitmap_unset_all(node->workers);
  410. int i;
  411. for(i = 0; i < node->nchilds; i++)
  412. {
  413. _starpu_sched_node_update_workers(node->childs[i]);
  414. starpu_bitmap_or(node->workers, node->childs[i]->workers);
  415. }
  416. }
  417. void _starpu_sched_node_update_workers_in_ctx(struct starpu_sched_node * node, unsigned sched_ctx_id)
  418. {
  419. if(starpu_sched_node_is_worker(node))
  420. return;
  421. struct starpu_bitmap * workers_in_ctx = _starpu_get_worker_mask(sched_ctx_id);
  422. starpu_bitmap_unset_and(node->workers_in_ctx,node->workers, workers_in_ctx);
  423. int i,j;
  424. for(i = 0; i < node->nchilds; i++)
  425. {
  426. struct starpu_sched_node * child = node->childs[i];
  427. _starpu_sched_node_update_workers_in_ctx(child, sched_ctx_id);
  428. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  429. if(child->fathers[j] == node)
  430. {
  431. starpu_bitmap_or(node->workers_in_ctx, child->workers_in_ctx);
  432. break;
  433. }
  434. }
  435. set_is_homogeneous(node);
  436. }
  437. void starpu_sched_tree_update_workers_in_ctx(struct starpu_sched_tree * t)
  438. {
  439. _starpu_sched_node_update_workers_in_ctx(t->root, t->sched_ctx_id);
  440. }
  441. void starpu_sched_tree_update_workers(struct starpu_sched_tree * t)
  442. {
  443. _starpu_sched_node_update_workers(t->root);
  444. }
  445. void _update_worker_bits(struct starpu_sched_node * node, struct starpu_bitmap * workers_in_ctx)
  446. {
  447. if(starpu_sched_node_is_worker(node))
  448. return;
  449. starpu_bitmap_unset_and(node->workers_in_ctx, node->workers, workers_in_ctx);
  450. int i;
  451. for(i = 0; i < node->nchilds; i++)
  452. _update_worker_bits(node->childs[i], workers_in_ctx);
  453. }
  454. void starpu_sched_node_init_rec(struct starpu_sched_node * node)
  455. {
  456. if(starpu_sched_node_is_worker(node))
  457. return;
  458. int i;
  459. for(i = 0; i < node->nchilds; i++)
  460. starpu_sched_node_init_rec(node->childs[i]);
  461. for(i = 0; i < node->nchilds; i++)
  462. starpu_bitmap_or(node->workers, node->childs[i]->workers);
  463. set_is_homogeneous(node);
  464. }
  465. static void _init_add_worker_bit(struct starpu_sched_node * node, int worker)
  466. {
  467. STARPU_ASSERT(node);
  468. starpu_bitmap_set(node->workers, worker);
  469. int i;
  470. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  471. if(node->fathers[i])
  472. {
  473. _init_add_worker_bit(node->fathers[i], worker);
  474. set_is_homogeneous(node->fathers[i]);
  475. }
  476. }
  /* Register every worker (plain workers first, then combined workers) in
   * the `workers` bitmap of its worker node and of all that node's
   * ancestors. */
  void _starpu_set_workers_bitmaps(void)
  {
      unsigned total = starpu_worker_get_count() + starpu_combined_worker_get_count();
      unsigned worker;
      for(worker = 0; worker < total; worker++)
          _init_add_worker_bit(starpu_sched_node_worker_get(worker), worker);
  }
  486. static int push_task_to_first_suitable_parent(struct starpu_sched_node * node, struct starpu_task * task, int sched_ctx_id)
  487. {
  488. if(node == NULL || node->fathers[sched_ctx_id] == NULL)
  489. return 1;
  490. struct starpu_sched_node * father = node->fathers[sched_ctx_id];
  491. if(starpu_sched_node_can_execute_task(father,task))
  492. return father->push_task(father, task);
  493. else
  494. return push_task_to_first_suitable_parent(father, task, sched_ctx_id);
  495. }
  /* Push the tasks of LIST, front first, to the first suitable ancestor of
   * NODE in context SCHED_CTX_ID.
   * On the first non-zero push result, the failing task is put back at the
   * front of LIST and that result is returned. Returns 0 once LIST has been
   * emptied. */
  int starpu_sched_node_push_tasks_to_firsts_suitable_parent(struct starpu_sched_node * node, struct starpu_task_list *list, int sched_ctx_id)
  {
      int res = 0;
      while(res == 0 && !starpu_task_list_empty(list))
      {
          struct starpu_task * task = starpu_task_list_pop_front(list);
          res = push_task_to_first_suitable_parent(node, task, sched_ctx_id);
          if(res != 0)
              starpu_task_list_push_front(list, task);
      }
      return res;
  }