/* node_sched.c */
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013 Simon Archipoff
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <core/jobs.h>
  17. #include <core/workers.h>
  18. #include <starpu_sched_node.h>
  19. #include <starpu_thread_util.h>
  20. #include <float.h>
  21. #include "sched_node.h"
  22. /* wake up worker workerid
  23. * if called by a worker it dont try to wake up himself
  24. */
  25. static void wake_simple_worker(int workerid)
  26. {
  27. STARPU_ASSERT(0 <= workerid && (unsigned) workerid < starpu_worker_get_count());
  28. starpu_pthread_mutex_t * sched_mutex;
  29. starpu_pthread_cond_t * sched_cond;
  30. if(workerid == starpu_worker_get_id())
  31. return;
  32. starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
  33. STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  34. STARPU_PTHREAD_COND_SIGNAL(sched_cond);
  35. STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  36. }
  37. /* wake up all workers of a combined workers
  38. * this function must not be called during a pop (however this should not
  39. * even be possible) or you will have a dead lock
  40. */
  41. static void wake_combined_worker(int workerid)
  42. {
  43. STARPU_ASSERT( 0 <= workerid
  44. && starpu_worker_get_count() <= (unsigned) workerid
  45. && (unsigned) workerid < starpu_worker_get_count() + starpu_combined_worker_get_count());
  46. struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
  47. int * list = combined_worker->combined_workerid;
  48. int size = combined_worker->worker_size;
  49. int i;
  50. for(i = 0; i < size; i++)
  51. wake_simple_worker(list[i]);
  52. }
  53. /* this function must not be called on worker nodes :
  54. * because this wouldn't have sense
  55. * and should dead lock
  56. */
  57. void starpu_sched_node_available(struct starpu_sched_node * node)
  58. {
  59. (void)node;
  60. STARPU_ASSERT(node);
  61. STARPU_ASSERT(!starpu_sched_node_is_worker(node));
  62. #ifndef STARPU_NON_BLOCKING_DRIVERS
  63. int i;
  64. for(i = starpu_bitmap_first(node->workers_in_ctx);
  65. i != -1;
  66. i = starpu_bitmap_next(node->workers_in_ctx, i))
  67. {
  68. if(i < (int) starpu_worker_get_count())
  69. wake_simple_worker(i);
  70. else
  71. wake_combined_worker(i);
  72. }
  73. #endif
  74. }
  75. /* default implementation for node->pop_task()
  76. * just perform a recursive call on father
  77. */
  78. static struct starpu_task * pop_task_node(struct starpu_sched_node * node, unsigned sched_ctx_id)
  79. {
  80. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  81. STARPU_ASSERT(node);
  82. if(node->fathers[sched_ctx_id] == NULL)
  83. return NULL;
  84. else
  85. return node->fathers[sched_ctx_id]->pop_task(node->fathers[sched_ctx_id], sched_ctx_id);
  86. }
  87. void starpu_sched_node_set_father(struct starpu_sched_node *node,
  88. struct starpu_sched_node *father_node,
  89. unsigned sched_ctx_id)
  90. {
  91. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  92. STARPU_ASSERT(node);
  93. node->fathers[sched_ctx_id] = father_node;
  94. }
  95. /******************************************************************************
  96. * functions for struct starpu_sched_policy interface *
  97. ******************************************************************************/
/* Entry point for struct starpu_sched_policy::push_task.
 * Routes the task through the root of the tree attached to its context.
 * Returns the value of root->push_task().
 *
 * Locking: an application thread (workerid == -1) serializes against the
 * hypervisor via tree->lock; a worker thread instead takes its own mutex.
 */
