work_stealing_policy.c
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2016  Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2016  CNRS
 * Copyright (C) 2011, 2012, 2016  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Work stealing policy */

#include <float.h>

#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <core/debug.h>
#include <starpu_scheduler.h>
#include <core/sched_policy.h>

/* Experimental (dead) code which needs to be tested, fixed... */
/* #define USE_OVERLOAD */

/*
 * Experimental code for improving data cache locality:
 *
 * USE_LOCALITY:
 * - for each piece of data, we record on which worker it was last accessed
 *   with the locality flag.
 *
 * - when pushing a ready task, we choose the worker which has last accessed
 *   the most data of the task with the locality flag.
 *
 * USE_LOCALITY_TASKS:
 * - for each worker, we record the locality data that its last task used
 *   (i.e. a rough estimation of what the innermost caches contain).
 *
 * - for each worker, we have a hash table mapping each data handle to all
 *   the ready tasks pushed to that worker which will access the handle with
 *   the locality flag.
 *
 * - when fetching a task from a queue, we pick the task which has the
 *   largest number of data estimated to be contained in the cache.
 *   (See the illustrative example below.)
 */

//#define USE_LOCALITY

//#define USE_LOCALITY_TASKS
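
/*
 * For illustration only (hypothetical application code, not part of this
 * policy): a program opts a buffer into this locality tracking by or'ing
 * STARPU_LOCALITY into the access mode declared in its codelet.  Here
 * "my_func" and "my_handle" are made-up names:
 *
 *     static struct starpu_codelet my_cl =
 *     {
 *         .cpu_funcs = { my_func },
 *         .nbuffers = 1,
 *         .modes = { STARPU_RW | STARPU_LOCALITY },
 *     };
 *
 *     struct starpu_task *task = starpu_task_create();
 *     task->cl = &my_cl;
 *     task->handles[0] = my_handle;
 *     starpu_task_submit(task);
 */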

/* Maximum number of recorded locality data per task */
#define MAX_LOCALITY 8

/* Entry for queued_tasks_per_data: records that a queued task is accessing
 * the data with the locality flag */
struct locality_entry
{
	UT_hash_handle hh;
	starpu_data_handle_t data;
	struct starpu_task *task;
};
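
/*
 * A short reminder of the uthash idiom applied to this structure below (a
 * sketch of the calls used in this file, not extra functionality): entries
 * are keyed by the pointer value of the "data" field.
 *
 *     struct locality_entry *entry;
 *     HASH_FIND_PTR(table, &handle, entry);   // look up a handle
 *     if (!entry)
 *     {
 *         STARPU_MALLOC(entry, sizeof(*entry));
 *         entry->data = handle;
 *         entry->task = task;
 *         HASH_ADD_PTR(table, data, entry);   // insert, keyed on entry->data
 *     }
 *     HASH_DEL(table, entry);                 // remove; then free(entry)
 */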

struct _starpu_work_stealing_data_per_worker
{
	struct _starpu_fifo_taskq *queue_array;
	int *proxlist;
	starpu_pthread_mutex_t worker_mutex;
	int busy;

#ifdef USE_LOCALITY_TASKS
	/* This records the same as queue_array, but hashed by data accessed with the locality flag. */
	/* FIXME: we record only one task per data, assuming that the access is
	 * RW, and thus only one task is ready to write to it. Do we really need
	 * to handle the R case too? */
	struct locality_entry *queued_tasks_per_data;

	/* This records the last data accessed by the worker */
	starpu_data_handle_t last_locality[MAX_LOCALITY];
	int nlast_locality;
#endif
};

struct _starpu_work_stealing_data
{
	int (*select_victim)(struct _starpu_work_stealing_data *, unsigned, int);
	struct _starpu_work_stealing_data_per_worker *per_worker;
	/* keep track of the work performed from the beginning of the algorithm to make
	 * better decisions about which queue to select when stealing or deferring work
	 */
	unsigned last_pop_worker;
	unsigned last_push_worker;
};

#ifdef USE_OVERLOAD

/**
 * Minimum number of tasks we wait to be processed before we start making
 * assumptions about which worker is the fastest.
 */
static int calibration_value = 0;

#endif /* USE_OVERLOAD */

/**
 * Return a worker from which a task can be stolen.
 * Selecting a worker is done in a round-robin fashion, unless
 * the previously selected worker doesn't own any task,
 * in which case we return the first non-empty worker.
 */
static int select_victim_round_robin(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
{
	unsigned worker = ws->last_pop_worker;
	unsigned nworkers;
	int *workerids = NULL;
	nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);
	unsigned ntasks = 0;

	/* If the worker's queue is empty, let's try
	 * the next ones */
	while (1)
	{
		/* Here helgrind would shout that this is unprotected, but we
		 * are fine with getting outdated values: this is just an
		 * estimation */
		ntasks = ws->per_worker[workerids[worker]].queue_array->ntasks;
		if (ntasks && ws->per_worker[workerids[worker]].busy)
			break;

		worker = (worker + 1) % nworkers;
		if (worker == ws->last_pop_worker)
		{
			/* We got back to the first worker,
			 * don't go into an infinite loop */
			ntasks = 0;
			break;
		}
	}

	ws->last_pop_worker = (worker + 1) % nworkers;

	worker = workerids[worker];

	if (ntasks)
		return worker;
	else
		return -1;
}

/**
 * Return a worker to which a task can be pushed.
 * Selecting a worker is done in a round-robin fashion.
 */
static unsigned select_worker_round_robin(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned worker;
	unsigned nworkers;
	int *workerids;
	nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);

	worker = ws->last_push_worker;
	do
		worker = (worker + 1) % nworkers;
	while (!starpu_worker_can_execute_task_first_impl(workerids[worker], task, NULL));

	ws->last_push_worker = worker;

	return workerids[worker];
}

#ifdef USE_LOCALITY
/* Select a worker according to the locality of the data of the task to be scheduled */
static unsigned select_worker_locality(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	if (nbuffers == 0)
		return -1;

	unsigned i, n;
	unsigned ndata[STARPU_NMAXWORKERS] = { 0 };
	int best_worker = -1;
	unsigned best_ndata = 0;

	n = 0;
	for (i = 0; i < nbuffers; i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t data = STARPU_TASK_GET_HANDLE(task, i);
			int locality = data->last_locality;
			if (locality >= 0)
				ndata[locality]++;
			n++;
		}

	if (n)
	{
		/* Some locality buffers: choose the worker which has most of them */
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator(workers, &it);
		while (workers->has_next(workers, &it))
		{
			int workerid = workers->get_next(workers, &it);
			if (ndata[workerid] > best_ndata && ws->per_worker[workerid].busy)
			{
				best_worker = workerid;
				best_ndata = ndata[workerid];
			}
		}
	}
	return best_worker;
}

/* Record in the data which worker will handle the task with the locality flag */
static void record_data_locality(struct starpu_task *task, int workerid)
{
	/* Record in the locality data which worker the task went to */
	unsigned i;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			STARPU_TASK_GET_HANDLE(task, i)->last_locality = workerid;
		}
}
#else
static void record_data_locality(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED)
{
}
#endif

#ifdef USE_LOCALITY_TASKS
/* Record in the worker which data it used last with the locality flag */
static void record_worker_locality(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	/* Record which locality data this worker just got */
	unsigned i;
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];

	data->nlast_locality = 0;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			data->last_locality[data->nlast_locality] = STARPU_TASK_GET_HANDLE(task, i);
			data->nlast_locality++;
			if (data->nlast_locality == MAX_LOCALITY)
				break;
		}
}

/* Called when pushing a task to a queue */
static void locality_pushed_task(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
	unsigned i;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			struct locality_entry *entry;
			HASH_FIND_PTR(data->queued_tasks_per_data, &handle, entry);
			if (STARPU_LIKELY(!entry))
			{
				STARPU_MALLOC(entry, sizeof(*entry));
				entry->data = handle;
				entry->task = task;
				HASH_ADD_PTR(data->queued_tasks_per_data, data, entry);
			}
		}
}

/* Pick a task from the source worker's queue, for execution on target */
static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, int source, int target)
{
	struct _starpu_work_stealing_data_per_worker *data_source = &ws->per_worker[source];
	struct _starpu_work_stealing_data_per_worker *data_target = &ws->per_worker[target];

	unsigned i, j, n = data_target->nlast_locality;
	struct starpu_task *tasks[MAX_LOCALITY] = { NULL }, *best_task = NULL;
	int ntasks[MAX_LOCALITY] = { 0 }, best_n; /* Number of locality data of this worker used by each task */

	/* Look at the last data accessed by this worker */
	/* nlast_locality can legitimately reach MAX_LOCALITY, see record_worker_locality() */
	STARPU_ASSERT(n <= MAX_LOCALITY);
	for (i = 0; i < n; i++)
	{
		starpu_data_handle_t handle = data_target->last_locality[i];
		struct locality_entry *entry;
		HASH_FIND_PTR(data_source->queued_tasks_per_data, &handle, entry);
		if (entry)
		{
			/* Record the task */
			tasks[i] = entry->task;
			ntasks[i] = 1;

			/* And increment the counter of the same task */
			for (j = 0; j < i; j++)
			{
				if (tasks[j] == tasks[i])
				{
					ntasks[j]++;
					break;
				}
			}
		}
	}

	/* Now find the task with the most locality data for this worker */
	best_n = 0;
	for (i = 0; i < n; i++)
	{
		if (ntasks[i] > best_n)
		{
			best_task = tasks[i];
			best_n = ntasks[i];
		}
	}

	if (best_n > 0)
	{
		/* Found an interesting task, try to pick it! */
		if (_starpu_fifo_pop_this_task(data_source->queue_array, target, best_task))
			return best_task;
	}

	/* Didn't find an interesting task, or couldn't run it :( */
	return _starpu_fifo_pop_task(data_source->queue_array, target);
}

/* Called when popping a task from a queue */
static void locality_popped_task(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
	unsigned i;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			struct locality_entry *entry;
			HASH_FIND_PTR(data->queued_tasks_per_data, &handle, entry);
			if (STARPU_LIKELY(entry))
			{
				if (entry->task == task)
				{
					HASH_DEL(data->queued_tasks_per_data, entry);
					free(entry);
				}
			}
		}
}
#else
static void record_worker_locality(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}

/* Called when pushing a task to a queue */
static void locality_pushed_task(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}

/* Pick a task from the source worker's queue, for execution on target */
static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, int source, int target)
{
	return _starpu_fifo_pop_task(ws->per_worker[source].queue_array, target);
}

/* Called when popping a task from a queue */
static void locality_popped_task(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}
#endif

#ifdef USE_OVERLOAD

/**
 * Return a ratio helpful to determine whether a worker is suitable to steal
 * tasks from or to put some tasks in its queue.
 *
 * \return a ratio with a positive or negative value, describing the current
 * state of the worker: a smaller value implies a faster worker with a
 * relatively emptier queue (more suitable to put tasks in), a bigger value
 * implies a slower worker with a relatively fuller queue (more suitable to
 * steal tasks from).
 */
/* FIXME: ws->performed_total is not maintained anywhere (and is not even a
 * field of struct _starpu_work_stealing_data anymore), so this dead code
 * path does not compile as-is. */
static float overload_metric(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id, unsigned id)
{
	float execution_ratio = 0.0f;
	float current_ratio = 0.0f;

	int nprocessed = _starpu_get_deque_nprocessed(ws->per_worker[id].queue_array);
	unsigned njobs = _starpu_get_deque_njobs(ws->per_worker[id].queue_array);

	/* Did we get enough information? */
	if (ws->performed_total > 0 && nprocessed > 0)
	{
		/* How fast or slow is the worker compared to the other workers */
		execution_ratio = (float) nprocessed / ws->performed_total;
		/* How full is its queue */
		current_ratio = (float) njobs / nprocessed;
	}
	else
	{
		return 0.0f;
	}

	return (current_ratio - execution_ratio);
}
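
/*
 * A worked example of the metric (hypothetical numbers): suppose 100 tasks
 * have been performed in total, a worker has processed 10 of them
 * (execution_ratio = 10/100 = 0.1), and it currently has 5 jobs queued
 * (current_ratio = 5/10 = 0.5).  Its metric is 0.5 - 0.1 = 0.4, i.e. a
 * rather overloaded worker and a good candidate to steal from.
 */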

/**
 * Return the most suitable worker from which a task can be stolen.
 * The number of previously processed tasks, total and local,
 * and the number of tasks currently awaiting processing on each
 * worker are taken into account to select the most suitable
 * worker to steal a task from.
 */
static int select_victim_overload(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
{
	unsigned worker;
	float worker_ratio;
	unsigned best_worker = 0;
	/* The metric may be negative, so start from the lowest representable
	 * value (FLT_MIN would be a tiny *positive* number) */
	float best_ratio = -FLT_MAX;

	/* Don't try to play smart until we get
	 * enough information. */
	if (ws->performed_total < calibration_value)
		return select_victim_round_robin(ws, sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		worker_ratio = overload_metric(ws, sched_ctx_id, worker);

		if (worker_ratio > best_ratio && ws->per_worker[worker].busy)
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}

/**
 * Return the most suitable worker to which a task can be pushed.
 * The number of previously processed tasks, total and local,
 * and the number of tasks currently awaiting processing on each
 * worker are taken into account to select the most suitable
 * worker to add a task to.
 */
static unsigned select_worker_overload(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned worker;
	float worker_ratio;
	unsigned best_worker = 0;
	float best_ratio = FLT_MAX;

	/* Don't try to play smart until we get
	 * enough information. */
	if (ws->performed_total < calibration_value)
		return select_worker_round_robin(ws, task, sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		worker_ratio = overload_metric(ws, sched_ctx_id, worker);

		if (worker_ratio < best_ratio &&
				starpu_worker_can_execute_task_first_impl(worker, task, NULL))
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}

#endif /* USE_OVERLOAD */

/**
 * Return a worker from which a task can be stolen.
 * This is a phony function used to call the right
 * function depending on the value of USE_OVERLOAD.
 */
static inline int select_victim(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id,
				int workerid STARPU_ATTRIBUTE_UNUSED)
{
#ifdef USE_OVERLOAD
	return select_victim_overload(ws, sched_ctx_id);
#else
	return select_victim_round_robin(ws, sched_ctx_id);
#endif /* USE_OVERLOAD */
}

/**
 * Return a worker to which a task can be pushed.
 * This is a phony function used to call the right
 * function depending on the value of USE_OVERLOAD.
 */
static inline unsigned select_worker(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
#ifdef USE_OVERLOAD
	return select_worker_overload(ws, task, sched_ctx_id);
#else
	return select_worker_round_robin(ws, task, sched_ctx_id);
#endif /* USE_OVERLOAD */
}

/* Note: this is not scalable work stealing, use lws instead */
static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *task = NULL;
	unsigned workerid = starpu_worker_get_id_check();

	ws->per_worker[workerid].busy = 0;

#ifdef STARPU_NON_BLOCKING_DRIVERS
	if (STARPU_RUNNING_ON_VALGRIND || !_starpu_fifo_empty(ws->per_worker[workerid].queue_array))
#endif
	{
		STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
		task = ws_pick_task(ws, workerid, workerid);
		if (task)
			locality_popped_task(ws, task, workerid, sched_ctx_id);
		STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
	}

	if (task)
	{
		/* there was a local task */
		ws->per_worker[workerid].busy = 1;
		return task;
	}

	starpu_pthread_mutex_t *sched_mutex;
	starpu_pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
	/* While stealing, release the mutex used to synchronize with pushers */
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);

	/* we need to steal someone's job */
	int victim = ws->select_victim(ws, sched_ctx_id, workerid);
	if (victim == -1)
	{
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
		return NULL;
	}

	if (STARPU_PTHREAD_MUTEX_TRYLOCK(&ws->per_worker[victim].worker_mutex))
	{
		/* victim is busy, don't bother it, come back later */
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
		return NULL;
	}

	if (ws->per_worker[victim].queue_array != NULL && ws->per_worker[victim].queue_array->ntasks > 0)
	{
		task = ws_pick_task(ws, victim, workerid);
	}

	if (task)
	{
		_STARPU_TRACE_WORK_STEALING(workerid, victim);
		_STARPU_TASK_BREAK_ON(task, sched);
		record_data_locality(task, workerid);
		record_worker_locality(ws, task, workerid, sched_ctx_id);
		locality_popped_task(ws, task, victim, sched_ctx_id);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[victim].worker_mutex);

	/* Done with stealing, resynchronize with core */
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);

	ws->per_worker[workerid].busy = !!task;

	return task;
}

static
int ws_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	int workerid = -1;

#ifdef USE_LOCALITY
	workerid = select_worker_locality(ws, task, sched_ctx_id);
#endif
	if (workerid == -1)
		workerid = starpu_worker_get_id();

	/* If the current thread is not a worker but the main thread (-1), or if
	 * the current worker is not in the target context or cannot execute the
	 * task, we pick a better worker to put the task on its queue */
	if (workerid == -1 || !starpu_sched_ctx_contains_worker(workerid, sched_ctx_id) ||
			!starpu_worker_can_execute_task_first_impl(workerid, task, NULL))
		workerid = select_worker(ws, task, sched_ctx_id);

	starpu_pthread_mutex_t *sched_mutex;
	starpu_pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
	STARPU_AYU_ADDTOTASKQUEUE(_starpu_get_job_associated_to_task(task)->job_id, workerid);
	STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
	_STARPU_TASK_BREAK_ON(task, sched);
	record_data_locality(task, workerid);
	_starpu_fifo_push_task(ws->per_worker[workerid].queue_array, task);
	locality_pushed_task(ws, task, workerid, sched_ctx_id);
	STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);

	starpu_push_task_end(task);
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);

#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
	/* TODO: implement fine-grain signaling, similar to what eager does */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
		starpu_wake_worker(workers->get_next(workers, &it));
#endif
	return 0;
}

static void ws_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	int workerid;

	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
		ws->per_worker[workerid].queue_array = _starpu_create_fifo();

		/* Tell helgrind that we are fine with getting outdated values:
		 * this is just an estimation */
		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].queue_array->ntasks);
		STARPU_PTHREAD_MUTEX_INIT(&ws->per_worker[workerid].worker_mutex, NULL);
		ws->per_worker[workerid].busy = 0;
		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].busy);
	}
}

static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	int workerid;

	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		if (ws->per_worker[workerid].queue_array != NULL)
		{
			_starpu_destroy_fifo(ws->per_worker[workerid].queue_array);
			ws->per_worker[workerid].queue_array = NULL;
		}
		free(ws->per_worker[workerid].proxlist);
		ws->per_worker[workerid].proxlist = NULL;
		STARPU_PTHREAD_MUTEX_DESTROY(&ws->per_worker[workerid].worker_mutex);
	}
}

static void initialize_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws;
	STARPU_MALLOC(ws, sizeof(struct _starpu_work_stealing_data));
	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)ws);

	ws->last_pop_worker = 0;
	ws->last_push_worker = 0;
	STARPU_HG_DISABLE_CHECKING(ws->last_pop_worker);
	STARPU_HG_DISABLE_CHECKING(ws->last_push_worker);
	ws->select_victim = select_victim;

	unsigned nw = starpu_worker_get_count();
	STARPU_CALLOC(ws->per_worker, nw, sizeof(struct _starpu_work_stealing_data_per_worker));
}

static void deinit_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	free(ws->per_worker);
	free(ws);
}

struct starpu_sched_policy _starpu_sched_ws_policy =
{
	.init_sched = initialize_ws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = ws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "ws",
	.policy_description = "work stealing",
	.worker_type = STARPU_WORKER_LIST,
};
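
/*
 * Either policy in this file can be selected at run time through the
 * STARPU_SCHED environment variable, without recompiling the application,
 * e.g. (hypothetical program name):
 *
 *     $ STARPU_SCHED=ws  ./my_app
 *     $ STARPU_SCHED=lws ./my_app
 */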

/* Local work stealing policy */
/* Return a worker to steal a task from. The worker is selected according to
 * the proximity list built using the info on the architecture provided by
 * hwloc */
#ifdef STARPU_HAVE_HWLOC
static int lws_select_victim(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id, int workerid)
{
	int nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	int neighbor;
	int i;

	for (i = 0; i < nworkers; i++)
	{
		neighbor = ws->per_worker[workerid].proxlist[i];
		int ntasks = ws->per_worker[neighbor].queue_array->ntasks;
		/* Check the neighbor's busy flag, not our own: we are stealing
		 * precisely because we are idle (ws_pop_task cleared our flag) */
		if (ntasks && ws->per_worker[neighbor].busy)
			return neighbor;
	}
	return -1;
}
#endif

static void lws_add_workers(unsigned sched_ctx_id, int *workerids,
			    unsigned nworkers)
{
	ws_add_workers(sched_ctx_id, workerids, nworkers);

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	/* Build a proximity list for every worker. It is cheaper to
	 * build this once and then use it for popping tasks rather
	 * than traversing the hwloc tree every time a task must be
	 * stolen */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_tree *tree = (struct starpu_tree*)workers->collection_private;
	int workerid;
	unsigned i;

	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		STARPU_MALLOC(ws->per_worker[workerid].proxlist, nworkers*sizeof(int));
		int bindid;

		struct starpu_tree *neighbour = NULL;
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator(workers, &it);

		bindid = starpu_worker_get_bindid(workerid);
		it.value = starpu_tree_get(tree, bindid);
		int cnt = 0;
		for (;;)
		{
			neighbour = (struct starpu_tree*)it.value;
			int *neigh_workerids;
			int neigh_nworkers = starpu_bindid_get_workerids(neighbour->id, &neigh_workerids);
			int w;
			for (w = 0; w < neigh_nworkers; w++)
			{
				if (!it.visited[neigh_workerids[w]] && workers->present[neigh_workerids[w]])
				{
					ws->per_worker[workerid].proxlist[cnt++] = neigh_workerids[w];
					it.visited[neigh_workerids[w]] = 1;
				}
			}
			if (!workers->has_next(workers, &it))
				break;
			it.value = it.possible_value;
			it.possible_value = NULL;
		}
	}
#endif
}
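
/*
 * To illustrate what the proximity list contains (a hypothetical example,
 * not actual output): on a machine with two sockets holding workers
 * {0,1,2,3} and {4,5,6,7}, the proxlist of worker 1 starts with the workers
 * sharing its innermost caches and ends with the remote socket, e.g.
 * { 1, 0, 2, 3, 4, 5, 6, 7 }, so lws_select_victim() naturally tries the
 * closest non-empty queue first.
 */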

static void initialize_lws_policy(unsigned sched_ctx_id)
{
	/* lws is loosely based on ws, except that it might use hwloc. */
	initialize_ws_policy(sched_ctx_id);

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	ws->select_victim = lws_select_victim;
#endif
}

struct starpu_sched_policy _starpu_sched_lws_policy =
{
	.init_sched = initialize_lws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = lws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "lws",
	.policy_description = "locality work stealing",
#ifdef STARPU_HAVE_HWLOC
	.worker_type = STARPU_WORKER_TREE,
#else
	.worker_type = STARPU_WORKER_LIST,
#endif
};