node_worker.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. #include "node_sched.h"
  2. #include <core/workers.h>
  3. #include <float.h>
  4. static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
  5. /* data structure for worker's queue look like this :
  6. * W = worker
  7. * T = simple task
  8. * P = parallel task
  9. *
  10. *
  11. * P--P T
  12. * | | \|
  13. * P--P T T P T
  14. * | | | | | |
  15. * T T P--P--P T
  16. * | | | | | |
  17. * W W W W W W
  18. *
  19. *
  20. *
* it is possible that a _starpu_task_grid will not have a task
  22. *
  23. * N = no task
  24. *
  25. * T T T
  26. * | | |
  27. * P--N--N
  28. * | | |
  29. * W W W
  30. *
  31. *
  32. * this API is a little asymmetric : _starpu_task_grid are allocated by the caller and freed by the data structure
  33. *
  34. * exp_{start,end,len} are filled by the caller
  35. */
/* One cell of the 2D task grid pictured above: a task plus links to its
 * vertical neighbours (the per-worker queue) and horizontal neighbours
 * (the row formed by a parallel task spanning several workers). */
struct _starpu_task_grid
{
	/* this member may be NULL if a worker has popped it but it is a
	 * parallel task and we don't want dangling pointers
	 */
	struct starpu_task * task;
	struct _starpu_task_grid *up, *down, *left, *right;
	/* this is used to count the number of tasks to be popped by a worker
	 * the leftmost _starpu_task_grid maintains the ntasks counter (ie .left == NULL),
	 * all the others use pntasks, which points to it
	 *
	 * when the counter reaches 0, all the left and right members are set to NULL,
	 * which means that we will free those nodes.
	 */
	union
	{
		int ntasks;
		int * pntasks;
	};
};
/* Per-worker queue of grid cells, together with the expected start/length/end
 * times used by the scheduling heuristics.  All fields are protected by
 * mutex; exp_{start,end,len} are filled by the caller (see file header). */
struct _starpu_worker_task_list
{
	double exp_start, exp_len, exp_end;
	/* first is the bottom (pop side), last is the top (push side) */
	struct _starpu_task_grid *first, *last;
	starpu_pthread_mutex_t mutex;
};
/* Private data of a worker sched node.  Exactly one of worker and
 * combined_worker is non-NULL, depending on the node kind. */
