component_worker.c 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2017 Université de Bordeaux
  4. * Copyright (C) 2010, 2011, 2012, 2014, 2015, 2016, 2017 CNRS
  5. * Copyright (C) 2011 Télécom-SudParis
  6. * Copyright (C) 2011-2013 INRIA
  7. * Copyright (C) 2013 Simon Archipoff
  8. *
  9. * StarPU is free software; you can redistribute it and/or modify
  10. * it under the terms of the GNU Lesser General Public License as published by
  11. * the Free Software Foundation; either version 2.1 of the License, or (at
  12. * your option) any later version.
  13. *
  14. * StarPU is distributed in the hope that it will be useful, but
  15. * WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  17. *
  18. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  19. */
  20. #include <starpu_sched_component.h>
  21. #include <sched_policies/sched_component.h>
  22. #include <core/workers.h>
  23. #include <float.h>
  24. /* The data structure for the workers' queues looks like this:
  25. * W = worker
  26. * T = simple task
  27. * P = parallel task
  28. *
  29. *
  30. * P--P T
  31. * | | \|
  32. * P--P T T P T
  33. * | | | | | |
  34. * T T P--P--P T
  35. * | | | | | |
  36. * W W W W W W
  37. *
  38. *
  39. *
  40. * It is possible that a _starpu_task_grid has no task, because it has already
  41. * been popped by a worker.
  42. *
  43. * N = no task
  44. *
  45. * T T T
  46. * | | |
  47. * P--N--N
  48. * | | |
  49. * W W W
  50. *
  51. *
  52. * This API is a little asymmetric: struct _starpu_task_grid objects are allocated by the caller and freed by the data structure.
  53. *
  54. */
  55. /******************************************************************************
  56. * Worker Components' Data Structures *
  57. *****************************************************************************/
  58. struct _starpu_task_grid
  59. {
  60. /* this member may be NULL if a worker have poped it but its a
  61. * parallel task and we dont want mad pointers
  62. */
  63. struct starpu_task * task;
  64. struct _starpu_task_grid *up, *down, *left, *right;
  65. /* this is used to count the number of task to be poped by a worker
  66. * the leftist _starpu_task_grid maintain the ntasks counter (ie .left == NULL),
  67. * all the others use the pntasks that point to it
  68. *
  69. * when the counter reach 0, all the left and right member are set to NULL,
  70. * that mean that we will free that components.
  71. */
  72. union
  73. {
  74. int ntasks;
  75. int * pntasks;
  76. };
  77. };
  78. /* list->exp_start, list->exp_len, list-exp_end and list->ntasks
  79. * are updated by starpu_sched_component_worker_push_task(component, task) and pre_exec_hook
  80. */
  81. struct _starpu_worker_task_list
  82. {
  83. double exp_start, exp_len, exp_end, pipeline_len;
  84. struct _starpu_task_grid *first, *last;
  85. unsigned ntasks;
  86. starpu_pthread_mutex_t mutex;
  87. };
  88. /* This is called when a transfer request is actually pushed to the worker */
  89. static void _starpu_worker_task_list_transfer_started(struct _starpu_worker_task_list *l, struct starpu_task *task)
  90. {
  91. double transfer_model = task->predicted_transfer;
  92. if (isnan(transfer_model))
  93. return;
  94. /* We now start the transfer, move it from predicted to pipelined */
  95. l->exp_len -= transfer_model;
  96. l->pipeline_len += transfer_model;
  97. l->exp_start = starpu_timing_now() + l->pipeline_len;
  98. l->exp_end = l->exp_start + l->exp_len;
  99. }
  100. #ifdef STARPU_DEVEL
  101. #warning FIXME: merge with deque_modeling_policy_data_aware
  102. #endif
  103. /* This is called when a task is actually pushed to the worker (i.e. the transfer finished */
  104. static void _starpu_worker_task_list_started(struct _starpu_worker_task_list *l, struct starpu_task *task)
  105. {
  106. double model = task->predicted;
  107. double transfer_model = task->predicted_transfer;
  108. if(!isnan(transfer_model))
  109. /* The transfer is over, remove it from pipelined */
  110. l->pipeline_len -= transfer_model;
  111. if(!isnan(model))
  112. {
  113. /* We now start the computation, move it from predicted to pipelined */
  114. l->exp_len -= model;
  115. l->pipeline_len += model;
  116. l->exp_start = starpu_timing_now() + l->pipeline_len;
  117. l->exp_end= l->exp_start + l->exp_len;
  118. }
  119. }
  120. /* This is called when a task is actually finished */
  121. static void _starpu_worker_task_list_finished(struct _starpu_worker_task_list *l, struct starpu_task *task)
  122. {
  123. if(!isnan(task->predicted))
  124. /* The execution is over, remove it from pipelined */
  125. l->pipeline_len -= task->predicted;
  126. l->exp_start = STARPU_MAX(starpu_timing_now() + l->pipeline_len, l->exp_start);
  127. l->exp_end = l->exp_start + l->exp_len;
  128. }
  129. struct _starpu_worker_component_data
  130. {
  131. union
  132. {
  133. struct _starpu_worker * worker;
  134. struct _starpu_combined_worker * combined_worker;
  135. };
  136. struct _starpu_worker_task_list * list;
  137. };
  138. /* this array store worker components */
  139. static struct starpu_sched_component * _worker_components[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
  140. /******************************************************************************
  141. * Worker Components' Task List and Grid Functions *
  142. *****************************************************************************/
  143. static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
  144. {
  145. struct _starpu_worker_task_list *l;
  146. _STARPU_MALLOC(l, sizeof(*l));
  147. memset(l, 0, sizeof(*l));
  148. l->exp_len = l->pipeline_len = 0.0;
  149. l->exp_start = l->exp_end = starpu_timing_now();
  150. /* These are only for statistics */
  151. STARPU_HG_DISABLE_CHECKING(l->exp_end);
  152. STARPU_HG_DISABLE_CHECKING(l->exp_start);
  153. STARPU_PTHREAD_MUTEX_INIT(&l->mutex,NULL);
  154. return l;
  155. }
  156. static struct _starpu_task_grid * _starpu_task_grid_create(void)
  157. {
  158. struct _starpu_task_grid *t;
  159. _STARPU_MALLOC(t, sizeof(*t));
  160. memset(t, 0, sizeof(*t));
  161. return t;
  162. }
  163. static struct _starpu_worker_task_list * _worker_get_list(unsigned sched_ctx_id)
  164. {
  165. unsigned workerid = starpu_worker_get_id_check();
  166. STARPU_ASSERT(workerid < starpu_worker_get_count());
  167. struct _starpu_worker_component_data * d = starpu_sched_component_worker_get(sched_ctx_id, workerid)->data;
  168. return d->list;
  169. }
  170. static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
  171. {
  172. free(t);
  173. }
  174. static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
  175. {
  176. if(l)
  177. {
  178. /* There can be empty task grids, when we picked the last task after the front task grid */
  179. struct _starpu_task_grid *t = l->first, *nextt;
  180. while(t)
  181. {
  182. STARPU_ASSERT(!t->task);
  183. nextt = t->up;
  184. _starpu_task_grid_destroy(t);
  185. t = nextt;
  186. }
  187. STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
  188. free(l);
  189. }
  190. }
  191. static inline void _starpu_worker_task_list_add(struct _starpu_worker_task_list * l, struct starpu_task *task)
  192. {
  193. double predicted = task->predicted;
  194. double predicted_transfer = task->predicted_transfer;
  195. double end = l->exp_end;
  196. /* Sometimes workers didn't take the tasks as early as we expected */
  197. l->exp_start = STARPU_MAX(l->exp_start, starpu_timing_now());
  198. if (starpu_timing_now() + predicted_transfer < end)
  199. {
  200. /* We may hope that the transfer will be finished by
  201. * the start of the task. */
  202. predicted_transfer = 0.0;
  203. }
  204. else
  205. {
  206. /* The transfer will not be finished by then, take the
  207. * remainder into account */
  208. predicted_transfer = (starpu_timing_now() + predicted_transfer) - end;
  209. }
  210. if(!isnan(predicted_transfer))
  211. l->exp_len += predicted_transfer;
  212. if(!isnan(predicted))
  213. l->exp_len += predicted;
  214. l->exp_end = l->exp_start + l->exp_len;
  215. task->predicted = predicted;
  216. task->predicted_transfer = predicted_transfer;
  217. }
  218. static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
  219. {
  220. /* the task, ntasks, pntasks, left and right members of t are set by the caller */
  221. STARPU_ASSERT(t->task);
  222. if(l->first == NULL)
  223. l->first = l->last = t;
  224. t->down = l->last;
  225. l->last->up = t;
  226. t->up = NULL;
  227. l->last = t;
  228. l->ntasks++;
  229. _starpu_worker_task_list_add(l, t->task);
  230. }
  231. /* recursively set left and right pointers to NULL */
  232. static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
  233. {
  234. STARPU_ASSERT(t->task == NULL);
  235. struct _starpu_task_grid * t_left = t->left;
  236. struct _starpu_task_grid * t_right = t->right;
  237. t->left = t->right = NULL;
  238. while(t_left)
  239. {
  240. STARPU_ASSERT(t_left->task == NULL);
  241. t = t_left;
  242. t_left = t_left->left;
  243. t->left = NULL;
  244. t->right = NULL;
  245. }
  246. while(t_right)
  247. {
  248. STARPU_ASSERT(t_right->task == NULL);
  249. t = t_right;
  250. t_right = t_right->right;
  251. t->left = NULL;
  252. t->right = NULL;
  253. }
  254. }
  255. static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_worker_task_list * l)
  256. {
  257. if(!l->first)
  258. {
  259. l->exp_len = l->pipeline_len = 0.0;
  260. l->exp_start = l->exp_end = starpu_timing_now();
  261. return NULL;
  262. }
  263. struct _starpu_task_grid * t = l->first;
  264. /* if there is no task there is no tasks linked to this, then we can free it */
  265. if(t->task == NULL && t->right == NULL && t->left == NULL)
  266. {
  267. l->first = t->up;
  268. if(l->first)
  269. l->first->down = NULL;
  270. if(l->last == t)
  271. l->last = NULL;
  272. _starpu_task_grid_destroy(t);
  273. return _starpu_worker_task_list_pop(l);
  274. }
  275. while(t)
  276. {
  277. if(t->task)
  278. {
  279. struct starpu_task * task = t->task;
  280. t->task = NULL;
  281. /* the leftist thing hold the number of tasks, other have a pointer to it */
  282. int * p = t->left ? t->pntasks : &t->ntasks;
  283. /* the worker who pop the last task allow the rope to be freed */
  284. if(STARPU_ATOMIC_ADD(p, -1) == 0)
  285. _starpu_task_grid_unset_left_right_member(t);
  286. l->ntasks--;
  287. return task;
  288. }
  289. t = t->up;
  290. }
  291. return NULL;
  292. }
  293. /******************************************************************************
  294. * Worker Components' Public Helper Functions (Part 1) *
  295. *****************************************************************************/
  296. struct _starpu_worker * _starpu_sched_component_worker_get_worker(struct starpu_sched_component * worker_component)
  297. {
  298. STARPU_ASSERT(starpu_sched_component_is_simple_worker(worker_component));
  299. struct _starpu_worker_component_data * data = worker_component->data;
  300. return data->worker;
  301. }
  302. struct _starpu_combined_worker * _starpu_sched_component_combined_worker_get_combined_worker(struct starpu_sched_component * worker_component)
  303. {
  304. STARPU_ASSERT(starpu_sched_component_is_combined_worker(worker_component));
  305. struct _starpu_worker_component_data * data = worker_component->data;
  306. return data->combined_worker;
  307. }
  308. /******************************************************************************
  309. * Worker Components' Private Helper Functions *
  310. *****************************************************************************/
  311. #ifndef STARPU_NO_ASSERT
  312. static int _worker_consistant(struct starpu_sched_component * component)
  313. {
  314. int is_a_worker = 0;
  315. int i;
  316. for(i = 0; i<STARPU_NMAXWORKERS; i++)
  317. if(_worker_components[component->tree->sched_ctx_id][i] == component)
  318. is_a_worker = 1;
  319. if(!is_a_worker)
  320. return 0;
  321. struct _starpu_worker_component_data * data = component->data;
  322. if(data->worker)
  323. {
  324. int id = data->worker->workerid;
  325. return (_worker_components[component->tree->sched_ctx_id][id] == component)
  326. && component->nchildren == 0;
  327. }
  328. return 1;
  329. }
  330. #endif
  331. /******************************************************************************
  332. * Simple Worker Components' Interface Functions *
  333. *****************************************************************************/
  334. static void simple_worker_can_pull(struct starpu_sched_component * worker_component)
  335. {
  336. struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(worker_component);
  337. int workerid = worker->workerid;
  338. _starpu_wake_worker_relax(workerid);
  339. }
  340. static int simple_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
  341. {
  342. STARPU_ASSERT(starpu_sched_component_is_worker(component));
  343. /*this function take the worker's mutex */
  344. struct _starpu_worker_component_data * data = component->data;
  345. struct _starpu_task_grid * t = _starpu_task_grid_create();
  346. t->task = task;
  347. t->ntasks = 1;
  348. task->workerid = starpu_bitmap_first(component->workers);
  349. #if 1 /* dead lock problem? */
  350. if (starpu_get_prefetch_flag() && !task->prefetched)
  351. {
  352. unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
  353. starpu_prefetch_task_input_on_node(task, memory_node);
  354. }
  355. #endif
  356. struct _starpu_worker_task_list * list = data->list;
  357. STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
  358. _starpu_worker_task_list_push(list, t);
  359. STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
  360. simple_worker_can_pull(component);
  361. return 0;
  362. }
  363. static struct starpu_task * simple_worker_pull_task(struct starpu_sched_component *component)
  364. {
  365. unsigned workerid = starpu_worker_get_id_check();
  366. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  367. struct _starpu_worker_component_data * data = component->data;
  368. struct _starpu_worker_task_list * list = data->list;
  369. struct starpu_task * task;
  370. int i;
  371. int n_tries = 0;
  372. do
  373. {
  374. /* do not reset state_keep_awake here has it may hide tasks in worker->local_tasks */
  375. n_tries++;
  376. STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
  377. /* Take the opportunity to update start time */
  378. data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
  379. data->list->exp_end = data->list->exp_start + data->list->exp_len;
  380. task = _starpu_worker_task_list_pop(list);
  381. if(task)
  382. {
  383. _starpu_worker_task_list_transfer_started(list, task);
  384. STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
  385. starpu_push_task_end(task);
  386. goto ret;
  387. }
  388. STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
  389. for(i=0; i < component->nparents; i++)
  390. {
  391. if(component->parents[i] == NULL)
  392. continue;
  393. else
  394. {
  395. task = starpu_sched_component_pull_task(component->parents[i],component);
  396. if(task)
  397. break;
  398. }
  399. }
  400. }
  401. while((!task) && worker->state_keep_awake && n_tries < 2);
  402. if(!task)
  403. goto ret;
  404. if(task->cl->type == STARPU_SPMD)
  405. {
  406. if(!starpu_worker_is_combined_worker(workerid))
  407. {
  408. STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
  409. _starpu_worker_task_list_add(list, task);
  410. _starpu_worker_task_list_transfer_started(list, task);
  411. STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
  412. starpu_push_task_end(task);
  413. goto ret;
  414. }
  415. struct starpu_sched_component * combined_worker_component = starpu_sched_component_worker_get(component->tree->sched_ctx_id, workerid);
  416. starpu_sched_component_push_task(component, combined_worker_component, task);
  417. /* we have pushed a task in queue, so can make a recursive call */
  418. task = simple_worker_pull_task(component);
  419. goto ret;
  420. }
  421. if(task)
  422. {
  423. STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
  424. _starpu_worker_task_list_add(list, task);
  425. _starpu_worker_task_list_transfer_started(list, task);
  426. STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
  427. starpu_push_task_end(task);
  428. }
  429. ret:
  430. return task;
  431. }
  432. static double simple_worker_estimated_end(struct starpu_sched_component * component)
  433. {
  434. struct _starpu_worker_component_data * data = component->data;
  435. double now = starpu_timing_now();
  436. if (now + data->list->pipeline_len > data->list->exp_start )
  437. {
  438. data->list->exp_start = now + data->list->pipeline_len;
  439. data->list->exp_end = data->list->exp_start + data->list->exp_len;
  440. }
  441. return data->list->exp_end;
  442. }
  443. static double simple_worker_estimated_load(struct starpu_sched_component * component)
  444. {
  445. struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(component);
  446. int nb_task = 0;
  447. STARPU_COMPONENT_MUTEX_LOCK(&worker->mutex);
  448. struct starpu_task_list list = worker->local_tasks;
  449. struct starpu_task * task;
  450. for(task = starpu_task_list_front(&list);
  451. task != starpu_task_list_end(&list);
  452. task = starpu_task_list_next(task))
  453. nb_task++;
  454. STARPU_COMPONENT_MUTEX_UNLOCK(&worker->mutex);
  455. struct _starpu_worker_component_data * d = component->data;
  456. struct _starpu_worker_task_list * l = d->list;
  457. int ntasks_in_fifo = l ? l->ntasks : 0;
  458. return (double) (nb_task + ntasks_in_fifo)
  459. / starpu_worker_get_relative_speedup(
  460. starpu_worker_get_perf_archtype(starpu_bitmap_first(component->workers), component->tree->sched_ctx_id));
  461. }
  462. static void _worker_component_deinit_data(struct starpu_sched_component * component)
  463. {
  464. struct _starpu_worker_component_data * d = component->data;
  465. _starpu_worker_task_list_destroy(d->list);
  466. int i, j;
  467. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  468. for(i = 0; i < STARPU_NMAXWORKERS; i++)
  469. if(_worker_components[j][i] == component)
  470. {
  471. _worker_components[j][i] = NULL;
  472. break;
  473. }
  474. free(d);
  475. }
  476. static struct starpu_sched_component * starpu_sched_component_worker_create(struct starpu_sched_tree *tree, int workerid)
  477. {
  478. STARPU_ASSERT(workerid >= 0 && workerid < (int) starpu_worker_get_count());
  479. if(_worker_components[tree->sched_ctx_id][workerid])
  480. return _worker_components[tree->sched_ctx_id][workerid];
  481. struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
  482. if(worker == NULL)
  483. return NULL;
  484. char name[32];
  485. snprintf(name, sizeof(name), "worker %d", workerid);
  486. struct starpu_sched_component * component = starpu_sched_component_create(tree, name);
  487. struct _starpu_worker_component_data *data;
  488. _STARPU_MALLOC(data, sizeof(*data));
  489. memset(data, 0, sizeof(*data));
  490. data->worker = worker;
  491. data->list = _starpu_worker_task_list_create();
  492. component->data = data;
  493. component->push_task = simple_worker_push_task;
  494. component->pull_task = simple_worker_pull_task;
  495. component->can_pull = simple_worker_can_pull;
  496. component->estimated_end = simple_worker_estimated_end;
  497. component->estimated_load = simple_worker_estimated_load;
  498. component->deinit_data = _worker_component_deinit_data;
  499. starpu_bitmap_set(component->workers, workerid);
  500. starpu_bitmap_or(component->workers_in_ctx, component->workers);
  501. _worker_components[tree->sched_ctx_id][workerid] = component;
  502. /*
  503. #ifdef STARPU_HAVE_HWLOC
  504. struct _starpu_machine_config *config = _starpu_get_machine_config();
  505. struct _starpu_machine_topology *topology = &config->topology;
  506. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
  507. STARPU_ASSERT(obj);
  508. component->obj = obj;
  509. #endif
  510. */
  511. return component;
  512. }
  513. /******************************************************************************
  514. * Combined Worker Components' Interface Functions *
  515. *****************************************************************************/
  516. static void combined_worker_can_pull(struct starpu_sched_component * component)
  517. {
  518. (void) component;
  519. STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
  520. struct _starpu_worker_component_data * data = component->data;
  521. unsigned workerid = starpu_worker_get_id_check();
  522. int i;
  523. for(i = 0; i < data->combined_worker->worker_size; i++)
  524. {
  525. if((unsigned) i == workerid)
  526. continue;
  527. _starpu_wake_worker_relax(workerid);
  528. }
  529. }
  530. static int combined_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
  531. {
  532. STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
  533. struct _starpu_worker_component_data * data = component->data;
  534. STARPU_ASSERT(data->combined_worker && !data->worker);
  535. struct _starpu_combined_worker * combined_worker = data->combined_worker;
  536. STARPU_ASSERT(combined_worker->worker_size >= 1);
  537. struct _starpu_task_grid * task_alias[combined_worker->worker_size];
  538. starpu_parallel_task_barrier_init(task, starpu_bitmap_first(component->workers));
  539. task_alias[0] = _starpu_task_grid_create();
  540. task_alias[0]->task = starpu_task_dup(task);
  541. task_alias[0]->task->workerid = combined_worker->combined_workerid[0];
  542. task_alias[0]->task->destroy = 1;
  543. task_alias[0]->left = NULL;
  544. task_alias[0]->ntasks = combined_worker->worker_size;
  545. int i;
  546. for(i = 1; i < combined_worker->worker_size; i++)
  547. {
  548. task_alias[i] = _starpu_task_grid_create();
  549. task_alias[i]->task = starpu_task_dup(task);
  550. task_alias[i]->task->destroy = 1;
  551. task_alias[i]->task->workerid = combined_worker->combined_workerid[i];
  552. task_alias[i]->left = task_alias[i-1];
  553. task_alias[i - 1]->right = task_alias[i];
  554. task_alias[i]->pntasks = &(task_alias[0]->ntasks);
  555. }
  556. starpu_pthread_mutex_t * mutex_to_unlock = NULL;
  557. i = 0;
  558. do
  559. {
  560. struct starpu_sched_component * worker_component = starpu_sched_component_worker_get(component->tree->sched_ctx_id, combined_worker->combined_workerid[i]);
  561. struct _starpu_worker_component_data * worker_data = worker_component->data;
  562. struct _starpu_worker_task_list * list = worker_data->list;
  563. STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
  564. if(mutex_to_unlock)
  565. STARPU_COMPONENT_MUTEX_UNLOCK(mutex_to_unlock);
  566. mutex_to_unlock = &list->mutex;
  567. _starpu_worker_task_list_push(list, task_alias[i]);
  568. i++;
  569. }
  570. while(i < combined_worker->worker_size);
  571. STARPU_COMPONENT_MUTEX_UNLOCK(mutex_to_unlock);
  572. int workerid = starpu_worker_get_id();
  573. if(-1 == workerid)
  574. {
  575. combined_worker_can_pull(component);
  576. }
  577. else
  578. {
  579. /* wake up all other workers of combined worker */
  580. for(i = 0; i < combined_worker->worker_size; i++)
  581. {
  582. struct starpu_sched_component * worker_component = starpu_sched_component_worker_get(component->tree->sched_ctx_id, combined_worker->combined_workerid[i]);
  583. simple_worker_can_pull(worker_component);
  584. }
  585. combined_worker_can_pull(component);
  586. }
  587. return 0;
  588. }
  589. static double combined_worker_estimated_end(struct starpu_sched_component * component)
  590. {
  591. STARPU_ASSERT(starpu_sched_component_is_combined_worker(component));
  592. struct _starpu_worker_component_data * data = component->data;
  593. struct _starpu_combined_worker * combined_worker = data->combined_worker;
  594. double max = 0.0;
  595. int i;
  596. for(i = 0; i < combined_worker->worker_size; i++)
  597. {
  598. data = _worker_components[component->tree->sched_ctx_id][combined_worker->combined_workerid[i]]->data;
  599. double tmp = data->list->exp_end;
  600. max = tmp > max ? tmp : max;
  601. }
  602. return max;
  603. }
  604. static double combined_worker_estimated_load(struct starpu_sched_component * component)
  605. {
  606. struct _starpu_worker_component_data * d = component->data;
  607. struct _starpu_combined_worker * c = d->combined_worker;
  608. double load = 0;
  609. int i;
  610. for(i = 0; i < c->worker_size; i++)
  611. {
  612. struct starpu_sched_component * n = starpu_sched_component_worker_get(component->tree->sched_ctx_id, c->combined_workerid[i]);
  613. load += n->estimated_load(n);
  614. }
  615. return load;
  616. }
  617. static struct starpu_sched_component * starpu_sched_component_combined_worker_create(struct starpu_sched_tree *tree, int workerid)
  618. {
  619. STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
  620. if(_worker_components[tree->sched_ctx_id][workerid])
  621. return _worker_components[tree->sched_ctx_id][workerid];
  622. struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
  623. if(combined_worker == NULL)
  624. return NULL;
  625. struct starpu_sched_component * component = starpu_sched_component_create(tree, "combined_worker");
  626. struct _starpu_worker_component_data *data;
  627. _STARPU_MALLOC(data, sizeof(*data));
  628. memset(data, 0, sizeof(*data));
  629. data->combined_worker = combined_worker;
  630. component->data = data;
  631. component->push_task = combined_worker_push_task;
  632. component->pull_task = NULL;
  633. component->estimated_end = combined_worker_estimated_end;
  634. component->estimated_load = combined_worker_estimated_load;
  635. component->can_pull = combined_worker_can_pull;
  636. component->deinit_data = _worker_component_deinit_data;
  637. starpu_bitmap_set(component->workers, workerid);
  638. starpu_bitmap_or(component->workers_in_ctx, component->workers);
  639. _worker_components[tree->sched_ctx_id][workerid] = component;
  640. #ifdef STARPU_HAVE_HWLOC
  641. struct _starpu_machine_config *config = _starpu_get_machine_config();
  642. struct _starpu_machine_topology *topology = &config->topology;
  643. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, combined_worker->combined_workerid[0]);
  644. STARPU_ASSERT(obj);
  645. component->obj = obj;
  646. #endif
  647. return component;
  648. }
  649. /******************************************************************************
  650. * Worker Components' Public Helper Functions (Part 2) *
  651. *****************************************************************************/
  652. void _starpu_sched_component_lock_all_workers(void)
  653. {
  654. unsigned i;
  655. for(i = 0; i < starpu_worker_get_count(); i++)
  656. _starpu_worker_lock(i);
  657. }
  658. void _starpu_sched_component_unlock_all_workers(void)
  659. {
  660. unsigned i;
  661. for(i = 0; i < starpu_worker_get_count(); i++)
  662. _starpu_worker_unlock(i);
  663. }
  664. void _starpu_sched_component_workers_destroy(void)
  665. {
  666. int i, j;
  667. for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
  668. for(i = 0; i < STARPU_NMAXWORKERS; i++)
  669. if (_worker_components[j][i])
  670. starpu_sched_component_destroy(_worker_components[j][i]);
  671. }
  672. int starpu_sched_component_worker_get_workerid(struct starpu_sched_component * worker_component)
  673. {
  674. #ifndef STARPU_NO_ASSERT
  675. STARPU_ASSERT(_worker_consistant(worker_component));
  676. #endif
  677. STARPU_ASSERT(1 == starpu_bitmap_cardinal(worker_component->workers));
  678. return starpu_bitmap_first(worker_component->workers);
  679. }
  680. void starpu_sched_component_worker_pre_exec_hook(struct starpu_task * task, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
  681. {
  682. struct _starpu_worker_task_list * list = _worker_get_list(sched_ctx_id);
  683. STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
  684. _starpu_worker_task_list_started(list, task);
  685. /* Take the opportunity to update start time */
  686. list->exp_start = STARPU_MAX(starpu_timing_now() + list->pipeline_len, list->exp_start);
  687. STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
  688. }
  689. void starpu_sched_component_worker_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
  690. {
  691. if(task->execute_on_a_specific_worker)
  692. return;
  693. struct _starpu_worker_task_list * list = _worker_get_list(sched_ctx_id);
  694. STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
  695. _starpu_worker_task_list_finished(list, task);
  696. STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
  697. }
  698. int starpu_sched_component_is_simple_worker(struct starpu_sched_component * component)
  699. {
  700. return component->push_task == simple_worker_push_task;
  701. }
  702. int starpu_sched_component_is_combined_worker(struct starpu_sched_component * component)
  703. {
  704. return component->push_task == combined_worker_push_task;
  705. }
  706. int starpu_sched_component_is_worker(struct starpu_sched_component * component)
  707. {
  708. return starpu_sched_component_is_simple_worker(component)
  709. || starpu_sched_component_is_combined_worker(component);
  710. }
  711. /* As Worker Components' creating functions are protected, this function allows
  712. * the user to get a Worker Component from a worker id */
  713. struct starpu_sched_component * starpu_sched_component_worker_get(unsigned sched_ctx, int workerid)
  714. {
  715. STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
  716. /* we may need to take a mutex here */
  717. STARPU_ASSERT(_worker_components[sched_ctx][workerid]);
  718. return _worker_components[sched_ctx][workerid];
  719. }
  720. struct starpu_sched_component * starpu_sched_component_worker_new(unsigned sched_ctx, int workerid)
  721. {
  722. STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
  723. /* we may need to take a mutex here */
  724. STARPU_ASSERT(!_worker_components[sched_ctx][workerid]);
  725. struct starpu_sched_component * component;
  726. if(workerid < (int) starpu_worker_get_count())
  727. component = starpu_sched_component_worker_create(starpu_sched_tree_get(sched_ctx), workerid);
  728. else
  729. component = starpu_sched_component_combined_worker_create(starpu_sched_tree_get(sched_ctx), workerid);
  730. _worker_components[sched_ctx][workerid] = component;
  731. return component;
  732. }