/* node_worker.c — scheduling nodes wrapping simple and combined workers */
  1. #include <starpu_sched_node.h>
  2. #include <core/workers.h>
  3. #include <float.h>
/* One scheduling node per (simple or combined) worker, indexed by workerid
 * and created lazily by starpu_sched_node_worker_get(). */
static struct starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
  5. /* data structure for worker's queue look like this :
  6. * W = worker
  7. * T = simple task
  8. * P = parallel task
  9. *
  10. *
  11. * P--P T
  12. * | | \|
  13. * P--P T T P T
  14. * | | | | | |
  15. * T T P--P--P T
  16. * | | | | | |
  17. * W W W W W W
  18. *
  19. *
  20. *
* it is possible that a _starpu_task_grid has no task, because it has been
* popped by a worker.
  23. *
  24. * N = no task
  25. *
  26. * T T T
  27. * | | |
  28. * P--N--N
  29. * | | |
  30. * W W W
  31. *
  32. *
  33. * this API is a little asymmetric : struct _starpu_task_grid are allocated by the caller and freed by the data structure
  34. *
  35. */
/* One cell of the task grid pictured above: vertical (up/down) links form a
 * worker's queue column, horizontal (left/right) links chain the aliases of a
 * parallel task across several workers. */
struct _starpu_task_grid
{
	/* this member may be NULL if a worker has already popped it but it is a
	 * parallel task and we dont want mad pointers
	 */
	struct starpu_task * task;
	struct _starpu_task_grid *up, *down, *left, *right;
	/* this is used to count the number of tasks still to be popped by workers:
	 * the leftmost _starpu_task_grid of a row maintains the ntasks counter (ie .left == NULL),
	 * all the others use pntasks, which points to that counter
	 *
	 * when the counter reaches 0, all the left and right members are set to NULL,
	 * which marks those cells as freeable.
	 */
	union
	{
		int ntasks;
		int * pntasks;
	};
};
/* list->exp_start, list->exp_len, list->exp_end and list->ntasks
 * are updated by starpu_sched_node_worker_push_task(node, task) and pre_exec_hook
 */
struct _starpu_worker_task_list
{
	/* expected start date, total length and end date of the queued work */
	double exp_start, exp_len, exp_end;
	/* first = oldest cell (popped first), last = most recently pushed cell */
	struct _starpu_task_grid *first, *last;
	unsigned ntasks;
	/* protects all fields of this structure */
	starpu_pthread_mutex_t mutex;
};
/* Private data of a worker node.  Exactly one of worker/combined_worker is
 * set (see the create functions below); list is only set for simple workers. */
struct _starpu_worker_node_data
{
	struct _starpu_worker * worker;
	struct _starpu_combined_worker * combined_worker;
	struct _starpu_worker_task_list * list;
};
  72. static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
  73. {
  74. struct _starpu_worker_task_list * l = malloc(sizeof(*l));
  75. memset(l, 0, sizeof(*l));
  76. l->exp_len = 0.0;
  77. l->exp_start = l->exp_end = starpu_timing_now();
  78. STARPU_PTHREAD_MUTEX_INIT(&l->mutex,NULL);
  79. return l;
  80. }
  81. static struct _starpu_task_grid * _starpu_task_grid_create(void)
  82. {
  83. struct _starpu_task_grid * t = malloc(sizeof(*t));
  84. memset(t, 0, sizeof(*t));
  85. return t;
  86. }
  87. static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
  88. {
  89. free(t);
  90. }
  91. static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
  92. {
  93. if(l)
  94. {
  95. STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
  96. free(l);
  97. }
  98. }
  99. /* the task, ntasks, pntasks, left and right field members are set by the caller */
  100. static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
  101. {
  102. STARPU_ASSERT(t->task);
  103. if(l->first == NULL)
  104. l->first = l->last = t;
  105. t->down = l->last;
  106. l->last->up = t;
  107. t->up = NULL;
  108. l->last = t;
  109. l->ntasks++;
  110. double predicted = t->task->predicted;
  111. double predicted_transfer = t->task->predicted_transfer;
  112. /* Sometimes workers didn't take the tasks as early as we expected */
  113. l->exp_start = STARPU_MAX(l->exp_start, starpu_timing_now());
  114. l->exp_end = l->exp_start + l->exp_len;
  115. if ((starpu_timing_now() + predicted_transfer) < l->exp_end)
  116. {
  117. /* We may hope that the transfer will be finished by
  118. * the start of the task. */
  119. predicted_transfer = 0.0;
  120. }
  121. else
  122. {
  123. /* The transfer will not be finished by then, take the
  124. * remainder into account */
  125. predicted_transfer = (starpu_timing_now() + predicted_transfer) - l->exp_end;
  126. }
  127. if(!isnan(predicted_transfer))
  128. {
  129. l->exp_end += predicted_transfer;
  130. l->exp_len += predicted_transfer;
  131. }
  132. if(!isnan(predicted))
  133. {
  134. l->exp_end += predicted;
  135. l->exp_len += predicted;
  136. }
  137. t->task->predicted = predicted;
  138. t->task->predicted_transfer = predicted_transfer;
  139. }