int starpu_sched_tree_push_task(struct starpu_task * task)
{
	STARPU_ASSERT(task);
	unsigned sched_ctx_id = task->sched_ctx;
	struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int workerid = starpu_worker_get_id();
	/* the application should take tree->lock to prevent concurrent access
	 * from the hypervisor; workers take their own mutexes */
	if(-1 == workerid)
		STARPU_PTHREAD_MUTEX_LOCK(&tree->lock);
	else
		_starpu_sched_node_lock_worker(workerid);
	int ret_val = tree->root->push_task(tree->root,task);
	if(-1 == workerid)
		STARPU_PTHREAD_MUTEX_UNLOCK(&tree->lock);
	else
		_starpu_sched_node_unlock_worker(workerid);
	return ret_val;
}
  118. struct starpu_task * starpu_sched_tree_pop_task(unsigned sched_ctx_id)
  119. {
  120. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  121. int workerid = starpu_worker_get_id();
  122. struct starpu_sched_node * node = starpu_sched_node_worker_get(workerid);
  123. /* _starpu_sched_node_lock_worker(workerid) is called by node->pop_task()
  124. */
  125. struct starpu_task * task = node->pop_task(node, sched_ctx_id);
  126. return task;
  127. }
  128. void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  129. {
  130. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  131. STARPU_ASSERT(workerids);
  132. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  133. STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
  134. _starpu_sched_node_lock_all_workers();
  135. unsigned i;
  136. for(i = 0; i < nworkers; i++)
  137. starpu_bitmap_set(t->workers, workerids[i]);
  138. starpu_sched_tree_update_workers_in_ctx(t);
  139. _starpu_sched_node_unlock_all_workers();
  140. STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
  141. }
  142. void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  143. {
  144. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  145. STARPU_ASSERT(workerids);
  146. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  147. STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
  148. _starpu_sched_node_lock_all_workers();
  149. unsigned i;
  150. for(i = 0; i < nworkers; i++)
  151. starpu_bitmap_unset(t->workers, workerids[i]);
  152. starpu_sched_tree_update_workers_in_ctx(t);
  153. _starpu_sched_node_unlock_all_workers();
  154. STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
  155. }
  156. void starpu_sched_node_destroy_rec(struct starpu_sched_node * node, unsigned sched_ctx_id)
  157. {
  158. if(node == NULL)
  159. return;
  160. struct starpu_sched_node ** stack = NULL;
  161. int top = -1;
  162. #define PUSH(n) \
  163. do{ \
  164. stack = realloc(stack, sizeof(*stack) * (top + 2)); \
  165. stack[++top] = n; \
  166. }while(0)
  167. #define POP() stack[top--]
  168. #define EMPTY() (top == -1)
  169. /* we want to delete all subtrees exept if a pointer in fathers point in an other tree
  170. * ie an other context
  171. */
  172. node->fathers[sched_ctx_id] = NULL;
  173. int shared = 0;
  174. {
  175. int i;
  176. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  177. if(node->fathers[i] != NULL)
  178. shared = 1;
  179. }
  180. if(!shared)
  181. PUSH(node);
  182. while(!EMPTY())
  183. {
  184. struct starpu_sched_node * n = POP();
  185. int i;
  186. for(i = 0; i < n->nchilds; i++)
  187. {
  188. struct starpu_sched_node * child = n->childs[i];
  189. int j;
  190. shared = 0;
  191. STARPU_ASSERT(child->fathers[sched_ctx_id] == n);
  192. child->fathers[sched_ctx_id] = NULL;
  193. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  194. {
  195. if(child->fathers[j] != NULL)/* child is shared */
  196. shared = 1;
  197. }
  198. if(!shared)/* if not shared we want to destroy it and his childs */
  199. PUSH(child);
  200. }
  201. starpu_sched_node_destroy(n);
  202. }
  203. free(stack);
  204. }
  205. struct starpu_sched_tree * starpu_sched_tree_create(unsigned sched_ctx_id)
  206. {
  207. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  208. struct starpu_sched_tree * t = malloc(sizeof(*t));
  209. memset(t, 0, sizeof(*t));
  210. t->sched_ctx_id = sched_ctx_id;
  211. t->workers = starpu_bitmap_create();
  212. STARPU_PTHREAD_MUTEX_INIT(&t->lock,NULL);
  213. return t;
  214. }
  215. void starpu_sched_tree_destroy(struct starpu_sched_tree * tree)
  216. {
  217. STARPU_ASSERT(tree);
  218. if(tree->root)
  219. starpu_sched_node_destroy_rec(tree->root, tree->sched_ctx_id);
  220. starpu_bitmap_destroy(tree->workers);
  221. STARPU_PTHREAD_MUTEX_DESTROY(&tree->lock);
  222. free(tree);
  223. }
  224. void starpu_sched_node_add_child(struct starpu_sched_node* node, struct starpu_sched_node * child)
  225. {
  226. STARPU_ASSERT(node && child);
  227. STARPU_ASSERT(!starpu_sched_node_is_worker(node));
  228. int i;
  229. for(i = 0; i < node->nchilds; i++){
  230. STARPU_ASSERT(node->childs[i] != node);
  231. STARPU_ASSERT(node->childs[i] != NULL);
  232. }
  233. node->childs = realloc(node->childs, sizeof(struct starpu_sched_node *) * (node->nchilds + 1));
  234. node->childs[node->nchilds] = child;
  235. node->nchilds++;
  236. }
  237. void starpu_sched_node_remove_child(struct starpu_sched_node * node, struct starpu_sched_node * child)
  238. {
  239. STARPU_ASSERT(node && child);
  240. STARPU_ASSERT(!starpu_sched_node_is_worker(node));
  241. int pos;
  242. for(pos = 0; pos < node->nchilds; pos++)
  243. if(node->childs[pos] == child)
  244. break;
  245. STARPU_ASSERT(pos != node->nchilds);
  246. node->childs[pos] = node->childs[--node->nchilds];
  247. }
  248. struct starpu_bitmap * _starpu_get_worker_mask(unsigned sched_ctx_id)
  249. {
  250. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  251. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  252. STARPU_ASSERT(t);
  253. return t->workers;
  254. }
  255. static double estimated_load(struct starpu_sched_node * node)
  256. {
  257. double sum = 0.0;
  258. int i;
  259. for( i = 0; i < node->nchilds; i++)
  260. {
  261. struct starpu_sched_node * c = node->childs[i];
  262. sum += c->estimated_load(c);
  263. }
  264. return sum;
  265. }
  266. static double _starpu_sched_node_estimated_end_min(struct starpu_sched_node * node)
  267. {
  268. STARPU_ASSERT(node);
  269. double min = DBL_MAX;
  270. int i;
  271. for(i = 0; i < node->nchilds; i++)
  272. {
  273. double tmp = node->childs[i]->estimated_end(node->childs[i]);
  274. if(tmp < min)
  275. min = tmp;
  276. }
  277. return min;
  278. }
  279. /* this function find the best implementation or an implementation that need to be calibrated for a worker available
  280. * and set prediction in *length. nan if a implementation need to be calibrated, 0.0 if no perf model are available
  281. * return false if no worker on the node can execute that task
  282. */
  283. int STARPU_WARN_UNUSED_RESULT starpu_sched_node_execute_preds(struct starpu_sched_node * node, struct starpu_task * task, double * length)
  284. {
  285. STARPU_ASSERT(node && task);
  286. int can_execute = 0;
  287. starpu_task_bundle_t bundle = task->bundle;
  288. double len = DBL_MAX;
  289. int workerid;
  290. for(workerid = starpu_bitmap_first(node->workers_in_ctx);
  291. workerid != -1;
  292. workerid = starpu_bitmap_next(node->workers_in_ctx, workerid))
  293. {
  294. struct starpu_perfmodel_arch* archtype = starpu_worker_get_perf_archtype(workerid);
  295. int nimpl;
  296. for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  297. {
  298. if(starpu_worker_can_execute_task(workerid,task,nimpl)
  299. || starpu_combined_worker_can_execute_task(workerid, task, nimpl))
  300. {
  301. double d;
  302. can_execute = 1;
  303. if(bundle)
  304. d = starpu_task_bundle_expected_length(bundle, archtype, nimpl);
  305. else
  306. d = starpu_task_expected_length(task, archtype, nimpl);
  307. if(isnan(d))
  308. {
  309. *length = d;
  310. return can_execute;
  311. }
  312. if(_STARPU_IS_ZERO(d) && !can_execute)
  313. {
  314. can_execute = 1;
  315. continue;
  316. }
  317. if(d < len)
  318. {
  319. len = d;
  320. }
  321. }
  322. }
  323. if(STARPU_SCHED_NODE_IS_HOMOGENEOUS(node))
  324. break;
  325. }
  326. if(len == DBL_MAX) /* we dont have perf model */
  327. len = 0.0;
  328. if(length)
  329. *length = len;
  330. return can_execute;
  331. }
  332. /* very similar function that dont compute prediction */
  333. int starpu_sched_node_can_execute_task(struct starpu_sched_node * node, struct starpu_task * task)
  334. {
  335. STARPU_ASSERT(task);
  336. STARPU_ASSERT(node);
  337. unsigned nimpl;
  338. int worker;
  339. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  340. for(worker = starpu_bitmap_first(node->workers_in_ctx);
  341. -1 != worker;
  342. worker = starpu_bitmap_next(node->workers_in_ctx, worker))
  343. if (starpu_worker_can_execute_task(worker, task, nimpl)
  344. || starpu_combined_worker_can_execute_task(worker, task, nimpl))
  345. return 1;
  346. return 0;
  347. }