struct _starpu_worker_node_data
{
	struct _starpu_worker * worker;
	struct _starpu_combined_worker * combined_worker;
	/* task queue; left NULL for combined-worker nodes
	 * (see _starpu_sched_node_combined_worker_create) */
	struct _starpu_worker_task_list * list;
};
  68. static struct _starpu_worker_task_list * _starpu_worker_task_list_create(void)
  69. {
  70. struct _starpu_worker_task_list * l = malloc(sizeof(*l));
  71. memset(l, 0, sizeof(*l));
  72. l->exp_len = 0.0;
  73. l->exp_start = l->exp_end = starpu_timing_now();
  74. STARPU_PTHREAD_MUTEX_INIT(&l->mutex,NULL);
  75. return l;
  76. }
  77. static struct _starpu_task_grid * _starpu_task_grid_create(void)
  78. {
  79. struct _starpu_task_grid * t = malloc(sizeof(*t));
  80. memset(t, 0, sizeof(*t));
  81. return t;
  82. }
  83. static void _starpu_task_grid_destroy(struct _starpu_task_grid * t)
  84. {
  85. free(t);
  86. }
  87. static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l)
  88. {
  89. if(!l)
  90. return;
  91. STARPU_PTHREAD_MUTEX_DESTROY(&l->mutex);
  92. free(l);
  93. }
  94. //the task, ntasks, pntasks, left and right field members are set by the caller
  95. static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
  96. {
  97. if(l->first == NULL)
  98. l->first = l->last = t;
  99. t->down = l->last;
  100. l->last->up = t;
  101. t->up = NULL;
  102. l->last = t;
  103. }
  104. //recursively set left and right pointers to NULL
  105. static inline void _starpu_task_grid_unset_left_right_member(struct _starpu_task_grid * t)
  106. {
  107. STARPU_ASSERT(t->task == NULL);
  108. struct _starpu_task_grid * t_left = t->left;
  109. struct _starpu_task_grid * t_right = t->right;
  110. t->left = t->right = NULL;
  111. while(t_left)
  112. {
  113. STARPU_ASSERT(t_left->task == NULL);
  114. t = t_left;
  115. t_left = t_left->left;
  116. t->left = NULL;
  117. t->right = NULL;
  118. }
  119. while(t_right)
  120. {
  121. STARPU_ASSERT(t_right->task == NULL);
  122. t = t_right;
  123. t_right = t_right->right;
  124. t->left = NULL;
  125. t->right = NULL;
  126. }
  127. }
  128. static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_worker_task_list * l)
  129. {
  130. if(!l->first)
  131. {
  132. l->exp_start = l->exp_end = starpu_timing_now();
  133. l->exp_len = 0;
  134. return NULL;
  135. }
  136. struct _starpu_task_grid * t = l->first;
  137. if(t->task == NULL && t->right == NULL && t->left == NULL)
  138. {
  139. l->first = t->up;
  140. if(l->first)
  141. l->first->down = NULL;
  142. if(l->last == t)
  143. l->last = NULL;
  144. _starpu_task_grid_destroy(t);
  145. return _starpu_worker_task_list_pop(l);
  146. }
  147. while(t)
  148. {
  149. if(t->task)
  150. {
  151. struct starpu_task * task = t->task;
  152. t->task = NULL;
  153. int * p = t->left ? t->pntasks : &t->ntasks;
  154. STARPU_ATOMIC_ADD(p, -1);
  155. if(*p == 0)
  156. _starpu_task_grid_unset_left_right_member(t);
  157. return task;
  158. }
  159. t = t->up;
  160. }
  161. return NULL;
  162. }
/* Lazy constructors used by _starpu_sched_node_worker_get() below. */
static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid);
static struct _starpu_sched_node * _starpu_sched_node_combined_worker_create(int workerid);
  165. struct _starpu_sched_node * _starpu_sched_node_worker_get(int workerid)
  166. {
  167. STARPU_ASSERT(workerid >= 0 && workerid < STARPU_NMAXWORKERS);
  168. /* we may need to take a mutex here */
  169. if(_worker_nodes[workerid])
  170. return _worker_nodes[workerid];
  171. else
  172. return _worker_nodes[workerid] =
  173. (workerid < (int) starpu_worker_get_count() ?
  174. _starpu_sched_node_worker_create:
  175. _starpu_sched_node_combined_worker_create)(workerid);
  176. }
  177. struct _starpu_worker * _starpu_sched_node_worker_get_worker(struct _starpu_sched_node * worker_node)
  178. {
  179. STARPU_ASSERT(_starpu_sched_node_is_worker(worker_node));
  180. struct _starpu_worker_node_data * data = worker_node->data;
  181. return data->worker;
  182. }
  183. int _starpu_sched_node_worker_push_task(struct _starpu_sched_node * node, struct starpu_task *task)
  184. {
  185. /*this function take the worker's mutex */
  186. struct _starpu_worker_node_data * data = node->data;
  187. struct _starpu_task_grid * t = _starpu_task_grid_create();
  188. t->task = task;
  189. t->ntasks = 1;
  190. STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
  191. _starpu_worker_task_list_push(data->list, t);
  192. STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
  193. return 0;
  194. }
  195. struct starpu_task * _starpu_sched_node_worker_pop_task(struct _starpu_sched_node *node,unsigned sched_ctx_id)
  196. {
  197. struct _starpu_worker_node_data * data = node->data;
  198. struct _starpu_worker_task_list * list = data->list;
  199. STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
  200. struct starpu_task * task = _starpu_worker_task_list_pop(list);
  201. STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
  202. if(task)
  203. {
  204. starpu_push_task_end(task);
  205. return task;
  206. }
  207. struct _starpu_sched_node *father = node->fathers[sched_ctx_id];
  208. if(father == NULL)
  209. return NULL;
  210. task = father->pop_task(father,sched_ctx_id);
  211. if(!task)
  212. return NULL;
  213. if(task->cl->type == STARPU_SPMD)
  214. {
  215. int combined_workerid = starpu_combined_worker_get_id();
  216. if(combined_workerid < 0)
  217. {
  218. starpu_push_task_end(task);
  219. return task;
  220. }
  221. struct _starpu_sched_node * combined_worker_node = _starpu_sched_node_worker_get(combined_workerid);
  222. (void)combined_worker_node->push_task(combined_worker_node, task);
  223. //we have pushed a task in queue, so can make a recursive call
  224. return _starpu_sched_node_worker_pop_task(node, sched_ctx_id);
  225. }
  226. if(task)
  227. starpu_push_task_end(task);
  228. return task;
  229. }
  230. void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
  231. {
  232. struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
  233. unsigned id = worker->workerid;
  234. assert(_worker_nodes[id] == node);
  235. int i;
  236. for(i = 0; i < STARPU_NMAX_SCHED_CTXS ; i++)
  237. if(node->fathers[i] != NULL)
  238. return;//this node is shared between several contexts
  239. _starpu_sched_node_destroy(node);
  240. _worker_nodes[id] = NULL;
  241. }