/* Iteratively walk the whole left/right row of t and set every cell's left
 * and right pointers to NULL, so each owning worker's pop will free its cell. */
static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
{
	/* only called once the row's shared counter reached 0, so every alias
	 * of the parallel task must already have been popped */
	STARPU_ASSERT(t->task == NULL);
	struct _starpu_task_grid * t_left = t->left;
	struct _starpu_task_grid * t_right = t->right;
	t->left = t->right = NULL;
	/* clear everything to the left of t */
	while(t_left)
	{
		STARPU_ASSERT(t_left->task == NULL);
		t = t_left;
		t_left = t_left->left;
		t->left = NULL;
		t->right = NULL;
	}
	/* then everything to the right */
	while(t_right)
	{
		STARPU_ASSERT(t_right->task == NULL);
		t = t_right;
		t_right = t_right->right;
		t->left = NULL;
		t->right = NULL;
	}
}
/* Pop the next task from the worker's column, or NULL if none.
 * Empty leftover cells of dismantled parallel rows at the bottom are freed
 * and the pop retried; otherwise the column is scanned upwards for the first
 * cell still holding a task.  Caller must hold l->mutex. */
static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_worker_task_list * l)
{
	if(!l->first)
	{
		/* empty list: reset the expected-time window to "now" */
		l->exp_start = l->exp_end = starpu_timing_now();
		l->exp_len = 0;
		return NULL;
	}
	struct _starpu_task_grid * t = l->first;
	/* bottom cell already popped and no longer part of a parallel row:
	 * unlink it, free it, and retry on the new bottom */
	if(t->task == NULL && t->right == NULL && t->left == NULL)
	{
		l->first = t->up;
		if(l->first)
			l->first->down = NULL;
		if(l->last == t)
			l->last = NULL;
		_starpu_task_grid_destroy(t);
		return _starpu_worker_task_list_pop(l);
	}
	while(t)
	{
		if(t->task)
		{
			struct starpu_task * task = t->task;
			t->task = NULL;
			/* the row counter lives in the leftmost cell; aliases reach it
			 * through pntasks */
			int * p = t->left ? t->pntasks : &t->ntasks;
			STARPU_ATOMIC_ADD(p, -1);
			/* NOTE(review): reading *p after the atomic decrement looks racy
			 * if several workers decrement concurrently — confirm each list's
			 * mutex (plus the counter's atomicity) makes this safe */
			if(*p == 0)
				_starpu_task_grid_unset_left_right_member(t);
			l->ntasks--;
			if(!isnan(task->predicted))
			{
				l->exp_len -= task->predicted;
				l->exp_end = l->exp_start + l->exp_len;
			}
			return task;
		}
		t = t->up;
	}
	return NULL;
}
  205. static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid);
  206. static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid);
  207. struct starpu_sched_node * starpu_sched_node_worker_get(int workerid)
  208. {
  209. STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
  210. /* we may need to take a mutex here */
  211. if(_worker_nodes[workerid])
  212. return _worker_nodes[workerid];
  213. else
  214. {
  215. struct starpu_sched_node * node;
  216. if(workerid < (int) starpu_worker_get_count())
  217. node = starpu_sched_node_worker_create(workerid);
  218. else
  219. node = starpu_sched_node_combined_worker_create(workerid);
  220. _worker_nodes[workerid] = node;
  221. return node;
  222. }
  223. }
  224. struct _starpu_worker * starpu_sched_node_worker_get_worker(struct starpu_sched_node * worker_node)
  225. {
  226. STARPU_ASSERT(starpu_sched_node_is_simple_worker(worker_node));
  227. struct _starpu_worker_node_data * data = worker_node->data;
  228. return data->worker;
  229. }
  230. struct _starpu_combined_worker * starpu_sched_node_combined_worker_get_combined_worker(struct starpu_sched_node * worker_node)
  231. {
  232. STARPU_ASSERT(starpu_sched_node_is_combined_worker(worker_node));
  233. struct _starpu_worker_node_data * data = worker_node->data;
  234. return data->combined_worker;
  235. }
  236. enum starpu_perfmodel_archtype starpu_sched_node_worker_get_perf_arch(struct starpu_sched_node * worker_node)
  237. {
  238. STARPU_ASSERT(starpu_sched_node_is_worker(worker_node));
  239. if(starpu_sched_node_is_simple_worker(worker_node))
  240. return starpu_sched_node_worker_get_worker(worker_node)->perf_arch;
  241. else
  242. return starpu_sched_node_combined_worker_get_combined_worker(worker_node)->perf_arch;
  243. }
  244. int starpu_sched_node_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
  245. {
  246. /*this function take the worker's mutex */
  247. struct _starpu_worker_node_data * data = node->data;
  248. struct _starpu_task_grid * t = _starpu_task_grid_create();
  249. t->task = task;
  250. t->ntasks = 1;
  251. task->workerid = starpu_bitmap_first(node->workers);
  252. if (starpu_get_prefetch_flag())
  253. {
  254. unsigned memory_node = starpu_worker_get_memory_node(task->workerid);
  255. starpu_prefetch_task_input_on_node(task, memory_node);
  256. }
  257. STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
  258. _starpu_worker_task_list_push(data->list, t);
  259. STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
  260. node->available(node);
  261. return 0;
  262. }
  263. struct starpu_task * starpu_sched_node_worker_pop_task(struct starpu_sched_node *node,unsigned sched_ctx_id)
  264. {
  265. struct _starpu_worker_node_data * data = node->data;
  266. struct _starpu_worker_task_list * list = data->list;
  267. STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
  268. struct starpu_task * task = _starpu_worker_task_list_pop(list);
  269. STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
  270. if(task)
  271. {
  272. starpu_push_task_end(task);
  273. return task;
  274. }
  275. struct starpu_sched_node *father = node->fathers[sched_ctx_id];
  276. if(father == NULL)
  277. return NULL;
  278. task = father->pop_task(father,sched_ctx_id);
  279. if(!task)
  280. return NULL;
  281. if(task->cl->type == STARPU_SPMD)
  282. {
  283. int combined_workerid = starpu_combined_worker_get_id();
  284. if(combined_workerid < 0)
  285. {
  286. starpu_push_task_end(task);
  287. return task;
  288. }
  289. struct starpu_sched_node * combined_worker_node = starpu_sched_node_worker_get(combined_workerid);
  290. (void)combined_worker_node->push_task(combined_worker_node, task);
  291. //we have pushed a task in queue, so can make a recursive call
  292. return starpu_sched_node_worker_pop_task(node, sched_ctx_id);
  293. }
  294. if(task)
  295. starpu_push_task_end(task);
  296. return task;
  297. }
  298. void starpu_sched_node_worker_destroy(struct starpu_sched_node *node)
  299. {
  300. struct _starpu_worker * worker = starpu_sched_node_worker_get_worker(node);
  301. unsigned id = worker->workerid;
  302. assert(_worker_nodes[id] == node);
  303. int i;
  304. for(i = 0; i < STARPU_NMAX_SCHED_CTXS ; i++)
  305. if(node->fathers[i] != NULL)
  306. return;//this node is shared between several contexts
  307. starpu_sched_node_destroy(node);
  308. _worker_nodes[id] = NULL;
  309. }