/* Compute the average expected data-transfer time of task over all workers
 * below node.  When all workers share one memory node, a single query
 * suffices; maybe this should also be optimised when all workers are under
 * the same NUMA node.
 * NOTE(review): assumes node has at least one worker in ctx; with an empty
 * bitmap the final division is by zero -- confirm callers guarantee this.
 */
double starpu_sched_node_transfer_length(struct starpu_sched_node * node, struct starpu_task * task)
{
	STARPU_ASSERT(node && task);
	int nworkers = starpu_bitmap_cardinal(node->workers_in_ctx);
	double sum = 0.0;
	int worker;
	if(STARPU_SCHED_NODE_IS_SINGLE_MEMORY_NODE(node))
	{
		/* all workers see the same memory node: one query is enough */
		unsigned memory_node = starpu_worker_get_memory_node(starpu_bitmap_first(node->workers_in_ctx));
		if(task->bundle)
			return starpu_task_bundle_expected_data_transfer_time(task->bundle,memory_node);
		else
			return starpu_task_expected_data_transfer_time(memory_node, task);
	}
	/* heterogeneous memory nodes: average over every worker */
	for(worker = starpu_bitmap_first(node->workers_in_ctx);
	    worker != -1;
	    worker = starpu_bitmap_next(node->workers_in_ctx, worker))
	{
		unsigned memory_node = starpu_worker_get_memory_node(worker);
		if(task->bundle)
		{
			sum += starpu_task_bundle_expected_data_transfer_time(task->bundle,memory_node);
		}
		else
		{
			sum += starpu_task_expected_data_transfer_time(memory_node, task);
			/* sum += starpu_task_expected_conversion_time(task, starpu_worker_get_perf_archtype(worker), impl ?)
			 * I dont know what to do as we dont know what implementation would be used here...
			 */
		}
	}
	return sum / nworkers;
}
/* If prefetching is enabled and all of node's workers share one memory node,
 * start fetching task's input data onto that memory node.
 */
