component_sched.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013 INRIA
  4. * Copyright (C) 2013 Simon Archipoff
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <core/jobs.h>
  18. #include <core/workers.h>
  19. #include <starpu_sched_component.h>
  20. #include <starpu_thread_util.h>
  21. #include <float.h>
  22. #include "sched_component.h"
  23. /* default implementation for component->pop_task()
  24. * just perform a recursive call on father
  25. */
  26. static struct starpu_task * pop_task_component(struct starpu_sched_component * component)
  27. {
  28. STARPU_ASSERT(component);
  29. struct starpu_task * task = NULL;
  30. int i;
  31. for(i=0; i < component->nfathers; i++)
  32. {
  33. if(component->fathers[i] == NULL)
  34. continue;
  35. else
  36. {
  37. task = component->fathers[i]->pop_task(component->fathers[i]);
  38. if(task)
  39. break;
  40. }
  41. }
  42. return task;
  43. }
  44. /******************************************************************************
  45. * functions for struct starpu_sched_policy interface *
  46. ******************************************************************************/
  47. int starpu_sched_tree_push_task(struct starpu_task * task)
  48. {
  49. STARPU_ASSERT(task);
  50. unsigned sched_ctx_id = task->sched_ctx;
  51. struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  52. int workerid = starpu_worker_get_id();
  53. /* application should take tree->lock to prevent concurent acces from hypervisor
  54. * worker take they own mutexes
  55. */
  56. if(-1 == workerid)
  57. STARPU_PTHREAD_MUTEX_LOCK(&tree->lock);
  58. else
  59. _starpu_sched_component_lock_worker(workerid);
  60. int ret_val = tree->root->push_task(tree->root,task);
  61. if(-1 == workerid)
  62. STARPU_PTHREAD_MUTEX_UNLOCK(&tree->lock);
  63. else
  64. _starpu_sched_component_unlock_worker(workerid);
  65. return ret_val;
  66. }
  67. struct starpu_task * starpu_sched_tree_pop_task(unsigned sched_ctx STARPU_ATTRIBUTE_UNUSED)
  68. {
  69. int workerid = starpu_worker_get_id();
  70. struct starpu_sched_component * component = starpu_sched_component_worker_get(workerid);
  71. /* _starpu_sched_component_lock_worker(workerid) is called by component->pop_task()
  72. */
  73. struct starpu_task * task = component->pop_task(component);
  74. return task;
  75. }
  76. void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  77. {
  78. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  79. STARPU_ASSERT(workerids);
  80. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  81. STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
  82. _starpu_sched_component_lock_all_workers();
  83. unsigned i;
  84. for(i = 0; i < nworkers; i++)
  85. starpu_bitmap_set(t->workers, workerids[i]);
  86. starpu_sched_tree_update_workers_in_ctx(t);
  87. _starpu_sched_component_unlock_all_workers();
  88. STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
  89. }
  90. void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
  91. {
  92. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  93. STARPU_ASSERT(workerids);
  94. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  95. STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
  96. _starpu_sched_component_lock_all_workers();
  97. unsigned i;
  98. for(i = 0; i < nworkers; i++)
  99. starpu_bitmap_unset(t->workers, workerids[i]);
  100. starpu_sched_tree_update_workers_in_ctx(t);
  101. _starpu_sched_component_unlock_all_workers();
  102. STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
  103. }
  104. void starpu_sched_component_destroy_rec(struct starpu_sched_component * component)
  105. {
  106. if(component == NULL)
  107. return;
  108. int i;
  109. if(component->nchildren > 0)
  110. {
  111. for(i=0; i < component->nchildren; i++)
  112. starpu_sched_component_destroy_rec(component->children[i]);
  113. }
  114. starpu_sched_component_destroy(component);
  115. }
  116. struct starpu_sched_tree * starpu_sched_tree_create(unsigned sched_ctx_id)
  117. {
  118. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  119. struct starpu_sched_tree * t = malloc(sizeof(*t));
  120. memset(t, 0, sizeof(*t));
  121. t->sched_ctx_id = sched_ctx_id;
  122. t->workers = starpu_bitmap_create();
  123. STARPU_PTHREAD_MUTEX_INIT(&t->lock,NULL);
  124. return t;
  125. }
  126. void starpu_sched_tree_destroy(struct starpu_sched_tree * tree)
  127. {
  128. STARPU_ASSERT(tree);
  129. if(tree->root)
  130. starpu_sched_component_destroy_rec(tree->root);
  131. starpu_bitmap_destroy(tree->workers);
  132. STARPU_PTHREAD_MUTEX_DESTROY(&tree->lock);
  133. free(tree);
  134. }
  135. static void starpu_sched_component_add_child(struct starpu_sched_component* component, struct starpu_sched_component * child)
  136. {
  137. STARPU_ASSERT(component && child);
  138. STARPU_ASSERT(!starpu_sched_component_is_worker(component));
  139. int i;
  140. for(i = 0; i < component->nchildren; i++){
  141. STARPU_ASSERT(component->children[i] != component);
  142. STARPU_ASSERT(component->children[i] != NULL);
  143. }
  144. component->children = realloc(component->children, sizeof(struct starpu_sched_component *) * (component->nchildren + 1));
  145. component->children[component->nchildren] = child;
  146. component->nchildren++;
  147. }
  148. static void starpu_sched_component_remove_child(struct starpu_sched_component * component, struct starpu_sched_component * child)
  149. {
  150. STARPU_ASSERT(component && child);
  151. STARPU_ASSERT(!starpu_sched_component_is_worker(component));
  152. int pos;
  153. for(pos = 0; pos < component->nchildren; pos++)
  154. if(component->children[pos] == child)
  155. break;
  156. STARPU_ASSERT(pos != component->nchildren);
  157. component->children[pos] = component->children[--component->nchildren];
  158. }
  159. static void starpu_sched_component_add_father(struct starpu_sched_component* component, struct starpu_sched_component * father)
  160. {
  161. STARPU_ASSERT(component && father);
  162. int i;
  163. for(i = 0; i < component->nfathers; i++){
  164. STARPU_ASSERT(component->fathers[i] != component);
  165. STARPU_ASSERT(component->fathers[i] != NULL);
  166. }
  167. component->fathers = realloc(component->fathers, sizeof(struct starpu_sched_component *) * (component->nfathers + 1));
  168. component->fathers[component->nfathers] = father;
  169. component->nfathers++;
  170. }
  171. static void starpu_sched_component_remove_father(struct starpu_sched_component * component, struct starpu_sched_component * father)
  172. {
  173. STARPU_ASSERT(component && father);
  174. int pos;
  175. for(pos = 0; pos < component->nfathers; pos++)
  176. if(component->fathers[pos] == father)
  177. break;
  178. STARPU_ASSERT(pos != component->nfathers);
  179. component->fathers[pos] = component->fathers[--component->nfathers];
  180. }
  181. struct starpu_bitmap * _starpu_get_worker_mask(unsigned sched_ctx_id)
  182. {
  183. STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
  184. struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
  185. STARPU_ASSERT(t);
  186. return t->workers;
  187. }
  188. static double estimated_load(struct starpu_sched_component * component)
  189. {
  190. double sum = 0.0;
  191. int i;
  192. for( i = 0; i < component->nchildren; i++)
  193. {
  194. struct starpu_sched_component * c = component->children[i];
  195. sum += c->estimated_load(c);
  196. }
  197. return sum;
  198. }
  199. static double _starpu_sched_component_estimated_end_min(struct starpu_sched_component * component)
  200. {
  201. STARPU_ASSERT(component);
  202. double min = DBL_MAX;
  203. int i;
  204. for(i = 0; i < component->nchildren; i++)
  205. {
  206. double tmp = component->children[i]->estimated_end(component->children[i]);
  207. if(tmp < min)
  208. min = tmp;
  209. }
  210. return min;
  211. }
  212. /* this function find the best implementation or an implementation that need to be calibrated for a worker available
  213. * and set prediction in *length. nan if a implementation need to be calibrated, 0.0 if no perf model are available
  214. * return false if no worker on the component can execute that task
  215. */
  216. int starpu_sched_component_execute_preds(struct starpu_sched_component * component, struct starpu_task * task, double * length)
  217. {
  218. STARPU_ASSERT(component && task);
  219. int can_execute = 0;
  220. starpu_task_bundle_t bundle = task->bundle;
  221. double len = DBL_MAX;
  222. int workerid;
  223. for(workerid = starpu_bitmap_first(component->workers_in_ctx);
  224. workerid != -1;
  225. workerid = starpu_bitmap_next(component->workers_in_ctx, workerid))
  226. {
  227. struct starpu_perfmodel_arch* archtype = starpu_worker_get_perf_archtype(workerid);
  228. int nimpl;
  229. for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  230. {
  231. if(starpu_worker_can_execute_task(workerid,task,nimpl)
  232. || starpu_combined_worker_can_execute_task(workerid, task, nimpl))
  233. {
  234. double d;
  235. can_execute = 1;
  236. if(bundle)
  237. d = starpu_task_bundle_expected_length(bundle, archtype, nimpl);
  238. else
  239. d = starpu_task_expected_length(task, archtype, nimpl);
  240. if(isnan(d))
  241. {
  242. *length = d;
  243. return can_execute;
  244. }
  245. if(_STARPU_IS_ZERO(d) && !can_execute)
  246. {
  247. can_execute = 1;
  248. continue;
  249. }
  250. if(d < len)
  251. {
  252. len = d;
  253. }
  254. }
  255. }
  256. if(STARPU_SCHED_COMPONENT_IS_HOMOGENEOUS(component))
  257. break;
  258. }
  259. if(len == DBL_MAX) /* we dont have perf model */
  260. len = 0.0;
  261. if(length)
  262. *length = len;
  263. return can_execute;
  264. }
  265. /* very similar function that dont compute prediction */
  266. int starpu_sched_component_can_execute_task(struct starpu_sched_component * component, struct starpu_task * task)
  267. {
  268. STARPU_ASSERT(task);
  269. STARPU_ASSERT(component);
  270. unsigned nimpl;
  271. int worker;
  272. for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
  273. for(worker = starpu_bitmap_first(component->workers_in_ctx);
  274. -1 != worker;
  275. worker = starpu_bitmap_next(component->workers_in_ctx, worker))
  276. if (starpu_worker_can_execute_task(worker, task, nimpl)
  277. || starpu_combined_worker_can_execute_task(worker, task, nimpl))
  278. return 1;
  279. return 0;
  280. }
  281. /* compute the average of transfer length for tasks on all workers
  282. * maybe this should be optimised if all workers are under the same numa component
  283. */
  284. double starpu_sched_component_transfer_length(struct starpu_sched_component * component, struct starpu_task * task)
  285. {
  286. STARPU_ASSERT(component && task);
  287. int nworkers = starpu_bitmap_cardinal(component->workers_in_ctx);
  288. double sum = 0.0;
  289. int worker;
  290. if(STARPU_SCHED_COMPONENT_IS_SINGLE_MEMORY_NODE(component))
  291. {
  292. unsigned memory_node = starpu_worker_get_memory_node(starpu_bitmap_first(component->workers_in_ctx));
  293. if(task->bundle)
  294. return starpu_task_bundle_expected_data_transfer_time(task->bundle,memory_node);
  295. else
  296. return starpu_task_expected_data_transfer_time(memory_node, task);
  297. }
  298. for(worker = starpu_bitmap_first(component->workers_in_ctx);
  299. worker != -1;
  300. worker = starpu_bitmap_next(component->workers_in_ctx, worker))
  301. {
  302. unsigned memory_node = starpu_worker_get_memory_node(worker);
  303. if(task->bundle)
  304. {
  305. sum += starpu_task_bundle_expected_data_transfer_time(task->bundle,memory_node);
  306. }
  307. else
  308. {
  309. sum += starpu_task_expected_data_transfer_time(memory_node, task);
  310. /* sum += starpu_task_expected_conversion_time(task, starpu_worker_get_perf_archtype(worker), impl ?)
  311. * I dont know what to do as we dont know what implementation would be used here...
  312. */
  313. }
  314. }
  315. return sum / nworkers;
  316. }
  317. /* This function can be called by components when they think that a prefetching request can be submitted.
  318. * For example, it is currently used by the MCT component to begin the prefetching on accelerators
  319. * on which it pushed tasks as soon as possible.
  320. */
  321. void starpu_sched_component_prefetch_on_node(struct starpu_sched_component * component, struct starpu_task * task)
  322. {
  323. if (starpu_get_prefetch_flag() && (!task->prefetched)
  324. && (component->properties >= STARPU_SCHED_COMPONENT_SINGLE_MEMORY_NODE))
  325. {
  326. int worker = starpu_bitmap_first(component->workers_in_ctx);
  327. unsigned memory_node = starpu_worker_get_memory_node(worker);
  328. starpu_prefetch_task_input_on_node(task, memory_node);
  329. task->prefetched = 1;
  330. }
  331. }
  332. /* The default implementation of the can_push function is a recursive call to its fathers.
  333. * A personally-made can_push in a component (like in prio components) is necessary to catch
  334. * this recursive call somewhere, if the user wants to exploit it.
  335. */
  336. static int starpu_sched_component_can_push(struct starpu_sched_component * component)
  337. {
  338. STARPU_ASSERT(component);
  339. int ret = 0;
  340. if(component->nfathers > 0)
  341. {
  342. int i;
  343. for(i=0; i < component->nfathers; i++)
  344. {
  345. struct starpu_sched_component * father = component->fathers[i];
  346. if(father != NULL)
  347. ret = father->can_push(father);
  348. if(ret)
  349. break;
  350. }
  351. }
  352. return ret;
  353. }
  354. /* A can_pull call will try to wake up one worker associated to the childs of the
  355. * component. It is currenly called by components which holds a queue (like fifo and prio
  356. * components) to signify its childs that a task has been pushed on its local queue.
  357. */
  358. static void starpu_sched_component_can_pull(struct starpu_sched_component * component)
  359. {
  360. STARPU_ASSERT(component);
  361. STARPU_ASSERT(!starpu_sched_component_is_worker(component));
  362. int i;
  363. for(i = 0; i < component->nchildren; i++)
  364. component->children[i]->can_pull(component->children[i]);
  365. }
  366. /* Allows a worker to lock/unlock scheduling mutexes. Currently used in
  367. * self-defined can_push calls to allow can_pull calls to take those mutexes while the
  368. * current worker is pushing tasks on other workers (or itself).
  369. */
  370. void _starpu_sched_component_lock_scheduling(void)
  371. {
  372. int workerid = starpu_worker_get_id();
  373. starpu_pthread_mutex_t *sched_mutex;
  374. starpu_pthread_cond_t *sched_cond;
  375. starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
  376. _starpu_sched_component_lock_worker(workerid);
  377. STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  378. }
  379. void _starpu_sched_component_unlock_scheduling(void)
  380. {
  381. int workerid = starpu_worker_get_id();
  382. starpu_pthread_mutex_t *sched_mutex;
  383. starpu_pthread_cond_t *sched_cond;
  384. starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
  385. STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  386. _starpu_sched_component_unlock_worker(workerid);
  387. }
  388. void take_component_and_does_nothing(struct starpu_sched_component * component STARPU_ATTRIBUTE_UNUSED)
  389. {
  390. }
  391. struct starpu_sched_component * starpu_sched_component_create(void)
  392. {
  393. struct starpu_sched_component * component = malloc(sizeof(*component));
  394. memset(component,0,sizeof(*component));
  395. component->workers = starpu_bitmap_create();
  396. component->workers_in_ctx = starpu_bitmap_create();
  397. component->add_child = starpu_sched_component_add_child;
  398. component->remove_child = starpu_sched_component_remove_child;
  399. component->add_father = starpu_sched_component_add_father;
  400. component->remove_father = starpu_sched_component_remove_father;
  401. component->pop_task = pop_task_component;
  402. component->can_push = starpu_sched_component_can_push;
  403. component->can_pull = starpu_sched_component_can_pull;
  404. component->estimated_load = estimated_load;
  405. component->estimated_end = _starpu_sched_component_estimated_end_min;
  406. component->deinit_data = take_component_and_does_nothing;
  407. component->notify_change_workers = take_component_and_does_nothing;
  408. return component;
  409. }
  410. /* remove all child
  411. * for all child of component, if child->fathers[x] == component, set child->fathers[x] to null
  412. * call component->deinit_data
  413. */
  414. void starpu_sched_component_destroy(struct starpu_sched_component *component)
  415. {
  416. STARPU_ASSERT(component);
  417. if(starpu_sched_component_is_worker(component))
  418. return;
  419. int i,j;
  420. for(i = 0; i < component->nchildren; i++)
  421. {
  422. struct starpu_sched_component * child = component->children[i];
  423. for(j = 0; j < child->nfathers; j++)
  424. if(child->fathers[j] == component)
  425. child->remove_father(child,component);
  426. }
  427. while(component->nchildren != 0)
  428. component->remove_child(component, component->children[0]);
  429. for(i = 0; i < component->nfathers; i++)
  430. {
  431. struct starpu_sched_component * father = component->fathers[i];
  432. for(j = 0; j < father->nchildren; j++)
  433. if(father->children[j] == component)
  434. father->remove_child(father,component);
  435. }
  436. while(component->nfathers != 0)
  437. component->remove_father(component, component->fathers[0]);
  438. component->deinit_data(component);
  439. free(component->children);
  440. free(component->fathers);
  441. starpu_bitmap_destroy(component->workers);
  442. starpu_bitmap_destroy(component->workers_in_ctx);
  443. free(component);
  444. }
  445. static void set_properties(struct starpu_sched_component * component)
  446. {
  447. STARPU_ASSERT(component);
  448. component->properties = 0;
  449. int worker = starpu_bitmap_first(component->workers_in_ctx);
  450. if (worker == -1)
  451. return;
  452. uint32_t first_worker = _starpu_get_worker_struct(worker)->worker_mask;
  453. unsigned first_memory_node = _starpu_get_worker_struct(worker)->memory_node;
  454. int is_homogeneous = 1;
  455. int is_all_same_component = 1;
  456. for(;
  457. worker != -1;
  458. worker = starpu_bitmap_next(component->workers_in_ctx, worker))
  459. {
  460. if(first_worker != _starpu_get_worker_struct(worker)->worker_mask)
  461. is_homogeneous = 0;
  462. if(first_memory_node != _starpu_get_worker_struct(worker)->memory_node)
  463. is_all_same_component = 0;
  464. }
  465. if(is_homogeneous)
  466. component->properties |= STARPU_SCHED_COMPONENT_HOMOGENEOUS;
  467. if(is_all_same_component)
  468. component->properties |= STARPU_SCHED_COMPONENT_SINGLE_MEMORY_NODE;
  469. }
  470. /* recursively set the component->workers member of component's subtree
  471. */
  472. void _starpu_sched_component_update_workers(struct starpu_sched_component * component)
  473. {
  474. STARPU_ASSERT(component);
  475. if(starpu_sched_component_is_worker(component))
  476. return;
  477. starpu_bitmap_unset_all(component->workers);
  478. int i;
  479. for(i = 0; i < component->nchildren; i++)
  480. {
  481. _starpu_sched_component_update_workers(component->children[i]);
  482. starpu_bitmap_or(component->workers, component->children[i]->workers);
  483. component->notify_change_workers(component);
  484. }
  485. }
  486. /* recursively set the component->workers_in_ctx in component's subtree
  487. */
  488. void _starpu_sched_component_update_workers_in_ctx(struct starpu_sched_component * component, unsigned sched_ctx_id)
  489. {
  490. STARPU_ASSERT(component);
  491. if(starpu_sched_component_is_worker(component))
  492. return;
  493. struct starpu_bitmap * workers_in_ctx = _starpu_get_worker_mask(sched_ctx_id);
  494. starpu_bitmap_unset_and(component->workers_in_ctx,component->workers, workers_in_ctx);
  495. int i,j;
  496. for(i = 0; i < component->nchildren; i++)
  497. {
  498. struct starpu_sched_component * child = component->children[i];
  499. _starpu_sched_component_update_workers_in_ctx(child, sched_ctx_id);
  500. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  501. if(child->fathers[j] == component)
  502. {
  503. starpu_bitmap_or(component->workers_in_ctx, child->workers_in_ctx);
  504. break;
  505. }
  506. }
  507. set_properties(component);
  508. component->notify_change_workers(component);
  509. }
  510. void starpu_sched_tree_update_workers_in_ctx(struct starpu_sched_tree * t)
  511. {
  512. STARPU_ASSERT(t);
  513. _starpu_sched_component_update_workers_in_ctx(t->root, t->sched_ctx_id);
  514. }
  515. void starpu_sched_tree_update_workers(struct starpu_sched_tree * t)
  516. {
  517. STARPU_ASSERT(t);
  518. _starpu_sched_component_update_workers(t->root);
  519. }