/* Signal the worker behind this node that work is available, unless the
 * caller *is* that worker.  With non-blocking drivers workers poll, so no
 * wake-up is needed and this is a no-op. */
static void simple_worker_available(struct starpu_sched_node * worker_node)
{
	(void) worker_node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	struct _starpu_worker * w = starpu_sched_node_worker_get_worker(worker_node);
	if(w->workerid == starpu_worker_get_id())
		return;
	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
	/* signal under the mutex so the worker cannot miss the wake-up */
	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
#endif
}
/* Wake every member of the combined worker except the calling worker itself.
 * A no-op with non-blocking drivers (workers poll). */
static void combined_worker_available(struct starpu_sched_node * node)
{
	(void) node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	int workerid = starpu_worker_get_id();
	int i;
	for(i = 0; i < data->combined_worker->worker_size; i++)
	{
		/* NOTE(review): this compares the loop index i against workerid;
		 * presumably combined_workerid[i] was intended — confirm */
		if(i == workerid)
			continue;
		int worker = data->combined_worker->combined_workerid[i];
		starpu_pthread_mutex_t *sched_mutex;
		starpu_pthread_cond_t *sched_cond;
		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
		STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
		STARPU_PTHREAD_COND_SIGNAL(sched_cond);
		STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
	}
#endif
}
  346. static double estimated_transfer_length(struct starpu_sched_node * node,
  347. struct starpu_task * task)
  348. {
  349. STARPU_ASSERT(starpu_sched_node_is_worker(node));
  350. starpu_task_bundle_t bundle = task->bundle;
  351. struct _starpu_worker_node_data * data = node->data;
  352. unsigned memory_node = data->worker ? data->worker->memory_node : data->combined_worker->memory_node;
  353. if(bundle)
  354. return starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
  355. else
  356. return starpu_task_expected_data_transfer_time(memory_node, task);
  357. }
  358. static double worker_estimated_finish_time(struct _starpu_worker * worker)
  359. {
  360. STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
  361. double sum = 0.0;
  362. struct starpu_task_list list = worker->local_tasks;
  363. struct starpu_task * task;
  364. for(task = starpu_task_list_front(&list);
  365. task != starpu_task_list_end(&list);
  366. task = starpu_task_list_next(task))
  367. if(!isnan(task->predicted))
  368. sum += task->predicted;
  369. if(worker->current_task)
  370. {
  371. struct starpu_task * t = worker->current_task;
  372. if(t && !isnan(t->predicted))
  373. sum += t->predicted/2;
  374. }
  375. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
  376. return sum + starpu_timing_now();
  377. }
  378. static double combined_worker_expected_finish_time(struct starpu_sched_node * node)
  379. {
  380. STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
  381. struct _starpu_worker_node_data * data = node->data;
  382. struct _starpu_combined_worker * combined_worker = data->combined_worker;
  383. double max = 0.0;
  384. int i;
  385. for(i = 0; i < combined_worker->worker_size; i++)
  386. {
  387. data = _worker_nodes[combined_worker->combined_workerid[i]]->data;
  388. STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
  389. double tmp = data->list->exp_end;
  390. STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
  391. max = tmp > max ? tmp : max;
  392. }
  393. return max;
  394. }
  395. static double simple_worker_expected_finish_time(struct starpu_sched_node * node)
  396. {
  397. struct _starpu_worker_node_data * data = node->data;
  398. STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
  399. double tmp = data->list->exp_end;
  400. STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
  401. return tmp;
  402. }