void starpu_sched_node_prefetch_on_node(struct starpu_sched_node * node, struct starpu_task * task)
{
	/* NOTE(review): properties is a flag mask; the `>=` comparison only
	 * works if STARPU_SCHED_NODE_SINGLE_MEMORY_NODE is the highest flag
	 * bit -- confirm; a `&` test would be clearer. */
	if (starpu_get_prefetch_flag() && (node->properties >= STARPU_SCHED_NODE_SINGLE_MEMORY_NODE))
	{
		/* single memory node: any worker gives the right node id */
		int worker = starpu_bitmap_first(node->workers_in_ctx);
		unsigned memory_node = starpu_worker_get_memory_node(worker);
		starpu_prefetch_task_input_on_node(task, memory_node);
	}
}
/* No-op callback, used as the default for node->deinit_data and
 * node->notify_change_workers in starpu_sched_node_create(). */
void take_node_and_does_nothing(struct starpu_sched_node * node STARPU_ATTRIBUTE_UNUSED)
{
}
  396. struct starpu_sched_node * starpu_sched_node_create(void)
  397. {
  398. struct starpu_sched_node * node = malloc(sizeof(*node));
  399. memset(node,0,sizeof(*node));
  400. node->workers = starpu_bitmap_create();
  401. node->workers_in_ctx = starpu_bitmap_create();
  402. node->add_child = starpu_sched_node_add_child;
  403. node->remove_child = starpu_sched_node_remove_child;
  404. node->pop_task = pop_task_node;
  405. node->estimated_load = estimated_load;
  406. node->estimated_end = _starpu_sched_node_estimated_end_min;
  407. node->deinit_data = take_node_and_does_nothing;
  408. node->notify_change_workers = take_node_and_does_nothing;
  409. return node;
  410. }
  411. /* remove all child
  412. * for all child of node, if child->fathers[x] == node, set child->fathers[x] to null
  413. * call node->deinit_data
  414. */
  415. void starpu_sched_node_destroy(struct starpu_sched_node *node)
  416. {
  417. STARPU_ASSERT(node);
  418. if(starpu_sched_node_is_worker(node))
  419. return;
  420. int i,j;
  421. for(i = 0; i < node->nchilds; i++)
  422. {
  423. struct starpu_sched_node * child = node->childs[i];
  424. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  425. if(child->fathers[i] == node)
  426. child->fathers[i] = NULL;
  427. }
  428. while(node->nchilds != 0)
  429. node->remove_child(node, node->childs[0]);
  430. node->deinit_data(node);
  431. free(node->childs);
  432. starpu_bitmap_destroy(node->workers);
  433. starpu_bitmap_destroy(node->workers_in_ctx);
  434. free(node);
  435. }
  436. static void set_properties(struct starpu_sched_node * node)
  437. {
  438. STARPU_ASSERT(node);
  439. node->properties = 0;
  440. STARPU_ASSERT(starpu_bitmap_cardinal(node->workers_in_ctx) > 0);
  441. int worker = starpu_bitmap_first(node->workers_in_ctx);
  442. uint32_t first_worker = _starpu_get_worker_struct(worker)->worker_mask;
  443. unsigned first_memory_node = _starpu_get_worker_struct(worker)->memory_node;
  444. int is_homogeneous = 1;
  445. int is_all_same_node = 1;
  446. for(;
  447. worker != -1;
  448. worker = starpu_bitmap_next(node->workers_in_ctx, worker))
  449. {
  450. if(first_worker != _starpu_get_worker_struct(worker)->worker_mask)
  451. is_homogeneous = 0;
  452. if(first_memory_node != _starpu_get_worker_struct(worker)->memory_node)
  453. is_all_same_node = 0;
  454. }
  455. if(is_homogeneous)
  456. node->properties |= STARPU_SCHED_NODE_HOMOGENEOUS;
  457. if(is_all_same_node)
  458. node->properties |= STARPU_SCHED_NODE_SINGLE_MEMORY_NODE;
  459. }
  460. /* recursively set the node->workers member of node's subtree
  461. */
  462. void _starpu_sched_node_update_workers(struct starpu_sched_node * node)
  463. {
  464. STARPU_ASSERT(node);
  465. if(starpu_sched_node_is_worker(node))
  466. return;
  467. starpu_bitmap_unset_all(node->workers);
  468. int i;
  469. for(i = 0; i < node->nchilds; i++)
  470. {
  471. _starpu_sched_node_update_workers(node->childs[i]);
  472. starpu_bitmap_or(node->workers, node->childs[i]->workers);
  473. node->notify_change_workers(node);
  474. }
  475. }