/* Wake up the worker behind worker_node so it re-checks its queue.
 * With non-blocking drivers the workers poll, so no signalling is needed
 * and this is a no-op. */
static void available_worker(struct _starpu_sched_node * worker_node)
{
	(void) worker_node;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
//	if(w->workerid == starpu_worker_get_id())
//		return;
	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
#endif
}
  256. static void available_combined_worker(struct _starpu_sched_node * node)
  257. {
  258. STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
  259. struct _starpu_worker_node_data * data = node->data;
  260. int workerid = starpu_worker_get_id();
  261. int i;
  262. for(i = 0; i < data->combined_worker->worker_size; i++)
  263. {
  264. if(i == workerid)
  265. continue;
  266. int worker = data->combined_worker->combined_workerid[i];
  267. starpu_pthread_mutex_t *sched_mutex;
  268. starpu_pthread_cond_t *sched_cond;
  269. starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
  270. STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
  271. STARPU_PTHREAD_COND_SIGNAL(sched_cond);
  272. STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
  273. }
  274. }
  275. static double estimated_transfer_length(struct _starpu_sched_node * node,
  276. struct starpu_task * task)
  277. {
  278. STARPU_ASSERT(_starpu_sched_node_is_worker(node));
  279. starpu_task_bundle_t bundle = task->bundle;
  280. struct _starpu_worker_node_data * data = node->data;
  281. unsigned memory_node = data->worker ? data->worker->memory_node : data->combined_worker->memory_node;
  282. if(bundle)
  283. return starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
  284. else
  285. return starpu_task_expected_data_transfer_time(memory_node, task);
  286. }
  287. static double worker_estimated_finish_time(struct _starpu_worker * worker)
  288. {
  289. STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
  290. double sum = 0.0;
  291. struct starpu_task_list list = worker->local_tasks;
  292. struct starpu_task * task;
  293. for(task = starpu_task_list_front(&list);
  294. task != starpu_task_list_end(&list);
  295. task = starpu_task_list_next(task))
  296. if(!isnan(task->predicted))
  297. sum += task->predicted;
  298. if(worker->current_task)
  299. {
  300. struct starpu_task * t = worker->current_task;
  301. if(t && !isnan(t->predicted))
  302. sum += t->predicted/2;
  303. }
  304. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
  305. return sum + starpu_timing_now();
  306. }
  307. static double combined_worker_expected_finish_time(struct _starpu_sched_node * node)
  308. {
  309. STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
  310. struct _starpu_worker_node_data * data = node->data;
  311. struct _starpu_combined_worker * combined_worker = data->combined_worker;
  312. double max = 0.0;
  313. int i;
  314. for(i = 0; i < combined_worker->worker_size; i++)
  315. {
  316. data = _worker_nodes[combined_worker->combined_workerid[i]]->data;
  317. STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
  318. double tmp = data->list->exp_end;
  319. STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
  320. max = tmp > max ? tmp : max;
  321. }
  322. return max;
  323. }
  324. static double simple_worker_expected_finish_time(struct _starpu_sched_node * node)
  325. {
  326. struct _starpu_worker_node_data * data = node->data;
  327. STARPU_PTHREAD_MUTEX_LOCK(&data->list->mutex);
  328. double tmp = data->list->exp_end;
  329. STARPU_PTHREAD_MUTEX_UNLOCK(&data->list->mutex);
  330. return tmp;
  331. }