/* Build the execution predictions of task on this worker node, using
 * estimated_finish_time() to get the node's availability date.  Every
 * implementation is tried; the fastest one with a performance model wins.
 * States: CALIBRATING (NaN length, return at once so it gets scheduled),
 * NO_PERF_MODEL (zero length), PERF_MODEL (usable prediction),
 * CANNOT_EXECUTE (no implementation runs on this worker). */
static struct starpu_task_execute_preds estimated_execute_preds(struct starpu_sched_node * node, struct starpu_task * task,
								double (*estimated_finish_time)(struct starpu_sched_node*))
{
	STARPU_ASSERT(starpu_sched_node_is_worker(node));
	starpu_task_bundle_t bundle = task->bundle;
	int workerid = starpu_sched_node_worker_get_workerid(node);
	struct starpu_task_execute_preds preds =
	{
		.state = CANNOT_EXECUTE,
		.archtype = starpu_sched_node_worker_get_perf_arch(node),
		.expected_length = DBL_MAX,
		.expected_finish_time = estimated_finish_time(node),
		.expected_transfer_length = estimated_transfer_length(node, task),
		.expected_power = 0.0
	};
	int nimpl;
	for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
	{
		if(starpu_worker_can_execute_task(workerid,task,nimpl) || starpu_combined_worker_can_execute_task(workerid, task, nimpl))
		{
			double d;
			if(bundle)
				d = starpu_task_bundle_expected_length(bundle, preds.archtype, nimpl);
			else
				d = starpu_task_expected_length(task, preds.archtype, nimpl);
			/* NaN length: the performance model is still calibrating */
			if(isnan(d))
			{
				preds.state = CALIBRATING;
				preds.expected_length = d;
				preds.impl = nimpl;
				return preds;
			}
			/* zero length: no performance model for this implementation */
			if(_STARPU_IS_ZERO(d) && preds.state == CANNOT_EXECUTE)
			{
				preds.state = NO_PERF_MODEL;
				preds.impl = nimpl;
				continue;
			}
			/* keep the fastest implementation seen so far */
			if(d < preds.expected_length)
			{
				preds.state = PERF_MODEL;
				preds.expected_length = d;
				preds.impl = nimpl;
			}
		}
	}
	if(preds.state == PERF_MODEL)
	{
		preds.expected_finish_time = starpu_sched_compute_expected_time(starpu_timing_now(),
										preds.expected_finish_time,
										preds.expected_length,
										preds.expected_transfer_length);
		if(bundle)
			preds.expected_power = starpu_task_bundle_expected_power(bundle, preds.archtype, preds.impl);
		else
			preds.expected_power = starpu_task_expected_power(task, preds.archtype, preds.impl);
	}
	return preds;
}
  462. static struct starpu_task_execute_preds combined_worker_estimated_execute_preds(struct starpu_sched_node * node, struct starpu_task * task)
  463. {
  464. return estimated_execute_preds(node,task,combined_worker_expected_finish_time);
  465. }
  466. static struct starpu_task_execute_preds simple_worker_estimated_execute_preds(struct starpu_sched_node * node, struct starpu_task * task)
  467. {
  468. return estimated_execute_preds(node,task,simple_worker_expected_finish_time);
  469. }
  470. static double simple_worker_estimated_load(struct starpu_sched_node * node)
  471. {
  472. struct _starpu_worker * worker = starpu_sched_node_worker_get_worker(node);
  473. int nb_task = 0;
  474. STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
  475. struct starpu_task_list list = worker->local_tasks;
  476. struct starpu_task * task;
  477. for(task = starpu_task_list_front(&list);
  478. task != starpu_task_list_end(&list);
  479. task = starpu_task_list_next(task))
  480. nb_task++;
  481. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
  482. struct _starpu_worker_node_data * d = node->data;
  483. struct _starpu_worker_task_list * l = d->list;
  484. int ntasks_in_fifo = l ? l->ntasks : 0;
  485. return (double) (nb_task + ntasks_in_fifo)
  486. / starpu_worker_get_relative_speedup(starpu_bitmap_first(node->workers));
  487. }
  488. static double combined_worker_estimated_load(struct starpu_sched_node * node)
  489. {
  490. struct _starpu_worker_node_data * d = node->data;
  491. struct _starpu_combined_worker * c = d->combined_worker;
  492. double load = 0;
  493. int i;
  494. for(i = 0; i < c->worker_size; i++)
  495. {
  496. struct starpu_sched_node * n = starpu_sched_node_worker_get(c->combined_workerid[i]);
  497. load += n->estimated_load(n);
  498. }
  499. return load;
  500. }
  501. static void worker_deinit_data(struct starpu_sched_node * node)
  502. {
  503. struct _starpu_worker_node_data * data = node->data;
  504. if(data->list)
  505. _starpu_worker_task_list_destroy(data->list);
  506. free(data);
  507. node->data = NULL;
  508. int i;
  509. for(i = 0; i < STARPU_NMAXWORKERS; i++)
  510. if(_worker_nodes[i] == node)
  511. break;
  512. STARPU_ASSERT(i < STARPU_NMAXWORKERS);
  513. _worker_nodes[i] = NULL;
  514. }
