node_worker.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2013 Université de Bordeaux 1
 * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
 * Copyright (C) 2011 Télécom-SudParis
 * Copyright (C) 2011-2013 INRIA
 * Copyright (C) 2013 Simon Archipoff
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu_sched_node.h>
#include <core/workers.h>
#include <float.h>

/* The data structure for a worker's queue looks like this:
 * W = worker
 * T = simple task
 * P = parallel task
 *
 *
 * P--P   T
 * |  |  \|
 * P--P   T  T  P  T
 * |  |   |  |  |  |
 * T  T   P--P--P  T
 * |  |   |  |  |  |
 * W  W   W  W  W  W
 *
 *
 *
 * It is possible that a _starpu_task_grid cell holds no task, because the
 * task has already been popped by a worker.
 *
 * N = no task
 *
 * T  T  T
 * |  |  |
 * P--N--N
 * |  |  |
 * W  W  W
 *
 *
 * This API is a little asymmetric: struct _starpu_task_grid cells are
 * allocated by the caller and freed by the data structure.
 */
struct _starpu_task_grid
{
	/* this member may be NULL if a worker has already popped the task;
	 * since it is a parallel task we must not keep a dangling pointer to it
	 */
	struct starpu_task * task;
	struct _starpu_task_grid *up, *down, *left, *right;

	/* this is used to count the number of tasks that remain to be popped
	 * by workers: the leftmost _starpu_task_grid (i.e. the one with
	 * .left == NULL) holds the ntasks counter, all the others carry a
	 * pntasks pointer to it.
	 *
	 * when the counter reaches 0, the left and right members of every cell
	 * in the row are set to NULL, which marks the cells as free-able.
	 */
	union
	{
		int ntasks;
		int * pntasks;
	};
};
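
/* Illustrative sketch, kept out of the build: how a caller fills a row of
 * grid cells for a parallel task split across n workers, following the same
 * linking discipline as combined_worker_push_task() below ("n" and "task"
 * are assumed to be in scope). The leftmost cell owns the ntasks counter;
 * every other cell aliases it through pntasks. */
#if 0
struct _starpu_task_grid *row[n];
int i;
row[0] = _starpu_task_grid_create();
row[0]->task = starpu_task_dup(task);
row[0]->left = NULL;
row[0]->ntasks = n;
for(i = 1; i < n; i++)
{
	row[i] = _starpu_task_grid_create();
	row[i]->task = starpu_task_dup(task);
	row[i]->left = row[i - 1];
	row[i - 1]->right = row[i];
	row[i]->pntasks = &row[0]->ntasks;
}
#endif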

/* list->exp_start, list->exp_len, list->exp_end and list->ntasks
 * are updated by starpu_sched_node_worker_push_task(node, task) and by the
 * pre_exec_hook
 */
struct _starpu_worker_task_list
{
	double exp_start, exp_len, exp_end;
	struct _starpu_task_grid *first, *last;
	unsigned ntasks;
	starpu_pthread_mutex_t mutex;
};

enum _starpu_worker_node_status
{
	NODE_STATUS_SLEEPING,
	NODE_STATUS_RESET,
	NODE_STATUS_CHANGED
};

struct _starpu_worker_node_data
{
	union
	{
		struct
		{
			struct _starpu_worker * worker;
			starpu_pthread_mutex_t lock;
		};
		struct _starpu_combined_worker * combined_worker;
	};
	struct _starpu_worker_task_list * list;
	enum _starpu_worker_node_status status;
};

/* this array stores the worker nodes */
static struct starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];

static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
{
	struct _starpu_worker_task_list * l = malloc(sizeof(*l));
	memset(l, 0, sizeof(*l));
	l->exp_len = 0.0;
	l->exp_start = l->exp_end = starpu_timing_now();
	STARPU_PTHREAD_MUTEX_INIT(&l->mutex,NULL);
	return l;
}

static struct _starpu_task_grid * _starpu_task_grid_create(void)
{
	struct _starpu_task_grid * t = malloc(sizeof(*t));
	memset(t, 0, sizeof(*t));
	return t;
}

static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
{
	free(t);
}

static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
{
	if(l)
	{
		STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
		free(l);
	}
}

static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
{
	/* the task, ntasks, pntasks, left and right members of t are set by the caller */
	STARPU_ASSERT(t->task);
	if(l->first == NULL)
	{
		l->first = t;
		t->down = NULL;
	}
	else
	{
		t->down = l->last;
		l->last->up = t;
	}
	t->up = NULL;
	l->last = t;
	l->ntasks++;

	double predicted = t->task->predicted;
	double predicted_transfer = t->task->predicted_transfer;

	/* Sometimes workers didn't take the tasks as early as we expected */
	l->exp_start = STARPU_MAX(l->exp_start, starpu_timing_now());
	l->exp_end = l->exp_start + l->exp_len;
	if (starpu_timing_now() + predicted_transfer < l->exp_end)
	{
		/* We may hope that the transfer will be finished by
		 * the start of the task. */
		predicted_transfer = 0.0;
	}
	else
	{
		/* The transfer will not be finished by then, take the
		 * remainder into account */
		predicted_transfer = (starpu_timing_now() + predicted_transfer) - l->exp_end;
	}

	if(!isnan(predicted_transfer))
	{
		l->exp_end += predicted_transfer;
		l->exp_len += predicted_transfer;
	}
	if(!isnan(predicted))
	{
		l->exp_end += predicted;
		l->exp_len += predicted;
	}

	t->task->predicted = predicted;
	t->task->predicted_transfer = predicted_transfer;
}
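
/* Worked example of the accounting above, with made-up numbers: suppose
 * now = 1000us, exp_start = 1000us and exp_len = 500us, so exp_end = 1500us,
 * and we push a task with predicted = 200us and predicted_transfer = 100us.
 * Since now + 100 < exp_end, the transfer is expected to overlap with the
 * already queued work and is counted as 0; only the 200us of computation
 * extend the window, giving exp_len = 700us and exp_end = 1700us. With
 * predicted_transfer = 600us instead, now + 600 = 1600 > exp_end, so only
 * the 100us remainder is added before the 200us of computation, giving
 * exp_len = 800us and exp_end = 1800us. */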

/* walk the whole row and set every left and right pointer to NULL */
static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
{
	STARPU_ASSERT(t->task == NULL);
	struct _starpu_task_grid * t_left = t->left;
	struct _starpu_task_grid * t_right = t->right;
	t->left = t->right = NULL;
	while(t_left)
	{
		STARPU_ASSERT(t_left->task == NULL);
		t = t_left;
		t_left = t_left->left;
		t->left = NULL;
		t->right = NULL;
	}
	while(t_right)
	{
		STARPU_ASSERT(t_right->task == NULL);
		t = t_right;
		t_right = t_right->right;
		t->left = NULL;
		t->right = NULL;
	}
}

static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_worker_task_list * l)
{
	if(!l->first)
	{
		l->exp_start = l->exp_end = starpu_timing_now();
		l->exp_len = 0;
		return NULL;
	}

	struct _starpu_task_grid * t = l->first;
	/* if the first cell has no task and is no longer linked to any other
	 * cell, we can free it */
	if(t->task == NULL && t->right == NULL && t->left == NULL)
	{
		l->first = t->up;
		if(l->first)
			l->first->down = NULL;
		if(l->last == t)
			l->last = NULL;
		_starpu_task_grid_destroy(t);
		return _starpu_worker_task_list_pop(l);
	}

	while(t)
	{
		if(t->task)
		{
			struct starpu_task * task = t->task;
			t->task = NULL;
			/* the leftmost cell holds the number of tasks, the others have a pointer to it */
			int * p = t->left ? t->pntasks : &t->ntasks;
			/* the worker that pops the last task allows the whole row to be freed */
			if(STARPU_ATOMIC_ADD(p, -1) == 0)
				_starpu_task_grid_unset_left_right_member(t);
			l->ntasks--;
			/* undo both contributions made by the push, mirroring it */
			if(!isnan(task->predicted))
				l->exp_len -= task->predicted;
			if(!isnan(task->predicted_transfer))
				l->exp_len -= task->predicted_transfer;
			l->exp_end = l->exp_start + l->exp_len;
			return task;
		}
		t = t->up;
	}
	return NULL;
}

static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid);
static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid);

struct starpu_sched_node * starpu_sched_node_worker_get(int workerid)
{
	STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
	/* we may need to take a mutex here */
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];
	else
	{
		struct starpu_sched_node * node;
		if(workerid < (int) starpu_worker_get_count())
			node = starpu_sched_node_worker_create(workerid);
		else
			node = starpu_sched_node_combined_worker_create(workerid);
		_worker_nodes[workerid] = node;
		return node;
	}
}
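
/* Minimal usage sketch, kept out of the build: a scheduling component that
 * wants to delegate a ready task to worker 0 fetches its node and goes
 * through the generic push interface; "task" stands for any ready
 * struct starpu_task. */
#if 0
struct starpu_sched_node * n = starpu_sched_node_worker_get(0);
int ret = n->push_task(n, task);
STARPU_ASSERT(ret == 0);
#endif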

struct _starpu_worker * _starpu_sched_node_worker_get_worker(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_simple_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return data->worker;
}

struct _starpu_combined_worker * _starpu_sched_node_combined_worker_get_combined_worker(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return data->combined_worker;
}

/*
enum starpu_perfmodel_archtype starpu_sched_node_worker_get_perf_arch(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	if(starpu_sched_node_is_simple_worker(worker_node))
		return _starpu_sched_node_worker_get_worker(worker_node)->perf_arch;
	else
		return _starpu_sched_node_combined_worker_get_combined_worker(worker_node)->perf_arch;
}
*/

static void _starpu_sched_node_worker_set_sleep_status(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	data->status = NODE_STATUS_SLEEPING;
}

static void _starpu_sched_node_worker_set_changed_status(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	data->status = NODE_STATUS_CHANGED;
}

static void _starpu_sched_node_worker_reset_status(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	data->status = NODE_STATUS_RESET;
}

static int _starpu_sched_node_worker_is_reset_status(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return (data->status == NODE_STATUS_RESET);
}

static int _starpu_sched_node_worker_is_changed_status(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return (data->status == NODE_STATUS_CHANGED);
}

static int _starpu_sched_node_worker_is_sleeping_status(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return (data->status == NODE_STATUS_SLEEPING);
}

void _starpu_sched_node_lock_worker(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(workerid)->data;
	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
}

void _starpu_sched_node_unlock_worker(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(workerid)->data;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
}

static void simple_worker_available(struct starpu_sched_node * worker_node)
{
	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
	_starpu_sched_node_lock_worker(w->workerid);
	if(_starpu_sched_node_worker_is_reset_status(worker_node))
		_starpu_sched_node_worker_set_changed_status(worker_node);

	if(w->workerid == starpu_worker_get_id())
	{
		_starpu_sched_node_unlock_worker(w->workerid);
		return;
	}
	if(_starpu_sched_node_worker_is_sleeping_status(worker_node))
	{
		starpu_pthread_mutex_t *sched_mutex;
		starpu_pthread_cond_t *sched_cond;
		starpu_worker_get_sched_condition(w->workerid, &sched_mutex, &sched_cond);
		_starpu_sched_node_unlock_worker(w->workerid);
		starpu_wakeup_worker(w->workerid, sched_cond, sched_mutex);
	}
	else
		_starpu_sched_node_unlock_worker(w->workerid);
}

static void combined_worker_available(struct starpu_sched_node * node)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	int workerid = starpu_worker_get_id();
	int i;
	for(i = 0; i < data->combined_worker->worker_size; i++)
	{
		int worker = data->combined_worker->combined_workerid[i];
		/* no need to wake up the calling worker itself */
		if(worker == workerid)
			continue;
		_starpu_sched_node_lock_worker(worker);
		if(_starpu_sched_node_worker_is_sleeping_status(node))
		{
			starpu_pthread_mutex_t *sched_mutex;
			starpu_pthread_cond_t *sched_cond;
			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
			starpu_wakeup_worker(worker, sched_cond, sched_mutex);
		}
		if(_starpu_sched_node_worker_is_reset_status(node))
			_starpu_sched_node_worker_set_changed_status(node);
		_starpu_sched_node_unlock_worker(worker);
	}
}

static int simple_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(node));
	/* this function takes the worker's list mutex */
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_task_grid * t = _starpu_task_grid_create();
	t->task = task;
	t->ntasks = 1;

	task->workerid = starpu_bitmap_first(node->workers);
#if 1 /* deadlock problem? */
	if (starpu_get_prefetch_flag())
	{
		unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
		starpu_prefetch_task_input_on_node(task, memory_node);
	}
#endif
	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
	_starpu_worker_task_list_push(data->list, t);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
	simple_worker_available(node);
	return 0;
}
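
/* How a worker gets work: simple_worker_pop_task() below first drains the
 * worker's own list; if that is empty it takes the node lock, moves the
 * status to RESET, and polls each father node. A concurrent push flips the
 * status RESET -> CHANGED (see simple_worker_available() above), in which
 * case a second polling round is made and is expected to find the freshly
 * pushed work (hence the assertion). The status finally goes back to
 * SLEEPING before the lock is released. */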
struct starpu_task * simple_worker_pop_task(struct starpu_sched_node *node)
{
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_worker_task_list * list = data->list;
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	struct starpu_task * task = _starpu_worker_task_list_pop(list);
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
	if(task)
	{
		starpu_push_task_end(task);
		return task;
	}

	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
	int i;
	_starpu_sched_node_worker_reset_status(node);
	for(i = 0; i < node->nfathers; i++)
	{
		if(node->fathers[i] == NULL)
			continue;
		task = node->fathers[i]->pop_task(node->fathers[i]);
		if(task)
			break;
	}
	if((!task) && _starpu_sched_node_worker_is_changed_status(node))
	{
		for(i = 0; i < node->nfathers; i++)
		{
			if(node->fathers[i] == NULL)
				continue;
			task = node->fathers[i]->pop_task(node->fathers[i]);
			if(task)
				break;
		}
		STARPU_ASSERT(task);
	}
	_starpu_sched_node_worker_set_sleep_status(node);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
	if(!task)
		return NULL;
	if(task->cl->type == STARPU_SPMD)
	{
		int workerid = starpu_worker_get_id();
		if(!starpu_worker_is_combined_worker(workerid))
		{
			starpu_push_task_end(task);
			return task;
		}
		struct starpu_sched_node * combined_worker_node = starpu_sched_node_worker_get(workerid);
		(void)combined_worker_node->push_task(combined_worker_node, task);
		/* we have pushed the task into the queue, so we can pop it back
		 * through a recursive call */
		return simple_worker_pop_task(node);
	}
	starpu_push_task_end(task);
	return task;
}

void starpu_sched_node_worker_destroy(struct starpu_sched_node *node)
{
	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
	unsigned id = worker->workerid;
	assert(_worker_nodes[id] == node);
	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(node->fathers[i] != NULL)
			return; /* this node is shared between several contexts */
	starpu_sched_node_destroy(node);
	_worker_nodes[id] = NULL;
}

void _starpu_sched_node_lock_all_workers(void)
{
	unsigned i;
	for(i = 0; i < starpu_worker_get_count(); i++)
		_starpu_sched_node_lock_worker(i);
}

void _starpu_sched_node_unlock_all_workers(void)
{
	unsigned i;
	for(i = 0; i < starpu_worker_get_count(); i++)
		_starpu_sched_node_unlock_worker(i);
}

/*
   not sure whether this may be useful...
static double worker_estimated_finish_time(struct _starpu_worker * worker)
{
	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
	double sum = 0.0;
	struct starpu_task_list list = worker->local_tasks;
	struct starpu_task * task;
	for(task = starpu_task_list_front(&list);
	    task != starpu_task_list_end(&list);
	    task = starpu_task_list_next(task))
		if(!isnan(task->predicted))
			sum += task->predicted;
	if(worker->current_task)
	{
		struct starpu_task * t = worker->current_task;
		if(t && !isnan(t->predicted))
			sum += t->predicted/2;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
	return sum + starpu_timing_now();
}
*/

static double combined_worker_estimated_end(struct starpu_sched_node * node)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	double max = 0.0;
	int i;
	for(i = 0; i < combined_worker->worker_size; i++)
	{
		data = _worker_nodes[combined_worker->combined_workerid[i]]->data;
		STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
		double tmp = data->list->exp_end;
		STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
		max = tmp > max ? tmp : max;
	}
	return max;
}

static double simple_worker_estimated_end(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * data = node->data;
	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
	data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
	double tmp = data->list->exp_end = data->list->exp_start + data->list->exp_len;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
	return tmp;
}

static double simple_worker_estimated_load(struct starpu_sched_node * node)
{
	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
	int nb_task = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
	struct starpu_task_list list = worker->local_tasks;
	struct starpu_task * task;
	for(task = starpu_task_list_front(&list);
	    task != starpu_task_list_end(&list);
	    task = starpu_task_list_next(task))
		nb_task++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
	struct _starpu_worker_node_data * d = node->data;
	struct _starpu_worker_task_list * l = d->list;
	int ntasks_in_fifo = l ? l->ntasks : 0;
	return (double) (nb_task + ntasks_in_fifo)
		/ starpu_worker_get_relative_speedup(
			starpu_worker_get_perf_archtype(starpu_bitmap_first(node->workers)));
}

static double combined_worker_estimated_load(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * d = node->data;
	struct _starpu_combined_worker * c = d->combined_worker;
	double load = 0;
	int i;
	for(i = 0; i < c->worker_size; i++)
	{
		struct starpu_sched_node * n = starpu_sched_node_worker_get(c->combined_workerid[i]);
		load += n->estimated_load(n);
	}
	return load;
}

static int combined_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	STARPU_ASSERT(data->combined_worker && !data->worker);
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	STARPU_ASSERT(combined_worker->worker_size >= 1);
	struct _starpu_task_grid * task_alias[combined_worker->worker_size];
	starpu_parallel_task_barrier_init(task, starpu_bitmap_first(node->workers));
	task_alias[0] = _starpu_task_grid_create();
	task_alias[0]->task = starpu_task_dup(task);
	task_alias[0]->task->workerid = combined_worker->combined_workerid[0];
	task_alias[0]->left = NULL;
	task_alias[0]->ntasks = combined_worker->worker_size;
	int i;
	for(i = 1; i < combined_worker->worker_size; i++)
	{
		task_alias[i] = _starpu_task_grid_create();
		task_alias[i]->task = starpu_task_dup(task);
		task_alias[i]->task->workerid = combined_worker->combined_workerid[i];
		task_alias[i]->left = task_alias[i-1];
		task_alias[i - 1]->right = task_alias[i];
		task_alias[i]->pntasks = &task_alias[0]->ntasks;
	}

	starpu_pthread_mutex_t * mutex_to_unlock = NULL;
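	/* push one alias into each worker's list with hand-over-hand locking:
	 * the next list's mutex is taken before the previous one is released,
	 * presumably so that the row of aliases becomes visible to the workers
	 * as a single unit and is never observable half-linked */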
	i = 0;
	do
	{
		struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
		struct _starpu_worker_node_data * worker_data = worker_node->data;
		struct _starpu_worker_task_list * list = worker_data->list;
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		if(mutex_to_unlock)
			STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
		mutex_to_unlock = &list->mutex;
		_starpu_worker_task_list_push(list, task_alias[i]);
		i++;
	}
	while(i < combined_worker->worker_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);

	int workerid = starpu_worker_get_id();
	if(-1 == workerid)
	{
		combined_worker_available(node);
	}
	else
	{
		starpu_pthread_mutex_t *worker_sched_mutex;
		starpu_pthread_cond_t *worker_sched_cond;
		starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
		STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
		/* wake up all other workers of combined worker */
		for(i = 0; i < combined_worker->worker_size; i++)
		{
			struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
			simple_worker_available(worker_node);
		}
		combined_worker_available(node);
		STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
	}
	return 0;
}

void _worker_node_deinit_data(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * d = node->data;
	_starpu_worker_task_list_destroy(d->list);
	if(starpu_sched_node_is_simple_worker(node))
		STARPU_PTHREAD_MUTEX_DESTROY(&d->lock);
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		if(_worker_nodes[i] == node)
		{
			_worker_nodes[i] = NULL;
			break; /* break, not return, so that d is freed below */
		}
	free(d);
}

static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];
	struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
	if(worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->worker = worker;
	STARPU_PTHREAD_MUTEX_INIT(&data->lock,NULL);
	data->status = NODE_STATUS_SLEEPING;
	data->list = _starpu_worker_task_list_create();
	node->data = data;
	node->push_task = simple_worker_push_task;
	node->pop_task = simple_worker_pop_task;
	node->avail = simple_worker_available;
	node->estimated_end = simple_worker_estimated_end;
	node->estimated_load = simple_worker_estimated_load;
	node->deinit_data = _worker_node_deinit_data;
	starpu_bitmap_set(node->workers, workerid);
	starpu_bitmap_or(node->workers_in_ctx, node->workers);
	_worker_nodes[workerid] = node;

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif
	return node;
}

static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < STARPU_NMAXWORKERS);
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];
	struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
	if(combined_worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->combined_worker = combined_worker;
	data->status = NODE_STATUS_SLEEPING;
	node->data = data;
	node->push_task = combined_worker_push_task;
	node->pop_task = NULL;
	node->estimated_end = combined_worker_estimated_end;
	node->estimated_load = combined_worker_estimated_load;
	node->avail = combined_worker_available;
	node->deinit_data = _worker_node_deinit_data;
	starpu_bitmap_set(node->workers, workerid);
	starpu_bitmap_or(node->workers_in_ctx, node->workers);
	_worker_nodes[workerid] = node;

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, combined_worker->combined_workerid[0]);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif
	return node;
}

int starpu_sched_node_is_simple_worker(struct starpu_sched_node * node)
{
	return node->push_task == simple_worker_push_task;
}

int starpu_sched_node_is_combined_worker(struct starpu_sched_node * node)
{
	return node->push_task == combined_worker_push_task;
}

int starpu_sched_node_is_worker(struct starpu_sched_node * node)
{
	return starpu_sched_node_is_simple_worker(node)
		|| starpu_sched_node_is_combined_worker(node);
}

#ifndef STARPU_NO_ASSERT
static int _worker_consistant(struct starpu_sched_node * node)
{
	int is_a_worker = 0;
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		if(_worker_nodes[i] == node)
			is_a_worker = 1;
	if(!is_a_worker)
		return 0;
	struct _starpu_worker_node_data * data = node->data;
	if(data->worker)
	{
		int id = data->worker->workerid;
		return (_worker_nodes[id] == node)
			&& node->nchilds == 0;
	}
	return 1;
}
#endif

int starpu_sched_node_worker_get_workerid(struct starpu_sched_node * worker_node)
{
#ifndef STARPU_NO_ASSERT
	STARPU_ASSERT(_worker_consistant(worker_node));
#endif
	STARPU_ASSERT(1 == starpu_bitmap_cardinal(worker_node->workers));
	return starpu_bitmap_first(worker_node->workers);
}

static struct _starpu_worker_task_list * _worker_get_list(void)
{
	int workerid = starpu_worker_get_id();
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * d = starpu_sched_node_worker_get(workerid)->data;
	return d->list;
}

void starpu_sched_node_worker_pre_exec_hook(struct starpu_task * task)
{
	if(!isnan(task->predicted))
	{
		struct _starpu_worker_task_list * list = _worker_get_list();
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		list->exp_start = starpu_timing_now() + task->predicted;
		if(list->ntasks == 0)
		{
			list->exp_end = list->exp_start;
			list->exp_len = 0.0;
		}
		else
			list->exp_end = list->exp_start + list->exp_len;
		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
	}
}

void starpu_sched_node_worker_post_exec_hook(struct starpu_task * task)
{
	if(task->execute_on_a_specific_worker)
		return;
	struct _starpu_worker_task_list * list = _worker_get_list();
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	list->exp_start = starpu_timing_now();
	list->exp_end = list->exp_start + list->exp_len;
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
}
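
/* Sketch of how the two hooks above are meant to be plugged into a
 * scheduling policy; the pre_exec_hook/post_exec_hook fields are assumed to
 * be those of the public struct starpu_sched_policy, the other fields are
 * elided. */
#if 0
struct starpu_sched_policy policy =
{
	/* ... init_sched, deinit_sched, push_task, pop_task, ... */
	.pre_exec_hook = starpu_sched_node_worker_pre_exec_hook,
	.post_exec_hook = starpu_sched_node_worker_post_exec_hook,
};
#endif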

#if 0
static void starpu_sched_node_worker_push_task_notify(struct starpu_task * task, int workerid, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
	struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(workerid);
	/* does not work with parallel tasks */
	if(starpu_sched_node_is_combined_worker(worker_node))
		return;

	struct _starpu_worker_node_data * d = worker_node->data;
	struct _starpu_worker_task_list * list = d->list;
	/* Compute the expected penalty */
	enum starpu_perfmodel_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
	unsigned memory_node = starpu_worker_get_memory_node(workerid);

	double predicted = starpu_task_expected_length(task, perf_arch,
						       starpu_task_get_implementation(task));
	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);

	/* Update the predictions */
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	/* Sometimes workers didn't take the tasks as early as we expected */
	list->exp_start = STARPU_MAX(list->exp_start, starpu_timing_now());
	list->exp_end = list->exp_start + list->exp_len;

	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted_transfer))
	{
		if (starpu_timing_now() + predicted_transfer < list->exp_end)
		{
			/* We may hope that the transfer will be finished by
			 * the start of the task. */
			predicted_transfer = 0;
		}
		else
		{
			/* The transfer will not be finished by then, take the
			 * remainder into account */
			predicted_transfer = (starpu_timing_now() + predicted_transfer) - list->exp_end;
		}
		task->predicted_transfer = predicted_transfer;
		list->exp_end += predicted_transfer;
		list->exp_len += predicted_transfer;
	}

	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted))
	{
		task->predicted = predicted;
		list->exp_end += predicted;
		list->exp_len += predicted;
	}

	list->ntasks++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
}
#endif