node_worker.c

#include <starpu_sched_node.h>
#include <core/workers.h>
#include <string.h> /* memset */
#include <math.h>   /* isnan */
#include <float.h>

static struct starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
/* The data structure for a worker's queue looks like this:
 * W = worker
 * T = simple task
 * P = parallel task
 *
 *
 * P--P T
 * | | \|
 * P--P T T P T
 * | | | | | |
 * T T P--P--P T
 * | | | | | |
 * W W W W W W
 *
 *
 *
 * It is possible that a _starpu_task_grid holds no task, because the task
 * has already been popped by a worker.
 *
 * N = no task
 *
 * T T T
 * | | |
 * P--N--N
 * | | |
 * W W W
 *
 *
 * This API is slightly asymmetric: struct _starpu_task_grid is allocated
 * by the caller and freed by the data structure.
 *
 */
struct _starpu_task_grid
{
	/* this member may be NULL if a worker has already popped the task,
	 * but it was a parallel task and we don't want dangling pointers
	 */
	struct starpu_task * task;
	struct _starpu_task_grid *up, *down, *left, *right;
	/* this counts the number of tasks that remain to be popped by workers:
	 * the leftmost _starpu_task_grid of a row (i.e. the one with
	 * .left == NULL) holds the ntasks counter, all the others hold a
	 * pntasks that points to it.
	 *
	 * when the counter reaches 0, the left and right members of the whole
	 * row are set to NULL, which marks those nodes as ready to be freed.
	 */
	union
	{
		int ntasks;
		int * pntasks;
	};
};
/* list->exp_start, list->exp_len, list->exp_end and list->ntasks
 * are updated by starpu_sched_node_worker_push_task(node, task) and by the
 * pre_exec_hook
 */
struct _starpu_worker_task_list
{
	double exp_start, exp_len, exp_end;
	struct _starpu_task_grid *first, *last;
	unsigned ntasks;
	starpu_pthread_mutex_t mutex;
};
struct _starpu_worker_node_data
{
	union
	{
		struct
		{
			struct _starpu_worker * worker;
			starpu_pthread_mutex_t lock;
		};
		struct _starpu_combined_worker * combined_worker;
	};
	struct _starpu_worker_task_list * list;
};
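/* allocate an empty task list; the expected start and end times are
 * initialized to the current time */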
static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
{
	struct _starpu_worker_task_list * l = malloc(sizeof(*l));
	memset(l, 0, sizeof(*l));
	l->exp_len = 0.0;
	l->exp_start = l->exp_end = starpu_timing_now();
	STARPU_PTHREAD_MUTEX_INIT(&l->mutex, NULL);
	return l;
}

static struct _starpu_task_grid * _starpu_task_grid_create(void)
{
	struct _starpu_task_grid * t = malloc(sizeof(*t));
	memset(t, 0, sizeof(*t));
	return t;
}

static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
{
	free(t);
}

static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
{
	if(l)
	{
		STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
		free(l);
	}
}
/* the task, ntasks, pntasks, left and right members of t are set by the caller */
static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
{
	STARPU_ASSERT(t->task);
	if(l->first == NULL)
		l->first = l->last = t;
	else
	{
		/* link above the previous last element */
		t->down = l->last;
		l->last->up = t;
		l->last = t;
	}
	t->up = NULL;
	l->ntasks++;

	double predicted = t->task->predicted;
	double predicted_transfer = t->task->predicted_transfer;

	/* Sometimes workers didn't take the tasks as early as we expected
	l->exp_start = STARPU_MAX(l->exp_start, starpu_timing_now());*/
	l->exp_end = l->exp_start + l->exp_len;
	if((starpu_timing_now() + predicted_transfer) < l->exp_end)
	{
		/* We may hope that the transfer will be finished by
		 * the start of the task. */
		predicted_transfer = 0.0;
	}
	else
	{
		/* The transfer will not be finished by then, take the
		 * remainder into account */
		predicted_transfer = (starpu_timing_now() + predicted_transfer) - l->exp_end;
	}
	if(!isnan(predicted_transfer))
	{
		l->exp_end += predicted_transfer;
		l->exp_len += predicted_transfer;
	}
	if(!isnan(predicted))
	{
		l->exp_end += predicted;
		l->exp_len += predicted;
	}
	t->task->predicted = predicted;
	t->task->predicted_transfer = predicted_transfer;
}
/* walk the whole row of t and set all left and right pointers to NULL */
static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
{
	STARPU_ASSERT(t->task == NULL);
	struct _starpu_task_grid * t_left = t->left;
	struct _starpu_task_grid * t_right = t->right;
	t->left = t->right = NULL;
	while(t_left)
	{
		STARPU_ASSERT(t_left->task == NULL);
		t = t_left;
		t_left = t_left->left;
		t->left = NULL;
		t->right = NULL;
	}
	while(t_right)
	{
		STARPU_ASSERT(t_right->task == NULL);
		t = t_right;
		t_right = t_right->right;
		t->left = NULL;
		t->right = NULL;
	}
}
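/* Pop a task from the list.
 * We walk upward from the bottom of the queue; grids whose task has already
 * been taken (aliases popped by the other members of a combined worker) are
 * skipped, and fully consumed grid rows are freed lazily once they reach the
 * bottom of the queue.
 */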
static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_worker_task_list * l)
{
	if(!l->first)
	{
		l->exp_start = l->exp_end = starpu_timing_now();
		l->exp_len = 0;
		return NULL;
	}

	struct _starpu_task_grid * t = l->first;

	/* the first grid has been fully consumed: free it and retry */
	if(t->task == NULL && t->right == NULL && t->left == NULL)
	{
		l->first = t->up;
		if(l->first)
			l->first->down = NULL;
		if(l->last == t)
			l->last = NULL;
		_starpu_task_grid_destroy(t);
		return _starpu_worker_task_list_pop(l);
	}

	while(t)
	{
		if(t->task)
		{
			struct starpu_task * task = t->task;
			t->task = NULL;
			/* the leftmost grid of a row holds the shared counter */
			int * p = t->left ? t->pntasks : &t->ntasks;
			/* the counter may be decremented concurrently by the
			 * members of a combined worker, so test the value
			 * returned by the atomic op rather than re-reading *p */
			if(STARPU_ATOMIC_ADD(p, -1) == 0)
				_starpu_task_grid_unset_left_right_member(t);
			// l->ntasks--;
			/* predicted_transfer may be NaN when no prediction is available */
			if(!isnan(task->predicted_transfer))
			{
				l->exp_len -= task->predicted_transfer;
				l->exp_end = l->exp_start + l->exp_len;
			}
			return task;
		}
		t = t->up;
	}
	return NULL;
}
static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid);
static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid);
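/* return the node of workerid, creating it lazily on first use; ids below
 * starpu_worker_get_count() are simple workers, higher ids are combined
 * workers */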
struct starpu_sched_node * starpu_sched_node_worker_get(int workerid)
{
	STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
	/* we may need to take a mutex here */
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];
	else
	{
		struct starpu_sched_node * node;
		if(workerid < (int) starpu_worker_get_count())
			node = starpu_sched_node_worker_create(workerid);
		else
			node = starpu_sched_node_combined_worker_create(workerid);
		_worker_nodes[workerid] = node;
		return node;
	}
}

struct _starpu_worker * _starpu_sched_node_worker_get_worker(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_simple_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return data->worker;
}

struct _starpu_combined_worker * _starpu_sched_node_combined_worker_get_combined_worker(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(worker_node));
	struct _starpu_worker_node_data * data = worker_node->data;
	return data->combined_worker;
}

enum starpu_perfmodel_archtype starpu_sched_node_worker_get_perf_arch(struct starpu_sched_node * worker_node)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
	if(starpu_sched_node_is_simple_worker(worker_node))
		return _starpu_sched_node_worker_get_worker(worker_node)->perf_arch;
	else
		return _starpu_sched_node_combined_worker_get_combined_worker(worker_node)->perf_arch;
}
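/* push a simple task to a simple worker: the task gets its own one-element
 * grid row (ntasks == 1) appended to the worker's queue */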
int starpu_sched_node_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
{
	/* this function takes the worker's list mutex */
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_task_grid * t = _starpu_task_grid_create();
	t->task = task;
	t->ntasks = 1;

	task->workerid = starpu_bitmap_first(node->workers);
#if 1 /* FIXME: deadlock problem */
	if(starpu_get_prefetch_flag())
	{
		unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
		starpu_prefetch_task_input_on_node(task, memory_node);
	}
#endif
	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
	_starpu_worker_task_list_push(data->list, t);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
	node->available(node);
	return 0;
}
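/* pop for a simple worker: take a task from the local queue if possible,
 * otherwise ask the father node of the context; an SPMD task obtained from
 * the father is re-pushed to the combined worker so that every member gets
 * its own alias */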
struct starpu_task * starpu_sched_node_worker_pop_task(struct starpu_sched_node *node, unsigned sched_ctx_id)
{
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_worker_task_list * list = data->list;
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	struct starpu_task * task = _starpu_worker_task_list_pop(list);
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
	if(task)
	{
		starpu_push_task_end(task);
		return task;
	}

	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
	struct starpu_sched_node *father = node->fathers[sched_ctx_id];
	if(father == NULL)
	{
		/* release the lock on the early return, too */
		STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
		return NULL;
	}
	task = father->pop_task(father, sched_ctx_id);
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
	if(!task)
		return NULL;
	if(task->cl->type == STARPU_SPMD)
	{
		int combined_workerid = starpu_combined_worker_get_id();
		if(combined_workerid < 0)
		{
			starpu_push_task_end(task);
			return task;
		}
		struct starpu_sched_node * combined_worker_node = starpu_sched_node_worker_get(combined_workerid);
		(void) combined_worker_node->push_task(combined_worker_node, task);
		/* we have pushed a task to the queue, so we can make a recursive call */
		return starpu_sched_node_worker_pop_task(node, sched_ctx_id);
	}
	starpu_push_task_end(task);
	return task;
}
void starpu_sched_node_worker_destroy(struct starpu_sched_node *node)
{
	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
	unsigned id = worker->workerid;
	STARPU_ASSERT(_worker_nodes[id] == node);
	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(node->fathers[i] != NULL)
			return; /* this node is still shared between several contexts */
	starpu_sched_node_destroy(node);
	_worker_nodes[id] = NULL;
}
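/* blocking a worker simply consists in keeping its lock held: the pop path
 * takes the same mutex, so the worker stalls there until unblock releases it */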
void _starpu_sched_node_block_worker(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(workerid)->data;
	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
}

void _starpu_sched_node_unblock_worker(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * data = starpu_sched_node_worker_create(workerid)->data;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
}

void _starpu_sched_node_lock_all_workers(void)
{
	unsigned i;
	for(i = 0; i < starpu_worker_get_count(); i++)
		_starpu_sched_node_block_worker(i);
}

void _starpu_sched_node_unlock_all_workers(void)
{
	unsigned i;
	for(i = 0; i < starpu_worker_get_count(); i++)
		_starpu_sched_node_unblock_worker(i);
}
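/* wake the worker up so that it looks at its queue again; this is a no-op
 * with non-blocking drivers, or when the caller is the worker itself */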
static void simple_worker_available(struct starpu_sched_node * worker_node)
{
	(void) worker_node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
	if(w->workerid == starpu_worker_get_id())
		return;
	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
#endif
}
static void combined_worker_available(struct starpu_sched_node * node)
{
	(void) node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	int workerid = starpu_worker_get_id();
	int i;
	for(i = 0; i < data->combined_worker->worker_size; i++)
	{
		int worker = data->combined_worker->combined_workerid[i];
		/* no need to wake the calling worker itself */
		if(worker == workerid)
			continue;
		starpu_pthread_mutex_t *sched_mutex;
		starpu_pthread_cond_t *sched_cond;
		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
		STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
	}
#endif
}
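/* expected data transfer time to this node's memory node, taking the task's
 * bundle into account when there is one */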
static double estimated_transfer_length(struct starpu_sched_node * node,
					struct starpu_task * task)
{
	STARPU_ASSERT(starpu_sched_node_is_worker(node));
	starpu_task_bundle_t bundle = task->bundle;
	struct _starpu_worker_node_data * data = node->data;
	/* data->worker and data->combined_worker alias inside the union, so
	 * dispatch on the node type to pick the right memory node */
	unsigned memory_node = starpu_sched_node_is_simple_worker(node)
		? data->worker->memory_node
		: data->combined_worker->memory_node;
	if(bundle)
		return starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
	else
		return starpu_task_expected_data_transfer_time(memory_node, task);
}
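/* estimated finish time of the tasks already sitting in the worker's private
 * local_tasks list, plus the remainder of the task currently being executed */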
static double worker_estimated_finish_time(struct _starpu_worker * worker)
{
	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
	double sum = 0.0;
	struct starpu_task_list list = worker->local_tasks;
	struct starpu_task * task;
	for(task = starpu_task_list_front(&list);
	    task != starpu_task_list_end(&list);
	    task = starpu_task_list_next(task))
		if(!isnan(task->predicted))
			sum += task->predicted;
	if(worker->current_task)
	{
		struct starpu_task * t = worker->current_task;
		if(t && !isnan(t->predicted))
			sum += t->predicted / 2; /* assume the current task is halfway through */
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
	return sum + starpu_timing_now();
}
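/* a combined worker is expected to finish when its slowest member does */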
static double combined_worker_estimated_end(struct starpu_sched_node * node)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	double max = 0.0;
	int i;
	for(i = 0; i < combined_worker->worker_size; i++)
	{
		data = _worker_nodes[combined_worker->combined_workerid[i]]->data;
		STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
		double tmp = data->list->exp_end;
		STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
		max = tmp > max ? tmp : max;
	}
	return max;
}

static double simple_worker_estimated_end(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * data = node->data;
	STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
	data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
	double tmp = data->list->exp_end = data->list->exp_start + data->list->exp_len;
	STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
	return tmp;
}
static double simple_worker_estimated_load(struct starpu_sched_node * node)
{
	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
	int nb_task = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
	struct starpu_task_list list = worker->local_tasks;
	struct starpu_task * task;
	for(task = starpu_task_list_front(&list);
	    task != starpu_task_list_end(&list);
	    task = starpu_task_list_next(task))
		nb_task++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
	struct _starpu_worker_node_data * d = node->data;
	struct _starpu_worker_task_list * l = d->list;
	int ntasks_in_fifo = l ? l->ntasks : 0;
	return (double) (nb_task + ntasks_in_fifo)
		/ starpu_worker_get_relative_speedup(starpu_bitmap_first(node->workers));
}

static double combined_worker_estimated_load(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * d = node->data;
	struct _starpu_combined_worker * c = d->combined_worker;
	double load = 0;
	int i;
	for(i = 0; i < c->worker_size; i++)
	{
		struct starpu_sched_node * n = starpu_sched_node_worker_get(c->combined_workerid[i]);
		load += n->estimated_load(n);
	}
	return load;
}
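/* Push a parallel task to a combined worker: the task is duplicated into one
 * alias per member, the aliases are chained left/right and share a single
 * ntasks counter, and each alias is pushed to its member's queue. The queues
 * are locked hand-over-hand so the aliases end up at matching positions in
 * all the queues. */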
static int starpu_sched_node_combined_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	/* data->worker aliases data->combined_worker inside the union, so the
	 * pointer itself is all we can check here */
	STARPU_ASSERT(data->combined_worker);
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	STARPU_ASSERT(combined_worker->worker_size >= 1);
	struct _starpu_task_grid * task_alias[combined_worker->worker_size];
	starpu_parallel_task_barrier_init(task, starpu_bitmap_first(node->workers));
	/* the first alias owns the shared ntasks counter */
	task_alias[0] = _starpu_task_grid_create();
	task_alias[0]->task = starpu_task_dup(task);
	task_alias[0]->task->workerid = combined_worker->combined_workerid[0];
	task_alias[0]->left = NULL;
	task_alias[0]->ntasks = combined_worker->worker_size;
	int i;
	for(i = 1; i < combined_worker->worker_size; i++)
	{
		task_alias[i] = _starpu_task_grid_create();
		task_alias[i]->task = starpu_task_dup(task);
		task_alias[i]->task->workerid = combined_worker->combined_workerid[i];
		task_alias[i]->left = task_alias[i - 1];
		task_alias[i - 1]->right = task_alias[i];
		task_alias[i]->pntasks = &task_alias[0]->ntasks;
	}

	/* hand-over-hand locking: lock the next member's queue before
	 * releasing the previous one */
	starpu_pthread_mutex_t * mutex_to_unlock = NULL;
	i = 0;
	do
	{
		struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
		struct _starpu_worker_node_data * worker_data = worker_node->data;
		struct _starpu_worker_task_list * list = worker_data->list;
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		if(mutex_to_unlock)
			STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
		mutex_to_unlock = &list->mutex;
		_starpu_worker_task_list_push(list, task_alias[i]);
		i++;
	}
	while(i < combined_worker->worker_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);

	int workerid = starpu_worker_get_id();
	if(-1 == workerid)
	{
		node->available(node);
	}
	else
	{
		/* the caller is a worker and already holds its own sched
		 * mutex: release it while waking the other members */
		starpu_pthread_mutex_t *worker_sched_mutex;
		starpu_pthread_cond_t *worker_sched_cond;
		starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
		STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
		/* wake up all the other workers of the combined worker */
		for(i = 0; i < combined_worker->worker_size; i++)
		{
			struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
			worker_node->available(worker_node);
		}
		node->available(node);
		STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
	}
	return 0;
}
void _worker_node_deinit_data(struct starpu_sched_node * node)
{
	struct _starpu_worker_node_data * d = node->data;
	_starpu_worker_task_list_destroy(d->list);
	if(starpu_sched_node_is_simple_worker(node))
		STARPU_PTHREAD_MUTEX_DESTROY(&d->lock);
	free(d);
	/* remove the node from the global table */
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		if(_worker_nodes[i] == node)
		{
			_worker_nodes[i] = NULL;
			break;
		}
}
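/* build the sched node of a simple worker and fill in its method table; when
 * hwloc is available, the node is attached to the topology object of the core
 * the worker is bound to */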
static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];

	struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
	if(worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->worker = worker;
	STARPU_PTHREAD_MUTEX_INIT(&data->lock, NULL);
	data->list = _starpu_worker_task_list_create();
	node->data = data;
	node->push_task = starpu_sched_node_worker_push_task;
	node->pop_task = starpu_sched_node_worker_pop_task;
	node->estimated_end = simple_worker_estimated_end;
	node->estimated_load = simple_worker_estimated_load;
	node->available = simple_worker_available;
	node->deinit_data = _worker_node_deinit_data;
	starpu_bitmap_set(node->workers, workerid);
	starpu_bitmap_or(node->workers_in_ctx, node->workers);
	_worker_nodes[workerid] = node;

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif
	return node;
}
static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < STARPU_NMAXWORKERS);
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];

	struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
	if(combined_worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->combined_worker = combined_worker;
	node->data = data;
	node->push_task = starpu_sched_node_combined_worker_push_task;
	node->pop_task = NULL;
	node->estimated_end = combined_worker_estimated_end;
	node->estimated_load = combined_worker_estimated_load;
	node->available = combined_worker_available;
	node->deinit_data = _worker_node_deinit_data;
	starpu_bitmap_set(node->workers, workerid);
	starpu_bitmap_or(node->workers_in_ctx, node->workers);
	_worker_nodes[workerid] = node;

#ifdef STARPU_HAVE_HWLOC
	/* attach the node to the topology object of the first member's core,
	 * using its bindid as in the simple-worker case */
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	struct _starpu_worker * first_worker = _starpu_get_worker_struct(combined_worker->combined_workerid[0]);
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, first_worker->bindid);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif
	return node;
}
int starpu_sched_node_is_simple_worker(struct starpu_sched_node * node)
{
	return node->push_task == starpu_sched_node_worker_push_task;
}

int starpu_sched_node_is_combined_worker(struct starpu_sched_node * node)
{
	return node->push_task == starpu_sched_node_combined_worker_push_task;
}

int starpu_sched_node_is_worker(struct starpu_sched_node * node)
{
	return starpu_sched_node_is_simple_worker(node)
		|| starpu_sched_node_is_combined_worker(node);
}

#ifndef STARPU_NO_ASSERT
static int _worker_consistant(struct starpu_sched_node * node)
{
	int is_a_worker = 0;
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		if(_worker_nodes[i] == node)
			is_a_worker = 1;
	if(!is_a_worker)
		return 0;
	struct _starpu_worker_node_data * data = node->data;
	if(data->worker)
	{
		int id = data->worker->workerid;
		return (_worker_nodes[id] == node)
			&& node->nchilds == 0;
	}
	return 1;
}
#endif

int _starpu_sched_node_worker_get_workerid(struct starpu_sched_node * worker_node)
{
#ifndef STARPU_NO_ASSERT
	STARPU_ASSERT(_worker_consistant(worker_node));
#endif
	STARPU_ASSERT(1 == starpu_bitmap_cardinal(worker_node->workers));
	return starpu_bitmap_first(worker_node->workers);
}
static struct _starpu_worker_task_list * _worker_get_list(void)
{
	int workerid = starpu_worker_get_id();
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	struct _starpu_worker_node_data * d = starpu_sched_node_worker_get(workerid)->data;
	return d->list;
}
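/* hooks called by the core just before and just after the calling worker
 * executes a task; they keep the list's expected start/length/end in sync
 * with what actually happened */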
void starpu_sched_node_worker_pre_exec_hook(struct starpu_task * task)
{
	if(!isnan(task->predicted))
	{
		struct _starpu_worker_task_list * list = _worker_get_list();
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		if(!task->execute_on_a_specific_worker)
		{
			STARPU_ASSERT(list->ntasks != 0);
			list->ntasks--;
		}
		/* the next task can only start once the current one is done */
		list->exp_start = starpu_timing_now() + task->predicted;
		if(list->ntasks == 0)
		{
			list->exp_end = list->exp_start;
			list->exp_len = 0.0;
		}
		else
			list->exp_end = list->exp_start + list->exp_len;
		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
	}
}

void starpu_sched_node_worker_post_exec_hook(struct starpu_task * task)
{
	if(task->execute_on_a_specific_worker)
		return;
	struct _starpu_worker_task_list * list = _worker_get_list();
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	list->exp_start = starpu_timing_now();
	list->exp_end = list->exp_start + list->exp_len;
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
}
static void starpu_sched_node_worker_push_task_notify(struct starpu_task * task, int workerid, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
#if 0
	struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(workerid);
	/* doesn't work with parallel tasks */
	if(starpu_sched_node_is_combined_worker(worker_node))
		return;

	struct _starpu_worker_node_data * d = worker_node->data;
	struct _starpu_worker_task_list * list = d->list;
	/* Compute the expected penalty */
	enum starpu_perfmodel_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
	unsigned memory_node = starpu_worker_get_memory_node(workerid);
	double predicted = starpu_task_expected_length(task, perf_arch,
						       starpu_task_get_implementation(task));
	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);

	/* Update the predictions */
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	/* Sometimes workers didn't take the tasks as early as we expected */
	list->exp_start = STARPU_MAX(list->exp_start, starpu_timing_now());
	list->exp_end = list->exp_start + list->exp_len;

	/* If there is no prediction available, we consider the task has a null length */
	if(!isnan(predicted_transfer))
	{
		if(starpu_timing_now() + predicted_transfer < list->exp_end)
		{
			/* We may hope that the transfer will be finished by
			 * the start of the task. */
			predicted_transfer = 0;
		}
		else
		{
			/* The transfer will not be finished by then, take the
			 * remainder into account */
			predicted_transfer = (starpu_timing_now() + predicted_transfer) - list->exp_end;
		}
		task->predicted_transfer = predicted_transfer;
		list->exp_end += predicted_transfer;
		list->exp_len += predicted_transfer;
	}

	/* If there is no prediction available, we consider the task has a null length */
	if(!isnan(predicted))
	{
		task->predicted = predicted;
		list->exp_end += predicted;
		list->exp_len += predicted;
	}

	list->ntasks++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
#endif
}