node_worker.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2013 Université de Bordeaux 1
 * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
 * Copyright (C) 2011 Télécom-SudParis
 * Copyright (C) 2011-2013 INRIA
 * Copyright (C) 2013 Simon Archipoff
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu_sched_node.h>
#include <core/workers.h>
#include <float.h>
/* The data structure of a worker's queue looks like this:
 * W = worker
 * T = simple task
 * P = parallel task
 *
 *
 * P--P     T
 * |  |    \|
 * P--P  T  T  P  T
 * |  |  |  |  |  |
 * T  T  P--P--P  T
 * |  |  |  |  |  |
 * W  W  W  W  W  W
 *
 *
 *
 * A _starpu_task_grid cell may hold no task, because its task has already
 * been popped by a worker.
 *
 * N = no task
 *
 * T  T  T
 * |  |  |
 * P--N--N
 * |  |  |
 * W  W  W
 *
 *
 * This API is a little asymmetric: struct _starpu_task_grid cells are
 * allocated by the caller and freed by the data structure.
 */
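/* Illustrative sketch (kept out of the build): how a caller is expected to
 * hand a cell over to a worker's list, given the asymmetric ownership
 * described above. It only uses helpers defined below in this file
 * (_starpu_task_grid_create, _starpu_worker_task_list_push) and mirrors the
 * locking done by simple_worker_push_task; "example_push" is a hypothetical
 * name, not additional API.
 */
#if 0
static void example_push(struct _starpu_worker_task_list *l, struct starpu_task *task)
{
	/* the caller allocates the cell... */
	struct _starpu_task_grid *t = _starpu_task_grid_create();
	t->task = task;
	t->ntasks = 1; /* simple task: the cell is its own (leftmost) counter */
	STARPU_PTHREAD_MUTEX_LOCK(&l->mutex);
	_starpu_worker_task_list_push(l, t);
	STARPU_PTHREAD_MUTEX_UNLOCK(&l->mutex);
	/* ...and the list frees it once the task has been popped */
}
#endif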
struct _starpu_task_grid
{
	/* this member may be NULL if a worker has already popped the task;
	 * for a parallel task we keep the cell alive so that we do not
	 * leave dangling pointers behind
	 */
	struct starpu_task * task;
	struct _starpu_task_grid *up, *down, *left, *right;

	/* this counts the number of tasks that remain to be popped by workers:
	 * the leftmost _starpu_task_grid (ie. the one with .left == NULL)
	 * holds the ntasks counter, and all the others hold a pntasks pointer
	 * to it.
	 *
	 * when the counter reaches 0, the left and right members of the whole
	 * row are set to NULL, which marks the cells as free-able.
	 */
	union
	{
		int ntasks;
		int * pntasks;
	};
};
/* list->exp_start, list->exp_len, list->exp_end and list->ntasks
 * are updated by starpu_sched_node_worker_push_task(node, task) and pre_exec_hook
 */
struct _starpu_worker_task_list
{
	double exp_start, exp_len, exp_end;
	struct _starpu_task_grid *first, *last;
	unsigned ntasks;
	starpu_pthread_mutex_t mutex;
};

struct _starpu_worker_node_data
{
	union
	{
		struct
		{
			struct _starpu_worker * worker;
			starpu_pthread_mutex_t lock;
		};
		struct _starpu_combined_worker * combined_worker;
	};
	struct _starpu_worker_task_list * list;
};
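/* nothing in the union above records which member is valid: the node's kind
 * is recovered from the node itself, by comparing its push_task method
 * against the two implementations in this file; see
 * starpu_sched_node_is_simple_worker() and
 * starpu_sched_node_is_combined_worker() below
 */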
/* this array stores the worker nodes */
static struct starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];

static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
{
	struct _starpu_worker_task_list * l = malloc(sizeof(*l));
	memset(l, 0, sizeof(*l));
	l->exp_len = 0.0;
	l->exp_start = l->exp_end = starpu_timing_now();
	STARPU_PTHREAD_MUTEX_INIT(&l->mutex,NULL);
	return l;
}

static struct _starpu_task_grid * _starpu_task_grid_create(void)
{
	struct _starpu_task_grid * t = malloc(sizeof(*t));
	memset(t, 0, sizeof(*t));
	return t;
}

static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
{
	free(t);
}

static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
{
	if(l)
	{
		STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
		free(l);
	}
}
static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
{
	/* the task, ntasks, pntasks, left and right members of t are set by the caller */
	STARPU_ASSERT(t->task);
	if(l->first == NULL)
		/* empty list: t becomes both ends and has no neighbour below */
		l->first = l->last = t;
	else
	{
		t->down = l->last;
		l->last->up = t;
		l->last = t;
	}
	t->up = NULL;
	l->ntasks++;

	double predicted = t->task->predicted;
	double predicted_transfer = t->task->predicted_transfer;

	/* Sometimes workers didn't take the tasks as early as we expected */
	l->exp_start = STARPU_MAX(l->exp_start, starpu_timing_now());
	l->exp_end = l->exp_start + l->exp_len;
	if (starpu_timing_now() + predicted_transfer < l->exp_end)
	{
		/* We may hope that the transfer will be finished by
		 * the start of the task. */
		predicted_transfer = 0.0;
	}
	else
	{
		/* The transfer will not be finished by then, take the
		 * remainder into account */
		predicted_transfer = (starpu_timing_now() + predicted_transfer) - l->exp_end;
	}

	if(!isnan(predicted_transfer))
	{
		l->exp_end += predicted_transfer;
		l->exp_len += predicted_transfer;
	}
	if(!isnan(predicted))
	{
		l->exp_end += predicted;
		l->exp_len += predicted;
	}

	t->task->predicted = predicted;
	t->task->predicted_transfer = predicted_transfer;
}
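/* Worked example of the accounting above (illustrative numbers only):
 * suppose now = 1000us, exp_start = 1000us, exp_len = 500us, hence
 * exp_end = 1500us, and a task arrives with predicted = 200us and
 * predicted_transfer = 300us. Since now + 300 = 1300 < exp_end = 1500,
 * the transfer is expected to overlap with already queued work, so
 * predicted_transfer is clamped to 0 and only the 200us of compute are
 * added: exp_len becomes 700us and exp_end 1700us. With
 * predicted_transfer = 800us instead, now + 800 = 1800 > 1500, so the
 * non-overlapped remainder 1800 - 1500 = 300us is charged in addition
 * to the 200us of compute.
 */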
/* walk the whole row and set all the left and right pointers to NULL,
 * marking every cell of the parallel task as free-able */
static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
{
	STARPU_ASSERT(t->task == NULL);
	struct _starpu_task_grid * t_left = t->left;
	struct _starpu_task_grid * t_right = t->right;
	t->left = t->right = NULL;
	while(t_left)
	{
		STARPU_ASSERT(t_left->task == NULL);
		t = t_left;
		t_left = t_left->left;
		t->left = NULL;
		t->right = NULL;
	}
	while(t_right)
	{
		STARPU_ASSERT(t_right->task == NULL);
		t = t_right;
		t_right = t_right->right;
		t->left = NULL;
		t->right = NULL;
	}
}
static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_worker_task_list * l)
{
	if(!l->first)
	{
		l->exp_start = l->exp_end = starpu_timing_now();
		l->exp_len = 0;
		return NULL;
	}

	struct _starpu_task_grid * t = l->first;

	/* if the cell has no task and no left or right neighbour, no task is
	 * linked to it any more, so we can free it */
	if(t->task == NULL && t->right == NULL && t->left == NULL)
	{
		l->first = t->up;
		if(l->first)
			l->first->down = NULL;
		if(l->last == t)
			l->last = NULL;
		_starpu_task_grid_destroy(t);
		return _starpu_worker_task_list_pop(l);
	}

	while(t)
	{
		if(t->task)
		{
			struct starpu_task * task = t->task;
			t->task = NULL;
			/* the leftmost cell holds the number of remaining tasks, the others point to it */
			int * p = t->left ? t->pntasks : &t->ntasks;
			/* the worker that pops the last task of a parallel task allows the whole row to be freed */
			if(STARPU_ATOMIC_ADD(p, -1) == 0)
				_starpu_task_grid_unset_left_right_member(t);
			l->ntasks--;
			if(!isnan(task->predicted))
			{
				l->exp_len -= task->predicted;
				l->exp_end = l->exp_start + l->exp_len;
			}
			return task;
		}
		t = t->up;
	}
	return NULL;
}
static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid);
static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid);

struct starpu_sched_node * starpu_sched_node_worker_get(int workerid)
{
	STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
	/* we may need to take a mutex here */
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];
	else
	{
		struct starpu_sched_node * node;
		if(workerid < (int) starpu_worker_get_count())
			node = starpu_sched_node_worker_create(workerid);
		else
			node = starpu_sched_node_combined_worker_create(workerid);
		_worker_nodes[workerid] = node;
		return node;
	}
}

struct _starpu_worker * _starpu_sched_node_worker_get_worker(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_simple_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return data->worker;
}

struct _starpu_combined_worker * _starpu_sched_node_combined_worker_get_combined_worker(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return data->combined_worker;
}

/*
enum starpu_perfmodel_archtype starpu_sched_node_worker_get_perf_arch(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	if(starpu_sched_node_is_simple_worker(worker_node))
		return _starpu_sched_node_worker_get_worker(worker_node)->perf_arch;
	else
		return _starpu_sched_node_combined_worker_get_combined_worker(worker_node)->perf_arch;
}
*/
static int simple_worker_available(struct starpu_sched_node * worker_node)
{
	(void) worker_node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
	if(w->workerid == starpu_worker_get_id())
		return 1;
	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
#endif
	return 1;
}

static int combined_worker_available(struct starpu_sched_node * node)
{
	(void) node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	int workerid = starpu_worker_get_id();
	int i;
	for(i = 0; i < data->combined_worker->worker_size; i++)
	{
		int worker = data->combined_worker->combined_workerid[i];
		/* no need to wake up the calling worker itself */
		if(worker == workerid)
			continue;
		starpu_pthread_mutex_t *sched_mutex;
		starpu_pthread_cond_t *sched_cond;
		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
		STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
	}
#endif
	return 1;
}
static int simple_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(node));
	/* this function takes the mutex of the worker's task list */
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_task_grid * t = _starpu_task_grid_create();
	t->task = task;
	t->ntasks = 1;

	task->workerid = starpu_bitmap_first(node->workers);
#if 1 /* dead lock problem? */
	if (starpu_get_prefetch_flag())
	{
		unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
		starpu_prefetch_task_input_on_node(task, memory_node);
	}
#endif
	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
	_starpu_worker_task_list_push(data->list, t);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
	simple_worker_available(node);
	return 0;
}
struct starpu_task * simple_worker_pop_task(struct starpu_sched_node *node, unsigned sched_ctx_id)
{
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_worker_task_list * list = data->list;
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	struct starpu_task * task = _starpu_worker_task_list_pop(list);
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
	if(task)
	{
		starpu_push_task_end(task);
		return task;
	}

	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
	struct starpu_sched_node *father = node->fathers[sched_ctx_id];
	if(father == NULL)
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
		return NULL;
	}
	task = father->pop_task(father, sched_ctx_id);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
	if(!task)
		return NULL;

	if(task->cl->type == STARPU_SPMD)
	{
		int workerid = starpu_worker_get_id();
		if(!starpu_worker_is_combined_worker(workerid))
		{
			starpu_push_task_end(task);
			return task;
		}
		struct starpu_sched_node * combined_worker_node = starpu_sched_node_worker_get(workerid);
		(void)combined_worker_node->push_task(combined_worker_node, task);
		/* we have pushed the task into the queue, so we can make a recursive call */
		return simple_worker_pop_task(node, sched_ctx_id);
	}

	starpu_push_task_end(task);
	return task;
}
void starpu_sched_node_worker_destroy(struct starpu_sched_node *node)
{
	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
	unsigned id = worker->workerid;
	assert(_worker_nodes[id] == node);
	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(node->fathers[i] != NULL)
			return; /* this node is shared between several contexts */
	starpu_sched_node_destroy(node);
	_worker_nodes[id] = NULL;
}

void _starpu_sched_node_lock_worker(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(workerid)->data;
	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
}

void _starpu_sched_node_unlock_worker(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(workerid)->data;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
}

void _starpu_sched_node_lock_all_workers(void)
{
	unsigned i;
	for(i = 0; i < starpu_worker_get_count(); i++)
		_starpu_sched_node_lock_worker(i);
}

void _starpu_sched_node_unlock_all_workers(void)
{
	unsigned i;
	for(i = 0; i < starpu_worker_get_count(); i++)
		_starpu_sched_node_unlock_worker(i);
}
/*
not sure whether this may be useful…
static double worker_estimated_finish_time(struct _starpu_worker * worker)
{
	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
	double sum = 0.0;
	struct starpu_task_list list = worker->local_tasks;
	struct starpu_task * task;
	for(task = starpu_task_list_front(&list);
	    task != starpu_task_list_end(&list);
	    task = starpu_task_list_next(task))
		if(!isnan(task->predicted))
			sum += task->predicted;
	if(worker->current_task)
	{
		struct starpu_task * t = worker->current_task;
		if(t && !isnan(t->predicted))
			sum += t->predicted/2;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
	return sum + starpu_timing_now();
}
*/
static double combined_worker_estimated_end(struct starpu_sched_node * node)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	double max = 0.0;
	int i;
	for(i = 0; i < combined_worker->worker_size; i++)
	{
		data = _worker_nodes[combined_worker->combined_workerid[i]]->data;
		STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
		double tmp = data->list->exp_end;
		STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
		max = tmp > max ? tmp : max;
	}
	return max;
}

static double simple_worker_estimated_end(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * data = node->data;
	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
	data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
	double tmp = data->list->exp_end = data->list->exp_start + data->list->exp_len;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
	return tmp;
}

static double simple_worker_estimated_load(struct starpu_sched_node * node)
{
	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
	int nb_task = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
	struct starpu_task_list list = worker->local_tasks;
	struct starpu_task * task;
	for(task = starpu_task_list_front(&list);
	    task != starpu_task_list_end(&list);
	    task = starpu_task_list_next(task))
		nb_task++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
	struct _starpu_worker_node_data * d = node->data;
	struct _starpu_worker_task_list * l = d->list;
	int ntasks_in_fifo = l ? l->ntasks : 0;
	return (double) (nb_task + ntasks_in_fifo)
		/ starpu_worker_get_relative_speedup(
			starpu_worker_get_perf_archtype(starpu_bitmap_first(node->workers)));
}

static double combined_worker_estimated_load(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * d = node->data;
	struct _starpu_combined_worker * c = d->combined_worker;
	double load = 0;
	int i;
	for(i = 0; i < c->worker_size; i++)
	{
		struct starpu_sched_node * n = starpu_sched_node_worker_get(c->combined_workerid[i]);
		load += n->estimated_load(n);
	}
	return load;
}
static int combined_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	STARPU_ASSERT(data->combined_worker && !data->worker);
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	STARPU_ASSERT(combined_worker->worker_size >= 1);
	struct _starpu_task_grid * task_alias[combined_worker->worker_size];
	starpu_parallel_task_barrier_init(task, starpu_bitmap_first(node->workers));

	/* duplicate the task for each member of the combined worker and chain
	 * the copies into one row of the grid; the leftmost alias holds the
	 * shared ntasks counter */
	task_alias[0] = _starpu_task_grid_create();
	task_alias[0]->task = starpu_task_dup(task);
	task_alias[0]->task->workerid = combined_worker->combined_workerid[0];
	task_alias[0]->left = NULL;
	task_alias[0]->ntasks = combined_worker->worker_size;
	int i;
	for(i = 1; i < combined_worker->worker_size; i++)
	{
		task_alias[i] = _starpu_task_grid_create();
		task_alias[i]->task = starpu_task_dup(task);
		task_alias[i]->task->workerid = combined_worker->combined_workerid[i];
		task_alias[i]->left = task_alias[i-1];
		task_alias[i - 1]->right = task_alias[i];
		task_alias[i]->pntasks = &task_alias[0]->ntasks;
	}

	starpu_pthread_mutex_t * mutex_to_unlock = NULL;
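	/* push the aliases with hand-over-hand locking: each list's mutex is
	 * taken before the previous one is released, so at least one mutex of
	 * the chain is held at all times. This appears intended to keep two
	 * parallel tasks pushed onto overlapping worker sets from interleaving
	 * their aliases in opposite orders in the different queues. */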
	i = 0;
	do
	{
		struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
		struct _starpu_worker_node_data * worker_data = worker_node->data;
		struct _starpu_worker_task_list * list = worker_data->list;
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		if(mutex_to_unlock)
			STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
		mutex_to_unlock = &list->mutex;
		_starpu_worker_task_list_push(list, task_alias[i]);
		i++;
	}
	while(i < combined_worker->worker_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);

	int workerid = starpu_worker_get_id();
	if(-1 == workerid)
	{
		combined_worker_available(node);
	}
	else
	{
		/* this push comes from a worker thread that holds its own sched
		 * mutex: release it while waking the others up, to avoid a
		 * lock-ordering deadlock */
		starpu_pthread_mutex_t *worker_sched_mutex;
		starpu_pthread_cond_t *worker_sched_cond;
		starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
		STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
		/* wake up all the other workers of the combined worker */
		for(i = 0; i < combined_worker->worker_size; i++)
		{
			struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
			simple_worker_available(worker_node);
		}
		combined_worker_available(node);
		STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
	}
	return 0;
}
void _worker_node_deinit_data(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * d = node->data;
	_starpu_worker_task_list_destroy(d->list);
	if(starpu_sched_node_is_simple_worker(node))
		STARPU_PTHREAD_MUTEX_DESTROY(&d->lock);
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		if(_worker_nodes[i] == node)
		{
			_worker_nodes[i] = NULL;
			break;
		}
	free(d);
}
static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());

	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];

	struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
	if(worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->worker = worker;
	STARPU_PTHREAD_MUTEX_INIT(&data->lock,NULL);
	data->list = _starpu_worker_task_list_create();
	node->data = data;

	node->push_task = simple_worker_push_task;
	node->pop_task = simple_worker_pop_task;
	node->estimated_end = simple_worker_estimated_end;
	node->estimated_load = simple_worker_estimated_load;
	node->deinit_data = _worker_node_deinit_data;
	starpu_bitmap_set(node->workers, workerid);
	starpu_bitmap_or(node->workers_in_ctx, node->workers);
	_worker_nodes[workerid] = node;

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif

	return node;
}

static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < STARPU_NMAXWORKERS);

	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];

	struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
	if(combined_worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->combined_worker = combined_worker;
	node->data = data;

	node->push_task = combined_worker_push_task;
	node->pop_task = NULL;
	node->estimated_end = combined_worker_estimated_end;
	node->estimated_load = combined_worker_estimated_load;
	node->avail = combined_worker_available;
	node->deinit_data = _worker_node_deinit_data;
	starpu_bitmap_set(node->workers, workerid);
	starpu_bitmap_or(node->workers_in_ctx, node->workers);
	_worker_nodes[workerid] = node;

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, combined_worker->combined_workerid[0]);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif

	return node;
}
int starpu_sched_node_is_simple_worker(struct starpu_sched_node * node)
{
	return node->push_task == simple_worker_push_task;
}

int starpu_sched_node_is_combined_worker(struct starpu_sched_node * node)
{
	return node->push_task == combined_worker_push_task;
}

int starpu_sched_node_is_worker(struct starpu_sched_node * node)
{
	return starpu_sched_node_is_simple_worker(node)
		|| starpu_sched_node_is_combined_worker(node);
}

#ifndef STARPU_NO_ASSERT
static int _worker_consistant(struct starpu_sched_node * node)
{
	int is_a_worker = 0;
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		if(_worker_nodes[i] == node)
			is_a_worker = 1;
	if(!is_a_worker)
		return 0;
	struct _starpu_worker_node_data * data = node->data;
	if(data->worker)
	{
		int id = data->worker->workerid;
		return (_worker_nodes[id] == node)
			&& node->nchilds == 0;
	}
	return 1;
}
#endif

int starpu_sched_node_worker_get_workerid(struct starpu_sched_node * worker_node)
{
#ifndef STARPU_NO_ASSERT
	STARPU_ASSERT(_worker_consistant(worker_node));
#endif
	STARPU_ASSERT(1 == starpu_bitmap_cardinal(worker_node->workers));
	return starpu_bitmap_first(worker_node->workers);
}
static struct _starpu_worker_task_list * _worker_get_list(void)
{
	int workerid = starpu_worker_get_id();
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * d = starpu_sched_node_worker_get(workerid)->data;
	return d->list;
}

void starpu_sched_node_worker_pre_exec_hook(struct starpu_task * task)
{
	if(!isnan(task->predicted))
	{
		struct _starpu_worker_task_list * list = _worker_get_list();
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		list->exp_start = starpu_timing_now() + task->predicted;
		if(list->ntasks == 0)
		{
			list->exp_end = list->exp_start;
			list->exp_len = 0.0;
		}
		else
			list->exp_end = list->exp_start + list->exp_len;
		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
	}
}

void starpu_sched_node_worker_post_exec_hook(struct starpu_task * task)
{
	if(task->execute_on_a_specific_worker)
		return;
	struct _starpu_worker_task_list * list = _worker_get_list();
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	list->exp_start = starpu_timing_now();
	list->exp_end = list->exp_start + list->exp_len;
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
}
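/* Illustrative sketch (kept out of the build): how the two hooks above would
 * typically be wired into a scheduling policy. The struct starpu_sched_policy
 * field names are assumed from the public StarPU API; "example_policy",
 * "example_push" and "example_pop" are hypothetical names used only for this
 * sketch.
 */
#if 0
static struct starpu_sched_policy example_policy =
{
	.push_task = example_push,
	.pop_task = example_pop,
	.pre_exec_hook = starpu_sched_node_worker_pre_exec_hook,
	.post_exec_hook = starpu_sched_node_worker_post_exec_hook,
	.policy_name = "example",
	.policy_description = "example policy using the worker node hooks",
};
#endif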
#if 0
static void starpu_sched_node_worker_push_task_notify(struct starpu_task * task, int workerid, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
	struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(workerid);
	/* does not work with parallel tasks */
	if(starpu_sched_node_is_combined_worker(worker_node))
		return;

	struct _starpu_worker_node_data * d = worker_node->data;
	struct _starpu_worker_task_list * list = d->list;
	/* Compute the expected penalty */
	enum starpu_perfmodel_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
	unsigned memory_node = starpu_worker_get_memory_node(workerid);

	double predicted = starpu_task_expected_length(task, perf_arch,
						       starpu_task_get_implementation(task));
	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);

	/* Update the predictions */
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	/* Sometimes workers didn't take the tasks as early as we expected */
	list->exp_start = STARPU_MAX(list->exp_start, starpu_timing_now());
	list->exp_end = list->exp_start + list->exp_len;

	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted_transfer))
	{
		if (starpu_timing_now() + predicted_transfer < list->exp_end)
		{
			/* We may hope that the transfer will be finished by
			 * the start of the task. */
			predicted_transfer = 0;
		}
		else
		{
			/* The transfer will not be finished by then, take the
			 * remainder into account */
			predicted_transfer = (starpu_timing_now() + predicted_transfer) - list->exp_end;
		}
		task->predicted_transfer = predicted_transfer;
		list->exp_end += predicted_transfer;
		list->exp_len += predicted_transfer;
	}

	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted))
	{
		task->predicted = predicted;
		list->exp_end += predicted;
		list->exp_len += predicted;
	}

	list->ntasks++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
}
#endif