work_stealing_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2008-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Work stealing policy */

#include <float.h>
#include <limits.h>

#include <core/workers.h>
#include <sched_policies/prio_deque.h>
#include <core/debug.h>
#include <starpu_scheduler.h>
#include <core/sched_policy.h>
#include <core/task.h>

/* Experimental (dead) code which needs to be tested, fixed... */
/* #define USE_OVERLOAD */
/*
 * Experimental code for improving data cache locality:
 *
 * USE_LOCALITY:
 * - for each data, we record on which worker it was last accessed with the
 *   locality flag.
 *
 * - when pushing a ready task, we choose the worker which has last accessed
 *   the most data of the task with the locality flag.
 *
 * USE_LOCALITY_TASKS:
 * - for each worker, we record the locality data that its last task used
 *   (i.e. a rough estimate of what is contained in the innermost caches).
 *
 * - for each worker, we keep a hash table mapping each data handle to the
 *   ready tasks pushed to that worker which will access it with the locality
 *   flag.
 *
 * - when fetching a task from a queue, we pick the task with the largest
 *   number of data estimated to still be in the cache.
 */
//#define USE_LOCALITY
//#define USE_LOCALITY_TASKS
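
/*
 * Illustrative sketch (not part of this file): an application opts a buffer
 * into the locality tracking above by or-ing STARPU_LOCALITY into the access
 * mode declared in its codelet; the kernel name below is hypothetical:
 *
 *	static struct starpu_codelet my_cl =
 *	{
 *		.cpu_funcs = { my_cpu_kernel },
 *		.nbuffers = 1,
 *		.modes = { STARPU_RW | STARPU_LOCALITY },
 *	};
 *
 * Only accesses carrying STARPU_LOCALITY are considered by the
 * USE_LOCALITY / USE_LOCALITY_TASKS code paths below.
 */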
/* Maximum number of recorded locality data per task */
#define MAX_LOCALITY 8

/* Entry for queued_tasks_per_data: records that a queued task is accessing
 * the data with the locality flag */
#ifdef USE_LOCALITY_TASKS
struct locality_entry
{
	UT_hash_handle hh;
	starpu_data_handle_t data;
	struct starpu_task *task;
};
#endif

struct _starpu_work_stealing_data_per_worker
{
	/* Padding: keep the frequently-read notask flag on its own cache line
	 * to limit false sharing with neighbouring workers and fields. */
	char fill1[STARPU_CACHELINE_SIZE];
	/* This is read-mostly, only updated when the queue becomes empty or
	 * becomes non-empty, to make it generally cheap to check */
	unsigned notask;	/* whether the queue is empty */
	char fill2[STARPU_CACHELINE_SIZE];

	struct _starpu_prio_deque queue;
	int running;
	int *proxlist;
	int busy;	/* Whether this worker is working on a task */

	/* Index of the last worker we stole from, so that round-robin victim
	 * selection resumes from there */
	unsigned last_pop_worker;

#ifdef USE_LOCALITY_TASKS
	/* This records the same as queue, but hashed by data accessed with the locality flag. */
	/* FIXME: we record only one task per data, assuming that the access is
	 * RW, and thus only one task is ready to write to it. Do we really need to handle the R case too? */
	struct locality_entry *queued_tasks_per_data;

	/* This records the last data accessed by the worker */
	starpu_data_handle_t last_locality[MAX_LOCALITY];
	int nlast_locality;
#endif
};
struct _starpu_work_stealing_data
{
	int (*select_victim)(struct _starpu_work_stealing_data *, unsigned, int);
	struct _starpu_work_stealing_data_per_worker *per_worker;
	/* Index of the last worker we pushed a task to, so that round-robin
	 * pushes resume from there */
	unsigned last_push_worker;
};

#ifdef USE_OVERLOAD
/**
 * Minimum number of tasks we wait to see processed before we start guessing
 * on which worker a computation would be faster.
 */
static int calibration_value = 0;
#endif /* USE_OVERLOAD */
/**
 * Return a worker from which a task can be stolen.
 * Selection is done in a round-robin fashion, resuming from the last victim;
 * workers whose queue is empty are skipped.
 */
static int select_victim_round_robin(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
{
	unsigned workerid = starpu_worker_get_id_check();
	unsigned worker = ws->per_worker[workerid].last_pop_worker;
	unsigned nworkers;
	int *workerids = NULL;
	nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);
	unsigned ntasks = 0;

	/* If the worker's queue is empty, let's try
	 * the next ones */
	while (1)
	{
		/* Here helgrind would shout that this is unprotected, but we
		 * are fine with getting outdated values, this is just an
		 * estimation */
		if (!ws->per_worker[workerids[worker]].notask)
		{
			if (ws->per_worker[workerids[worker]].busy
			    || starpu_worker_is_blocked_in_parallel(workerids[worker]))
			{
				ntasks = 1;
				break;
			}
		}

		worker = (worker + 1) % nworkers;
		if (worker == ws->per_worker[workerid].last_pop_worker)
		{
			/* We got back to the first worker,
			 * don't go in infinite loop */
			ntasks = 0;
			break;
		}
	}

	ws->per_worker[workerid].last_pop_worker = (worker + 1) % nworkers;

	worker = workerids[worker];

	if (ntasks)
		return worker;
	else
		return -1;
}
/**
 * Return a worker to which to add a task.
 * Selection is done in a round-robin fashion.
 */
static unsigned select_worker_round_robin(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned worker;
	unsigned nworkers;
	int *workerids;
	nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);

	worker = ws->last_push_worker;
	do
		worker = (worker + 1) % nworkers;
	while (!ws->per_worker[workerids[worker]].running || !starpu_worker_can_execute_task_first_impl(workerids[worker], task, NULL));

	ws->last_push_worker = worker;

	return workerids[worker];
}
#ifdef USE_LOCALITY
/* Select a worker according to the locality of the data of the task to be scheduled */
static unsigned select_worker_locality(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	if (nbuffers == 0)
		return -1;

	unsigned i, n;
	unsigned ndata[STARPU_NMAXWORKERS] = { 0 };
	int best_worker = -1;
	n = 0;

	for (i = 0; i < nbuffers; i++)
	{
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t data = STARPU_TASK_GET_HANDLE(task, i);
			int locality = data->last_locality;
			if (locality >= 0)
				ndata[locality]++;
			n++;
		}
	}

	if (n)
	{
		/* Some buffers carry the locality flag: choose the worker
		 * which last accessed most of them */
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;

		unsigned best_ndata = 0;
		workers->init_iterator(workers, &it);
		while (workers->has_next(workers, &it))
		{
			int workerid = workers->get_next(workers, &it);
			if (ndata[workerid] > best_ndata && ws->per_worker[workerid].running && ws->per_worker[workerid].busy)
			{
				best_worker = workerid;
				best_ndata = ndata[workerid];
			}
		}
	}
	return best_worker;
}

/* Record in the data which worker will handle the task with the locality flag */
static void record_data_locality(struct starpu_task *task, int workerid)
{
	/* Record in the locality data which worker the task went to */
	unsigned i;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			STARPU_TASK_GET_HANDLE(task, i)->last_locality = workerid;
		}
}
#else
static void record_data_locality(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED)
{
}
#endif
#ifdef USE_LOCALITY_TASKS
/* Record in the worker which data it used last with the locality flag */
static void record_worker_locality(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	/* Record in the worker the locality data used by the task it was given */
	unsigned i;
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];

	data->nlast_locality = 0;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			data->last_locality[data->nlast_locality] = STARPU_TASK_GET_HANDLE(task, i);
			data->nlast_locality++;
			if (data->nlast_locality == MAX_LOCALITY)
				break;
		}
}

/* Called when pushing a task to a queue */
static void locality_pushed_task(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
	unsigned i;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			struct locality_entry *entry;
			HASH_FIND_PTR(data->queued_tasks_per_data, &handle, entry);
			if (STARPU_LIKELY(!entry))
			{
				_STARPU_MALLOC(entry, sizeof(*entry));
				entry->data = handle;
				entry->task = task;
				HASH_ADD_PTR(data->queued_tasks_per_data, data, entry);
			}
		}
}

/* Pick a task from workerid's queue, for execution on target */
static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, int source, int target)
{
	struct _starpu_work_stealing_data_per_worker *data_source = &ws->per_worker[source];
	struct _starpu_work_stealing_data_per_worker *data_target = &ws->per_worker[target];

	unsigned i, j, n = data_target->nlast_locality;
	struct starpu_task *(tasks[MAX_LOCALITY]) = { NULL }, *best_task = NULL;
	int ntasks[MAX_LOCALITY] = { 0 }, best_n; /* Number of locality data for this worker used by this task */

	/* Look at the last data accessed by this worker */
	STARPU_ASSERT(n <= MAX_LOCALITY); /* nlast_locality is capped at MAX_LOCALITY */
	for (i = 0; i < n; i++)
	{
		starpu_data_handle_t handle = data_target->last_locality[i];
		struct locality_entry *entry;
		HASH_FIND_PTR(data_source->queued_tasks_per_data, &handle, entry);
		if (entry)
		{
			/* Record task */
			tasks[i] = entry->task;
			ntasks[i] = 1;

			/* And increment the counter of the same task if we had
			 * already recorded it for a previous data */
			for (j = 0; j < i; j++)
			{
				if (tasks[j] == tasks[i])
				{
					ntasks[j]++;
					break;
				}
			}
		}
	}

	/* Now find the task with most locality data for this worker */
	best_n = 0;
	for (i = 0; i < n; i++)
	{
		if (ntasks[i] > best_n)
		{
			best_task = tasks[i];
			best_n = ntasks[i];
		}
	}

	if (best_n > 0)
	{
		/* found an interesting task, try to pick it! */
		if (_starpu_prio_deque_pop_this_task(&data_source->queue, target, best_task))
		{
			if (!data_source->queue.ntasks)
			{
				STARPU_ASSERT(ws->per_worker[source].notask == 0);
				ws->per_worker[source].notask = 1;
			}
			return best_task;
		}
	}

	/* Didn't find an interesting task, or couldn't run it :( */
	int skipped;
	struct starpu_task *task;
	if (source != target)
		task = _starpu_prio_deque_deque_task_for_worker(&data_source->queue, target, &skipped);
	else
		task = _starpu_prio_deque_pop_task_for_worker(&data_source->queue, target, &skipped);
	if (task && !data_source->queue.ntasks)
	{
		STARPU_ASSERT(ws->per_worker[source].notask == 0);
		ws->per_worker[source].notask = 1;
	}
	return task;
}

/* Called when popping a task from a queue */
static void locality_popped_task(struct _starpu_work_stealing_data *ws, struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
	unsigned i;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			struct locality_entry *entry;
			HASH_FIND_PTR(data->queued_tasks_per_data, &handle, entry);
			if (STARPU_LIKELY(entry))
			{
				if (entry->task == task)
				{
					HASH_DEL(data->queued_tasks_per_data, entry);
					free(entry);
				}
			}
		}
}
#else
static void record_worker_locality(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}

/* Called when pushing a task to a queue */
static void locality_pushed_task(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}

/* Pick a task from workerid's queue, for execution on target */
static struct starpu_task *ws_pick_task(struct _starpu_work_stealing_data *ws, int source, int target)
{
	int skipped;
	struct starpu_task *task;
	if (source != target)
		task = _starpu_prio_deque_deque_task_for_worker(&ws->per_worker[source].queue, target, &skipped);
	else
		task = _starpu_prio_deque_pop_task_for_worker(&ws->per_worker[source].queue, target, &skipped);
	if (task && !ws->per_worker[source].queue.ntasks)
	{
		STARPU_ASSERT(ws->per_worker[source].notask == 0);
		ws->per_worker[source].notask = 1;
	}
	return task;
}

/* Called when popping a task from a queue */
static void locality_popped_task(struct _starpu_work_stealing_data *ws STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}
#endif
#ifdef USE_OVERLOAD
/**
 * Return a ratio helpful to determine whether a worker is suitable to steal
 * tasks from or to put some tasks in its queue.
 *
 * \return a ratio with a positive or negative value, describing the current state of the worker:
 * a smaller value implies a faster worker with a relatively emptier queue: more suitable to put tasks in
 * a bigger value implies a slower worker with a relatively more replete queue: more suitable to steal tasks from
 */
static float overload_metric(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id, unsigned id)
{
	float execution_ratio = 0.0f;
	float current_ratio = 0.0f;

	int nprocessed = _starpu_get_deque_nprocessed(ws->per_worker[id].queue);
	unsigned njobs = _starpu_get_deque_njobs(ws->per_worker[id].queue);

	/* Did we get enough information? */
	if (ws->performed_total > 0 && nprocessed > 0)
	{
		/* How fast or slow is the worker compared to the other workers */
		execution_ratio = (float) nprocessed / ws->performed_total;
		/* How replete is its queue */
		current_ratio = (float) njobs / nprocessed;
	}
	else
	{
		return 0.0f;
	}

	return (current_ratio - execution_ratio);
}
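
/* Worked example (illustrative only): if 100 tasks have been performed in
 * total and worker A has processed 10 of them while still having 30 tasks
 * queued, then execution_ratio = 10/100 = 0.1 and current_ratio = 30/10 = 3.0,
 * so the metric is 3.0 - 0.1 = 2.9: a rather overloaded worker, hence a good
 * candidate to steal from. */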
/**
 * Return the most suitable worker from which a task can be stolen.
 * The number of previously processed tasks, total and local,
 * and the number of tasks currently awaiting to be processed
 * by the worker are taken into account to select the most suitable
 * worker to steal tasks from.
 */
static int select_victim_overload(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id)
{
	unsigned best_worker = 0;
	float best_ratio = -FLT_MAX;	/* the metric may well be negative */

	/* Don't try to play smart until we get
	 * enough information. */
	if (ws->performed_total < calibration_value)
		return select_victim_round_robin(ws, sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);
		float worker_ratio = overload_metric(ws, sched_ctx_id, worker);

		if (worker_ratio > best_ratio && ws->per_worker[worker].running && ws->per_worker[worker].busy)
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}

/**
 * Return the most suitable worker to which to add a task.
 * The number of previously processed tasks, total and local,
 * and the number of tasks currently awaiting to be processed
 * by the worker are taken into account to select the most suitable
 * worker to add a task to.
 */
static unsigned select_worker_overload(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned best_worker = 0;
	float best_ratio = FLT_MAX;

	/* Don't try to play smart until we get
	 * enough information. */
	if (ws->performed_total < calibration_value)
		return select_worker_round_robin(ws, task, sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);
		float worker_ratio = overload_metric(ws, sched_ctx_id, worker);

		if (worker_ratio < best_ratio && ws->per_worker[worker].running && starpu_worker_can_execute_task_first_impl(worker, task, NULL))
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}
#endif /* USE_OVERLOAD */
/**
 * Return a worker from which a task can be stolen.
 * This is a wrapper which calls the right victim-selection
 * function depending on whether USE_OVERLOAD is defined.
 */
static inline int select_victim(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id,
				int workerid STARPU_ATTRIBUTE_UNUSED)
{
#ifdef USE_OVERLOAD
	return select_victim_overload(ws, sched_ctx_id);
#else
	return select_victim_round_robin(ws, sched_ctx_id);
#endif /* USE_OVERLOAD */
}

/**
 * Return a worker to which a task can be pushed.
 * This is a wrapper which calls the right worker-selection
 * function depending on whether USE_OVERLOAD is defined.
 */
static inline unsigned select_worker(struct _starpu_work_stealing_data *ws, struct starpu_task *task, unsigned sched_ctx_id)
{
#ifdef USE_OVERLOAD
	return select_worker_overload(ws, task, sched_ctx_id);
#else
	return select_worker_round_robin(ws, task, sched_ctx_id);
#endif /* USE_OVERLOAD */
}
/* Note: this is not scalable work stealing, use lws instead */
static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *task = NULL;
	unsigned workerid = starpu_worker_get_id_check();

	if (ws->per_worker[workerid].busy)
		ws->per_worker[workerid].busy = 0;

#ifdef STARPU_NON_BLOCKING_DRIVERS
	if (STARPU_RUNNING_ON_VALGRIND || !_starpu_prio_deque_is_empty(&ws->per_worker[workerid].queue))
#endif
	{
		task = ws_pick_task(ws, workerid, workerid);
		if (task)
			locality_popped_task(ws, task, workerid, sched_ctx_id);
	}

	if (task)
	{
		/* there was a local task */
		ws->per_worker[workerid].busy = 1;
		if (_starpu_get_nsched_ctxs() > 1)
		{
			starpu_worker_relax_on();
			_starpu_sched_ctx_lock_write(sched_ctx_id);
			starpu_worker_relax_off();
			starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
			if (_starpu_sched_ctx_worker_is_master_for_child_ctx(sched_ctx_id, workerid, task))
				task = NULL;
			_starpu_sched_ctx_unlock_write(sched_ctx_id);
		}
		return task;
	}

	/* we need to steal someone's job */
	starpu_worker_relax_on();
	int victim = ws->select_victim(ws, sched_ctx_id, workerid);
	starpu_worker_relax_off();
	if (victim == -1)
	{
		return NULL;
	}

	if (_starpu_worker_trylock(victim))
	{
		/* victim is busy, don't bother it, come back later */
		return NULL;
	}

	if (ws->per_worker[victim].running && ws->per_worker[victim].queue.ntasks > 0)
	{
		task = ws_pick_task(ws, victim, workerid);
	}

	if (task)
	{
		_STARPU_TRACE_WORK_STEALING(workerid, victim);
		starpu_sched_task_break(task);
		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, victim);
		record_data_locality(task, workerid);
		record_worker_locality(ws, task, workerid, sched_ctx_id);
		locality_popped_task(ws, task, victim, sched_ctx_id);
	}
	starpu_worker_unlock(victim);

#ifndef STARPU_NON_BLOCKING_DRIVERS
	/* While we were stealing, somebody may actually have given us a task;
	 * don't miss the opportunity to take it before going to sleep. */
	{
		struct _starpu_worker *worker = _starpu_get_worker_struct(starpu_worker_get_id());
		if (!task && worker->state_keep_awake)
		{
			task = ws_pick_task(ws, workerid, workerid);
			if (task)
			{
				/* keep_awake notice taken into account here, clear flag */
				worker->state_keep_awake = 0;
				locality_popped_task(ws, task, workerid, sched_ctx_id);
			}
		}
	}
#endif

	if (task && _starpu_get_nsched_ctxs() > 1)
	{
		starpu_worker_relax_on();
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		starpu_worker_relax_off();
		if (_starpu_sched_ctx_worker_is_master_for_child_ctx(sched_ctx_id, workerid, task))
			task = NULL;
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
		if (!task)
			return NULL;
	}

	if (ws->per_worker[workerid].busy != !!task)
		ws->per_worker[workerid].busy = !!task;

	return task;
}
static
int ws_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int workerid;

#ifdef USE_LOCALITY
	workerid = select_worker_locality(ws, task, sched_ctx_id);
#else
	workerid = -1;
#endif
	if (workerid == -1)
		workerid = starpu_worker_get_id();

	/* If the current thread is not a worker (e.g. the main thread, id -1),
	 * or the current worker is not in the target context or cannot execute
	 * the task, find a better worker to put the task on its queue */
	if (workerid == -1 || !starpu_sched_ctx_contains_worker(workerid, sched_ctx_id) ||
	    !starpu_worker_can_execute_task_first_impl(workerid, task, NULL))
		workerid = select_worker(ws, task, sched_ctx_id);
	starpu_worker_lock(workerid);
	STARPU_AYU_ADDTOTASKQUEUE(starpu_task_get_job_id(task), workerid);
	starpu_sched_task_break(task);
	record_data_locality(task, workerid);
	STARPU_ASSERT_MSG(ws->per_worker[workerid].running, "workerid=%d, ws=%p\n", workerid, ws);
	_starpu_prio_deque_push_back_task(&ws->per_worker[workerid].queue, task);
	if (ws->per_worker[workerid].queue.ntasks == 1)
	{
		STARPU_ASSERT(ws->per_worker[workerid].notask == 1);
		ws->per_worker[workerid].notask = 0;
	}
	locality_pushed_task(ws, task, workerid, sched_ctx_id);

	starpu_push_task_end(task);
	starpu_worker_unlock(workerid);
	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, workerid);

#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
	/* TODO: implement fine-grain signaling, similar to what eager does */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
		starpu_wake_worker_relax_light(workers->get_next(workers, &it));
#endif
	return 0;
}
static void ws_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;

	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
		_starpu_prio_deque_init(&ws->per_worker[workerid].queue);
		ws->per_worker[workerid].notask = 1;
		ws->per_worker[workerid].running = 1;

		/* Tell helgrind that we are fine with getting outdated values,
		 * this is just an estimation */
		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].notask);
		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].queue.ntasks);
		ws->per_worker[workerid].busy = 0;
		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].busy);
	}
}

static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;

	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		_starpu_prio_deque_destroy(&ws->per_worker[workerid].queue);
		ws->per_worker[workerid].running = 0;
		free(ws->per_worker[workerid].proxlist);
		ws->per_worker[workerid].proxlist = NULL;
	}
}
static void initialize_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws;
	_STARPU_MALLOC(ws, sizeof(struct _starpu_work_stealing_data));
	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)ws);

	ws->last_push_worker = 0;
	STARPU_HG_DISABLE_CHECKING(ws->last_push_worker);
	ws->select_victim = select_victim;

	unsigned nw = starpu_worker_get_count();
	_STARPU_CALLOC(ws->per_worker, nw, sizeof(struct _starpu_work_stealing_data_per_worker));

	/* The application may use any integer as a task priority */
	if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
		starpu_sched_ctx_set_min_priority(sched_ctx_id, INT_MIN);
	if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
		starpu_sched_ctx_set_max_priority(sched_ctx_id, INT_MAX);
}

static void deinit_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	free(ws->per_worker);
	free(ws);
}
struct starpu_sched_policy _starpu_sched_ws_policy =
{
	.init_sched = initialize_ws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = ws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "ws",
	.policy_description = "work stealing",
	.worker_type = STARPU_WORKER_LIST,
};
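
/*
 * Illustrative usage sketch (not part of this file): an application would
 * typically select this policy either from the environment, e.g.
 * STARPU_SCHED=ws ./my_app, or programmatically before initialization:
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "ws";
 *	starpu_init(&conf);
 */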
/* local work stealing policy */
/* Return a worker to steal a task from. The worker is selected according to
 * the proximity list built using the info on the architecture provided by hwloc
 */
#ifdef STARPU_HAVE_HWLOC
static int lws_select_victim(struct _starpu_work_stealing_data *ws, unsigned sched_ctx_id, int workerid)
{
	int nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	int i;
	for (i = 0; i < nworkers; i++)
	{
		int neighbor = ws->per_worker[workerid].proxlist[i];
		if (ws->per_worker[neighbor].notask)
			continue;
		/* FIXME: do not keep looking again and again at some worker
		 * which has tasks, but that can't execute on me */
		if (ws->per_worker[neighbor].busy
		    || starpu_worker_is_blocked_in_parallel(neighbor))
			return neighbor;
	}
	return -1;
}
#endif
static void lws_add_workers(unsigned sched_ctx_id, int *workerids,
			    unsigned nworkers)
{
	ws_add_workers(sched_ctx_id, workerids, nworkers);

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	/* Build a proximity list for every worker. It is cheaper to
	 * build this once and then use it for popping tasks rather
	 * than traversing the hwloc tree every time a task must be
	 * stolen */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_tree *tree = (struct starpu_tree*)workers->collection_private;
	unsigned i;

	/* get the complete list of workers (not just the added ones) and rebuild the proxlists */
	nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		if (ws->per_worker[workerid].proxlist == NULL)
			_STARPU_CALLOC(ws->per_worker[workerid].proxlist, STARPU_NMAXWORKERS, sizeof(int));
		int bindid;

		struct starpu_sched_ctx_iterator it;
		workers->init_iterator(workers, &it);

		bindid = starpu_worker_get_bindid(workerid);
		it.value = starpu_tree_get(tree, bindid);
		int cnt = 0;
		for (;;)
		{
			struct starpu_tree *neighbour = (struct starpu_tree*)it.value;
			int *neigh_workerids;
			int neigh_nworkers = starpu_bindid_get_workerids(neighbour->id, &neigh_workerids);
			int w;
			for (w = 0; w < neigh_nworkers; w++)
			{
				if (!it.visited[neigh_workerids[w]] && workers->present[neigh_workerids[w]])
				{
					ws->per_worker[workerid].proxlist[cnt++] = neigh_workerids[w];
					it.visited[neigh_workerids[w]] = 1;
				}
			}
			if (!workers->has_next(workers, &it))
				break;
			it.value = it.possible_value;
			it.possible_value = NULL;
		}
	}
#endif
}
static void initialize_lws_policy(unsigned sched_ctx_id)
{
	/* lws is loosely based on ws, except that it might use hwloc. */
	initialize_ws_policy(sched_ctx_id);

	if (starpu_worker_get_count() != starpu_cpu_worker_get_count()
	    || starpu_memory_nodes_get_numa_count() > 1
	   )
	{
		_STARPU_DISP("Warning: you are running the default lws scheduler, which is not a very smart scheduler, while the system has GPUs or several memory nodes. Make sure to read the StarPU documentation about adding performance models in order to be able to use the dmda or dmdas scheduler instead.\n");
	}

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	ws->select_victim = lws_select_victim;
#endif
}
struct starpu_sched_policy _starpu_sched_lws_policy =
{
	.init_sched = initialize_lws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = lws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "lws",
	.policy_description = "locality work stealing",
#ifdef STARPU_HAVE_HWLOC
	.worker_type = STARPU_WORKER_TREE,
#else
	.worker_type = STARPU_WORKER_LIST,
#endif
};
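
/* Illustrative note (not part of the original file): as for "ws" above, this
 * policy can be selected with STARPU_SCHED=lws in the environment, or by
 * setting conf.sched_policy_name = "lws" before calling starpu_init(). */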