/* Recursively recompute node->workers_in_ctx over node's subtree:
 * node->workers restricted to the workers of context sched_ctx_id,
 * merged with the bitmaps of children actually attached to node.
 * Also refreshes node->properties and notifies the node.
 */
void _starpu_sched_node_update_workers_in_ctx(struct starpu_sched_node * node, unsigned sched_ctx_id)
{
	STARPU_ASSERT(node);
	/* worker nodes keep their fixed bitmap */
	if(starpu_sched_node_is_worker(node))
		return;
	struct starpu_bitmap * workers_in_ctx = _starpu_get_worker_mask(sched_ctx_id);
	/* workers_in_ctx := workers AND context mask */
	starpu_bitmap_unset_and(node->workers_in_ctx,node->workers, workers_in_ctx);
	int i,j;
	for(i = 0; i < node->nchilds; i++)
	{
		struct starpu_sched_node * child = node->childs[i];
		_starpu_sched_node_update_workers_in_ctx(child, sched_ctx_id);
		/* only merge children that have node as father in some
		 * context */
		for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
			if(child->fathers[j] == node)
			{
				starpu_bitmap_or(node->workers_in_ctx, child->workers_in_ctx);
				break;
			}
	}
	set_properties(node);
	node->notify_change_workers(node);
}
  500. void starpu_sched_tree_update_workers_in_ctx(struct starpu_sched_tree * t)
  501. {
  502. STARPU_ASSERT(t);
  503. _starpu_sched_node_update_workers_in_ctx(t->root, t->sched_ctx_id);
  504. }
  505. void starpu_sched_tree_update_workers(struct starpu_sched_tree * t)
  506. {
  507. STARPU_ASSERT(t);
  508. _starpu_sched_node_update_workers(t->root);
  509. }