/* Push a parallel task on every member of a combined worker: the task is
 * duplicated once per member, the aliases are chained left/right (sharing
 * the ntasks counter of alias 0), and each alias is pushed onto its member's
 * list while always holding at least one list mutex (hand-over-hand) so the
 * row cannot be partially consumed while still being built. */
static int starpu_sched_node_combined_worker_push_task(struct starpu_sched_node * node, struct starpu_task *task)
{
	STARPU_ASSERT(starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	STARPU_ASSERT(data->combined_worker && !data->worker);
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	STARPU_ASSERT(combined_worker->worker_size >= 1);
	struct _starpu_task_grid * task_alias[combined_worker->worker_size];
	starpu_parallel_task_barrier_init(task, starpu_bitmap_first(node->workers));
	/* alias 0 is the leftmost cell of the row and owns the shared counter */
	task_alias[0] = _starpu_task_grid_create();
	task_alias[0]->task = starpu_task_dup(task);
	task_alias[0]->task->workerid = combined_worker->combined_workerid[0];
	task_alias[0]->left = NULL;
	task_alias[0]->ntasks = combined_worker->worker_size;
	int i;
	for(i = 1; i < combined_worker->worker_size; i++)
	{
		task_alias[i] = _starpu_task_grid_create();
		task_alias[i]->task = starpu_task_dup(task);
		task_alias[i]->task->workerid = combined_worker->combined_workerid[i];
		task_alias[i]->left = task_alias[i-1];
		task_alias[i - 1]->right = task_alias[i];
		task_alias[i]->pntasks = &task_alias[0]->ntasks;
	}
	/* lock member i's list before releasing member i-1's */
	starpu_pthread_mutex_t * mutex_to_unlock = NULL;
	i = 0;
	do
	{
		struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
		struct _starpu_worker_node_data * worker_data = worker_node->data;
		struct _starpu_worker_task_list * list = worker_data->list;
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		if(mutex_to_unlock)
			STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
		mutex_to_unlock = &list->mutex;
		_starpu_worker_task_list_push(list, task_alias[i]);
		i++;
	}
	while(i < combined_worker->worker_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
	int workerid = starpu_worker_get_id();
	if(-1 == workerid)
	{
		/* caller is not a worker thread: just signal availability */
		node->available(node);
	}
	else
	{
		/* caller is a worker: release its own sched mutex while waking
		 * the members, then re-take it before returning */
		starpu_pthread_mutex_t *worker_sched_mutex;
		starpu_pthread_cond_t *worker_sched_cond;
		starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
		STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
		/* wake up all other workers of combined worker */
		for(i = 0; i < combined_worker->worker_size; i++)
		{
			struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
			worker_node->available(worker_node);
		}
		node->available(node);
		STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
	}
	return 0;
}
/* Create (or return the cached) scheduling node for simple worker workerid,
 * wiring the simple_worker_* method table and its task list. */
static struct starpu_sched_node * starpu_sched_node_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];
	struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
	if(worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->worker = worker;
	data->list = _starpu_worker_task_list_create();
	node->data = data;
	node->push_task = starpu_sched_node_worker_push_task;
	node->pop_task = starpu_sched_node_worker_pop_task;
	node->estimated_execute_preds = simple_worker_estimated_execute_preds;
	node->estimated_load = simple_worker_estimated_load;
	node->available = simple_worker_available;
	node->deinit_data = worker_deinit_data;
	node->workers = starpu_bitmap_create();
	starpu_bitmap_set(node->workers, workerid);
	_worker_nodes[workerid] = node;
#ifdef STARPU_HAVE_HWLOC
	/* attach the hwloc object of the CPU the worker is bound to */
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif
	return node;
}
/* Create (or return the cached) scheduling node for combined worker workerid,
 * wiring the combined_worker_* method table (no pop_task, no task list:
 * tasks are pushed directly onto the members' lists). */
static struct starpu_sched_node * starpu_sched_node_combined_worker_create(int workerid)
{
	STARPU_ASSERT(0 <= workerid && workerid < STARPU_NMAXWORKERS);
	if(_worker_nodes[workerid])
		return _worker_nodes[workerid];
	struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
	if(combined_worker == NULL)
		return NULL;
	struct starpu_sched_node * node = starpu_sched_node_create();
	struct _starpu_worker_node_data * data = malloc(sizeof(*data));
	memset(data, 0, sizeof(*data));
	data->combined_worker = combined_worker;
	node->data = data;
	node->push_task = starpu_sched_node_combined_worker_push_task;
	node->pop_task = NULL;
	node->estimated_execute_preds = combined_worker_estimated_execute_preds;
	node->estimated_load = combined_worker_estimated_load;
	node->available = combined_worker_available;
	node->deinit_data = worker_deinit_data;
	node->workers = starpu_bitmap_create();
	starpu_bitmap_set(node->workers, workerid);
	_worker_nodes[workerid] = node;
#ifdef STARPU_HAVE_HWLOC
	/* attach the hwloc object of the first member's CPU */
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct _starpu_machine_topology *topology = &config->topology;
	hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, combined_worker->combined_workerid[0]);
	STARPU_ASSERT(obj);
	node->obj = obj;
#endif
	return node;
}
  640. int starpu_sched_node_is_simple_worker(struct starpu_sched_node * node)
  641. {
  642. return node->push_task == starpu_sched_node_worker_push_task;
  643. }
  644. int starpu_sched_node_is_combined_worker(struct starpu_sched_node * node)
  645. {
  646. return node->push_task == starpu_sched_node_combined_worker_push_task;
  647. }
  648. int starpu_sched_node_is_worker(struct starpu_sched_node * node)
  649. {
  650. return starpu_sched_node_is_simple_worker(node)
  651. || starpu_sched_node_is_combined_worker(node);
  652. }
  653. #ifndef STARPU_NO_ASSERT
  654. static int _worker_consistant(struct starpu_sched_node * node)
  655. {
  656. int is_a_worker = 0;
  657. int i;
  658. for(i = 0; i<STARPU_NMAXWORKERS; i++)
  659. if(_worker_nodes[i] == node)
  660. is_a_worker = 1;
  661. if(!is_a_worker)
  662. return 0;
  663. struct _starpu_worker_node_data * data = node->data;
  664. if(data->worker)
  665. {
  666. int id = data->worker->workerid;
  667. return (_worker_nodes[id] == node)
  668. && node->nchilds == 0;
  669. }
  670. return 1;
  671. }
  672. #endif
  673. int starpu_sched_node_worker_get_workerid(struct starpu_sched_node * worker_node)
  674. {
  675. #ifndef STARPU_NO_ASSERT
  676. STARPU_ASSERT(_worker_consistant(worker_node));
  677. #endif
  678. STARPU_ASSERT(1 == starpu_bitmap_cardinal(worker_node->workers));
  679. return starpu_bitmap_first(worker_node->workers);
  680. }
  681. static struct _starpu_worker_task_list * _worker_get_list(void)
  682. {
  683. int workerid = starpu_worker_get_id();
  684. STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
  685. struct _starpu_worker_node_data * d = starpu_sched_node_worker_get(workerid)->data;
  686. return d->list;
  687. }
  688. void starpu_sched_node_worker_pre_exec_hook(struct starpu_task * task)
  689. {
  690. if(!isnan(task->predicted))
  691. {
  692. struct _starpu_worker_task_list * list = _worker_get_list();
  693. STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
  694. STARPU_ASSERT(list->ntasks != 0);
  695. list->ntasks--;
  696. if(!task->execute_on_a_specific_worker)
  697. list->exp_len = STARPU_MIN(list->exp_len - task->predicted, 0.0);
  698. list->exp_start = starpu_timing_now() + task->predicted;
  699. if(list->ntasks == 0)
  700. {
  701. list->exp_end = list->exp_start;
  702. list->exp_end = 0.0;
  703. }
  704. else
  705. list->exp_end = list->exp_start + list->exp_len;
  706. STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
  707. }
  708. }
  709. void starpu_sched_node_worker_post_exec_hook(struct starpu_task * task)
  710. {
  711. if(task->execute_on_a_specific_worker)
  712. return;
  713. struct _starpu_worker_task_list * list = _worker_get_list();
  714. STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
  715. list->exp_start = starpu_timing_now();
  716. list->exp_end = list->exp_start + list->exp_len;
  717. STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
  718. }