/* Core prediction routine shared by simple and combined worker nodes.
 * Evaluates every implementation the worker can run and returns the
 * performance-model state together with the best expected length, the
 * expected finish time, transfer length and power consumption.
 * estimated_finish_time abstracts over the simple/combined finish-time
 * computation. */
static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task,
								 double (*estimated_finish_time)(struct _starpu_sched_node*))
{
	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
	starpu_task_bundle_t bundle = task->bundle;
	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
	struct _starpu_task_execute_preds preds =
		{
			.state = CANNOT_EXECUTE,
			.archtype = worker->perf_arch,
			.expected_length = DBL_MAX,
			.expected_finish_time = estimated_finish_time(node),
			.expected_transfer_length = estimated_transfer_length(node, task),
			.expected_power = 0.0
		};
	int nimpl;
	for(nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
	{
		if(starpu_worker_can_execute_task(worker->workerid,task,nimpl))
		{
			double d;
			if(bundle)
				d = starpu_task_bundle_expected_length(bundle, worker->perf_arch, nimpl);
			else
				d = starpu_task_expected_length(task, worker->perf_arch, nimpl);
			/* NaN: the performance model is still calibrating — report
			 * this implementation right away so it gets measured. */
			if(isnan(d))
			{
				preds.state = CALIBRATING;
				preds.expected_length = d;
				preds.impl = nimpl;
				return preds;
			}
			/* A zero estimation is treated as "no performance model
			 * available for this implementation". */
			if(_STARPU_IS_ZERO(d) && preds.state == CANNOT_EXECUTE)
			{
				preds.state = NO_PERF_MODEL;
				preds.impl = nimpl;
				continue;
			}
			/* Keep the fastest modelled implementation seen so far. */
			if(d < preds.expected_length)
			{
				preds.state = PERF_MODEL;
				preds.expected_length = d;
				preds.impl = nimpl;
			}
		}
	}
	/* Only a real performance model yields a usable finish time and
	 * power estimation. */
	if(preds.state == PERF_MODEL)
	{
		preds.expected_finish_time = _starpu_compute_expected_time(starpu_timing_now(),
									   preds.expected_finish_time,
									   preds.expected_length,
									   preds.expected_transfer_length);
		if(bundle)
			preds.expected_power = starpu_task_bundle_expected_power(bundle, worker->perf_arch, preds.impl);
		else
			preds.expected_power = starpu_task_expected_power(task, worker->perf_arch,preds.impl);
	}
	return preds;
}
  391. static struct _starpu_task_execute_preds combined_worker_estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
  392. {
  393. return estimated_execute_preds(node,task,combined_worker_expected_finish_time);
  394. }
  395. static struct _starpu_task_execute_preds simple_worker_estimated_execute_preds(struct _starpu_sched_node * node, struct starpu_task * task)
  396. {
  397. return estimated_execute_preds(node,task,simple_worker_expected_finish_time);
  398. }
  399. static double estimated_load(struct _starpu_sched_node * node)
  400. {
  401. struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
  402. int nb_task = 0;
  403. STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
  404. struct starpu_task_list list = worker->local_tasks;
  405. struct starpu_task * task;
  406. for(task = starpu_task_list_front(&list);
  407. task != starpu_task_list_end(&list);
  408. task = starpu_task_list_next(task))
  409. nb_task++;
  410. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
  411. return (double) nb_task
  412. / starpu_worker_get_relative_speedup(_starpu_bitmap_first(node->workers));
  413. }
  414. static void worker_deinit_data(struct _starpu_sched_node * node)
  415. {
  416. struct _starpu_worker_node_data * data = node->data;
  417. if(data->list)
  418. _starpu_worker_task_list_destroy(data->list);
  419. free(data);
  420. node->data = NULL;
  421. int i;
  422. for(i = 0; i < STARPU_NMAXWORKERS; i++)
  423. if(_worker_nodes[i] == node)
  424. break;
  425. STARPU_ASSERT(i < STARPU_NMAXWORKERS);
  426. _worker_nodes[i] = NULL;
  427. }
/* Push a parallel task on every member of a combined worker.
 * One grid cell per member is created (the extra ones holding duplicates
 * of the task), linked horizontally into a row sharing the ntasks counter
 * owned by the leftmost cell, then inserted in the members' lists with
 * hand-over-hand locking so that no worker observes a partial row. */
static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_node * node, struct starpu_task *task)
{
	STARPU_ASSERT(_starpu_sched_node_is_combined_worker(node));
	struct _starpu_worker_node_data * data = node->data;
	STARPU_ASSERT(data->combined_worker && !data->worker);
	struct _starpu_combined_worker * combined_worker = data->combined_worker;
	STARPU_ASSERT(combined_worker->worker_size >= 1);
	struct _starpu_task_grid * task_alias[combined_worker->worker_size];
	starpu_parallel_task_barrier_init(task, _starpu_bitmap_first(node->workers));
	/* The leftmost cell keeps the original task and owns the counter. */
	task_alias[0] = _starpu_task_grid_create();
	task_alias[0]->task = task;
	task_alias[0]->left = NULL;
	task_alias[0]->ntasks = combined_worker->worker_size;
	int i;
	for(i = 1; i < combined_worker->worker_size; i++)
	{
		task_alias[i] = _starpu_task_grid_create();
		task_alias[i]->task = starpu_task_dup(task);
		task_alias[i]->left = task_alias[i-1];
		task_alias[i - 1]->right = task_alias[i];
		task_alias[i]->pntasks = &task_alias[0]->ntasks;
	}
	/* Hand-over-hand: keep the previous member's list locked until the
	 * next one is acquired. */
	starpu_pthread_mutex_t * mutex_to_unlock = NULL;
	i = 0;
	do
	{
		struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
		struct _starpu_worker_node_data * worker_data = worker_node->data;
		struct _starpu_worker_task_list * list = worker_data->list;
		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
		if(mutex_to_unlock)
			STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
		mutex_to_unlock = &list->mutex;
		_starpu_worker_task_list_push(list, task_alias[i]);
		worker_node->available(worker_node);
		i++;
	}
	while(i < combined_worker->worker_size);
	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
	return 0;
}
  469. static struct _starpu_sched_node * _starpu_sched_node_worker_create(int workerid)
  470. {
  471. STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
  472. if(_worker_nodes[workerid])
  473. return _worker_nodes[workerid];
  474. struct _starpu_worker * worker = _starpu_get_worker_struct(workerid);
  475. if(worker == NULL)
  476. return NULL;
  477. struct _starpu_sched_node * node = _starpu_sched_node_create();
  478. struct _starpu_worker_node_data * data = malloc(sizeof(*data));
  479. memset(data, 0, sizeof(*data));
  480. data->worker = worker;
  481. data->list = _starpu_worker_task_list_create();
  482. node->data = data;
  483. node->push_task = _starpu_sched_node_worker_push_task;
  484. node->pop_task = _starpu_sched_node_worker_pop_task;
  485. node->estimated_execute_preds = simple_worker_estimated_execute_preds;
  486. node->estimated_load = estimated_load;
  487. node->available = available_worker;
  488. node->deinit_data = worker_deinit_data;
  489. node->workers = _starpu_bitmap_create();
  490. _starpu_bitmap_set(node->workers, workerid);
  491. _worker_nodes[workerid] = node;
  492. #ifdef STARPU_HAVE_HWLOC
  493. struct _starpu_machine_config *config = _starpu_get_machine_config();
  494. struct _starpu_machine_topology *topology = &config->topology;
  495. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
  496. STARPU_ASSERT(obj);
  497. node->obj = obj;
  498. #endif
  499. return node;
  500. }
  501. static struct _starpu_sched_node * _starpu_sched_node_combined_worker_create(int workerid)
  502. {
  503. STARPU_ASSERT(0 <= workerid && workerid < STARPU_NMAXWORKERS);
  504. if(_worker_nodes[workerid])
  505. return _worker_nodes[workerid];
  506. struct _starpu_combined_worker * combined_worker = _starpu_get_combined_worker_struct(workerid);
  507. if(combined_worker == NULL)
  508. return NULL;
  509. struct _starpu_sched_node * node = _starpu_sched_node_create();
  510. struct _starpu_worker_node_data * data = malloc(sizeof(*data));
  511. memset(data, 0, sizeof(*data));
  512. data->combined_worker = combined_worker;
  513. node->data = data;
  514. node->push_task = _starpu_sched_node_combined_worker_push_task;
  515. node->pop_task = NULL;
  516. node->estimated_execute_preds = combined_worker_estimated_execute_preds;
  517. node->estimated_load = estimated_load;
  518. node->available = available_combined_worker;
  519. node->deinit_data = worker_deinit_data;
  520. node->workers = _starpu_bitmap_create();
  521. _starpu_bitmap_set(node->workers, workerid);
  522. _worker_nodes[workerid] = node;
  523. #ifdef STARPU_HAVE_HWLOC
  524. struct _starpu_machine_config *config = _starpu_get_machine_config();
  525. struct _starpu_machine_topology *topology = &config->topology;
  526. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, combined_worker->combined_workerid[0]);
  527. STARPU_ASSERT(obj);
  528. node->obj = obj;
  529. #endif
  530. return node;
  531. }
  532. int _starpu_sched_node_is_simple_worker(struct _starpu_sched_node * node)
  533. {
  534. return node->push_task == _starpu_sched_node_worker_push_task;
  535. }
  536. int _starpu_sched_node_is_combined_worker(struct _starpu_sched_node * node)
  537. {
  538. return node->push_task == _starpu_sched_node_combined_worker_push_task;
  539. }
  540. int _starpu_sched_node_is_worker(struct _starpu_sched_node * node)
  541. {
  542. return _starpu_sched_node_is_simple_worker(node)
  543. || _starpu_sched_node_is_combined_worker(node);
  544. }
  545. #ifndef STARPU_NO_ASSERT
  546. static int _worker_consistant(struct _starpu_sched_node * node)
  547. {
  548. int is_a_worker = 0;
  549. int i;
  550. for(i = 0; i<STARPU_NMAXWORKERS; i++)
  551. if(_worker_nodes[i] == node)
  552. is_a_worker = 1;
  553. if(!is_a_worker)
  554. return 0;
  555. struct _starpu_worker_node_data * data = node->data;
  556. if(data->worker)
  557. {
  558. int id = data->worker->workerid;
  559. return (_worker_nodes[id] == node)
  560. && node->nchilds == 0;
  561. }
  562. return 1;
  563. }
  564. #endif
  565. int _starpu_sched_node_worker_get_workerid(struct _starpu_sched_node * worker_node)
  566. {
  567. #ifndef STARPU_NO_ASSERT
  568. STARPU_ASSERT(_worker_consistant(worker_node));
  569. #endif
  570. return _starpu_sched_node_worker_get_worker(worker_node)->workerid;
  571. }