/* Notification that a task was assigned to workerid without going through
 * this node's push: account for its expected length and transfer time in the
 * worker list's window.  Combined workers are ignored.
 * (sched_ctx_id is unused here.) */
static void starpu_sched_node_worker_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct starpu_sched_node * worker_node = starpu_sched_node_worker_get(workerid);
	/* dont work with parallel tasks */
	if(starpu_sched_node_is_combined_worker(worker_node))
		return;
	struct _starpu_worker_node_data * d = worker_node->data;
	struct _starpu_worker_task_list * list = d->list;
	/* Compute the expected penality */
	enum starpu_perfmodel_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
	unsigned memory_node = starpu_worker_get_memory_node(workerid);
	double predicted = starpu_task_expected_length(task, perf_arch,
						       starpu_task_get_implementation(task));
	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
	/* Update the predictions */
	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
	/* Sometimes workers didn't take the tasks as early as we expected */
	list->exp_start = STARPU_MAX(list->exp_start, starpu_timing_now());
	list->exp_end = list->exp_start + list->exp_len;
	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted_transfer))
	{
		if (starpu_timing_now() + predicted_transfer < list->exp_end)
		{
			/* We may hope that the transfer will be finished by
			 * the start of the task. */
			predicted_transfer = 0;
		}
		else
		{
			/* The transfer will not be finished by then, take the
			 * remainder into account */
			predicted_transfer = (starpu_timing_now() + predicted_transfer) - list->exp_end;
		}
		task->predicted_transfer = predicted_transfer;
		list->exp_end += predicted_transfer;
		list->exp_len += predicted_transfer;
	}
	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted))
	{
		task->predicted = predicted;
		list->exp_end += predicted;
		list->exp_len += predicted;
	}
	list->ntasks++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
}