/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2016  Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2016  CNRS
 * Copyright (C) 2011, 2012  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Work stealing policy */

#include <float.h>

#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <core/debug.h>
#include <starpu_scheduler.h>

#ifdef HAVE_AYUDAME_H
#include <Ayudame.h>
#endif

/* Experimental (dead) code which needs to be tested, fixed... */
/* #define USE_OVERLOAD */

/*
 * Experimental code for improving data cache locality:
 *
 * USE_LOCALITY:
 * - for each piece of data, we record on which worker it was last accessed
 *   with the locality flag.
 *
 * - when pushing a ready task, we choose the worker which has last accessed
 *   the most data of the task with the locality flag.
 *
 * USE_LOCALITY_TASKS:
 * - for each worker, we record the locality data that its last tasks used
 *   (i.e. a rough estimation of what is contained in the innermost caches).
 *
 * - for each worker, we have a hash table mapping each data handle to all the
 *   ready tasks pushed to that worker that will use it with the locality flag.
 *
 * - when fetching a task from a queue, pick the task which has the biggest
 *   number of data estimated to be contained in the cache.
 */
//#define USE_LOCALITY
//#define USE_LOCALITY_TASKS

/* Maximum number of recorded locality data per task */
#define MAX_LOCALITY 8

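/*
 * A minimal sketch (not part of this file) of how an application marks an
 * access for locality tracking: the STARPU_LOCALITY flag is OR-ed into the
 * access mode of a codelet buffer; my_func and the single buffer are
 * placeholders.
 *
 *	static struct starpu_codelet my_cl =
 *	{
 *		.cpu_funcs = { my_func },
 *		.nbuffers = 1,
 *		.modes = { STARPU_RW | STARPU_LOCALITY },
 *	};
 *
 * STARPU_TASK_GET_MODE() below then reports this flag for each buffer of a
 * submitted task.
 */
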
/* Entry for queued_tasks_per_data: records that a queued task is accessing the data with the locality flag */
struct locality_entry
{
	UT_hash_handle hh;
	starpu_data_handle_t data;
	struct starpu_task *task;
};

struct _starpu_work_stealing_data_per_worker
{
	struct _starpu_fifo_taskq *queue_array;
	int *proxlist;

#ifdef USE_LOCALITY_TASKS
	/* This records the same as queue_array, but hashed by data accessed with the locality flag. */
	/* FIXME: we record only one task per data, assuming that the access is
	 * RW, and thus only one task is ready to write to it. Do we really need
	 * to handle the R case too? */
	struct locality_entry *queued_tasks_per_data;

	/* This records the last data accessed by the worker */
	starpu_data_handle_t last_locality[MAX_LOCALITY];
	int nlast_locality;
#endif
};

struct _starpu_work_stealing_data
{
	unsigned (*select_victim)(unsigned, int);
	struct _starpu_work_stealing_data_per_worker *per_worker;
	/* keep track of the work performed from the beginning of the algorithm to make
	 * better decisions about which queue to select when stealing or deferring work
	 */
	unsigned last_pop_worker;
	unsigned last_push_worker;
};

#ifdef USE_OVERLOAD

/**
 * Minimum number of tasks we wait to see processed before we start making
 * assumptions about which worker the computation would be faster on.
 */
static int calibration_value = 0;

#endif /* USE_OVERLOAD */

/**
 * Return a worker from which a task can be stolen.
 * Selecting a worker is done in a round-robin fashion, unless
 * the worker previously selected doesn't own any task,
 * in which case we return the first non-empty worker found after it.
 */
static unsigned select_victim_round_robin(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned worker = ws->last_pop_worker;
	unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	int *workerids = NULL;
	starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);

	/* If the worker's queue is empty, let's try
	 * the next ones */
	while (1)
	{
		unsigned ntasks;

		/* Here helgrind would shout that this is unprotected, but we
		 * are fine with getting outdated values, this is just an
		 * estimation */
		ntasks = ws->per_worker[workerids[worker]].queue_array->ntasks;
		if (ntasks)
			break;

		worker = (worker + 1) % nworkers;
		if (worker == ws->last_pop_worker)
		{
			/* We got back to the first worker,
			 * don't go into an infinite loop */
			break;
		}
	}

	ws->last_pop_worker = (worker + 1) % nworkers;

	worker = workerids[worker];
	free(workerids);

	return worker;
}

/**
 * Return a worker on whose queue a task should be put.
 * Selecting a worker is done in a round-robin fashion.
 */
static unsigned select_worker_round_robin(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned worker = ws->last_push_worker;
	unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	int *workerids = NULL;
	starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);

	ws->last_push_worker = (ws->last_push_worker + 1) % nworkers;

	worker = workerids[worker];
	free(workerids);

	return worker;
}

#ifdef USE_LOCALITY
/* Select a worker according to the locality of the data of the task to be scheduled */
static unsigned select_worker_locality(struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	if (nbuffers == 0)
		return -1;

	unsigned i, n;
	unsigned ndata[STARPU_NMAXWORKERS] = { 0 };
	int best_worker = -1;
	unsigned best_ndata = 0;

	n = 0;
	for (i = 0; i < nbuffers; i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t data = STARPU_TASK_GET_HANDLE(task, i);
			int locality = data->last_locality;
			if (locality >= 0)
				ndata[locality]++;
			n++;
		}

	if (n)
	{
		/* Some locality buffers; choose the worker which has most of them */
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator(workers, &it);
		while (workers->has_next(workers, &it))
		{
			int workerid = workers->get_next(workers, &it);
			if (ndata[workerid] > best_ndata)
			{
				best_worker = workerid;
				best_ndata = ndata[workerid];
			}
		}
	}
	return best_worker;
}

/* Record in the data which worker will handle the task with the locality flag */
static void record_data_locality(struct starpu_task *task, int workerid)
{
	/* Record in the locality data which worker the task went to */
	unsigned i;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			STARPU_TASK_GET_HANDLE(task, i)->last_locality = workerid;
		}
}
#else
static void record_data_locality(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED)
{
}
#endif

#ifdef USE_LOCALITY_TASKS
/* Record in the worker which data it used last with the locality flag */
static void record_worker_locality(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	/* Record in the locality data which worker the task went to */
	unsigned i;
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];

	data->nlast_locality = 0;
	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			data->last_locality[data->nlast_locality] = STARPU_TASK_GET_HANDLE(task, i);
			data->nlast_locality++;
			if (data->nlast_locality == MAX_LOCALITY)
				break;
		}
}

/* Called when pushing a task to a queue */
static void locality_pushed_task(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
	unsigned i;

	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			struct locality_entry *entry;
			HASH_FIND_PTR(data->queued_tasks_per_data, &handle, entry);
			if (STARPU_LIKELY(!entry))
			{
				entry = malloc(sizeof(*entry));
				entry->data = handle;
				entry->task = task;
				HASH_ADD_PTR(data->queued_tasks_per_data, data, entry);
			}
		}
}

/* Pick a task from workerid's queue, for execution on target */
static struct starpu_task *ws_pick_task(int source, int target, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_work_stealing_data_per_worker *data_source = &ws->per_worker[source];
	struct _starpu_work_stealing_data_per_worker *data_target = &ws->per_worker[target];

	unsigned i, j, n = data_target->nlast_locality;
	struct starpu_task *tasks[MAX_LOCALITY] = { NULL }, *best_task = NULL;
	int ntasks[MAX_LOCALITY] = { 0 }, best_n; /* Number of locality data of this worker used by each task */

	/* Look at the last data accessed by this worker */
	STARPU_ASSERT(n <= MAX_LOCALITY); /* nlast_locality may reach MAX_LOCALITY exactly */
	for (i = 0; i < n; i++)
	{
		starpu_data_handle_t handle = data_target->last_locality[i];
		struct locality_entry *entry;
		HASH_FIND_PTR(data_source->queued_tasks_per_data, &handle, entry);
		if (entry)
		{
			/* Record task */
			tasks[i] = entry->task;
			ntasks[i] = 1;

			/* And increment the counter of the same task */
			for (j = 0; j < i; j++)
			{
				if (tasks[j] == tasks[i])
				{
					ntasks[j]++;
					break;
				}
			}
		}
	}

	/* Now find the task with the most locality data for this worker */
	best_n = 0;
	for (i = 0; i < n; i++)
	{
		if (ntasks[i] > best_n)
		{
			best_task = tasks[i];
			best_n = ntasks[i];
		}
	}

	if (best_n > 0)
	{
		/* Found an interesting task, try to pick it! */
		if (_starpu_fifo_pop_this_task(data_source->queue_array, target, best_task))
			return best_task;
	}

	/* Didn't find an interesting task, or couldn't run it :( */
	return _starpu_fifo_pop_task(data_source->queue_array, target);
}

/* Called when popping a task from a queue */
static void locality_popped_task(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_work_stealing_data_per_worker *data = &ws->per_worker[workerid];
	unsigned i;

	for (i = 0; i < STARPU_TASK_GET_NBUFFERS(task); i++)
		if (STARPU_TASK_GET_MODE(task, i) & STARPU_LOCALITY)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			struct locality_entry *entry;
			HASH_FIND_PTR(data->queued_tasks_per_data, &handle, entry);
			if (STARPU_LIKELY(entry))
			{
				if (entry->task == task)
				{
					HASH_DEL(data->queued_tasks_per_data, entry);
					free(entry);
				}
			}
		}
}

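/*
 * Lifecycle of queued_tasks_per_data: ws_push_task() calls
 * locality_pushed_task(), which inserts one entry per locality-flagged handle
 * of the queued task; ws_pick_task() looks those entries up to prefer a task
 * whose data the target worker accessed most recently; locality_popped_task()
 * then removes the entries when the task leaves the queue.
 */
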
#else
static void record_worker_locality(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}

/* Called when pushing a task to a queue */
static void locality_pushed_task(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}

/* Pick a task from workerid's queue, for execution on target */
static struct starpu_task *ws_pick_task(int source, int target, unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	return _starpu_fifo_pop_task(ws->per_worker[source].queue_array, target);
}

/* Called when popping a task from a queue */
static void locality_popped_task(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED, int workerid STARPU_ATTRIBUTE_UNUSED, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
{
}
#endif

#ifdef USE_OVERLOAD

/**
 * Return a ratio helpful to determine whether a worker is suitable to steal
 * tasks from or to put some tasks in its queue.
 *
 * \return a ratio with a positive or negative value, describing the current state of the worker:
 * a smaller value implies a faster worker with a relatively emptier queue, more suitable to put tasks in;
 * a bigger value implies a slower worker with a relatively fuller queue, more suitable to steal tasks from.
 */
static float overload_metric(unsigned sched_ctx_id, unsigned id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	float execution_ratio = 0.0f;
	float current_ratio = 0.0f;

	int nprocessed = _starpu_get_deque_nprocessed(ws->per_worker[id].queue_array);
	unsigned njobs = _starpu_get_deque_njobs(ws->per_worker[id].queue_array);

	/* Did we get enough information? */
	if (performed_total > 0 && nprocessed > 0)
	{
		/* How fast or slow is the worker compared to the other workers */
		execution_ratio = (float) nprocessed / performed_total;
		/* How replete is its queue */
		current_ratio = (float) njobs / nprocessed;
	}
	else
	{
		return 0.0f;
	}

	return (current_ratio - execution_ratio);
}

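/*
 * Worked example (illustrative numbers): if a worker has processed 50 of
 * performed_total = 200 tasks, execution_ratio = 50/200 = 0.25; if 100 jobs
 * still sit in its queue, current_ratio = 100/50 = 2.0, so the metric is
 * 2.0 - 0.25 = 1.75: a replete queue on a relatively slow worker, i.e. a
 * good victim to steal from.
 */
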
/**
 * Return the most suitable worker from which a task can be stolen.
 * The number of previously processed tasks, total and local,
 * and the number of tasks currently awaiting processing in each
 * worker's queue are taken into account to select the most
 * suitable worker to steal a task from.
 */
static unsigned select_victim_overload(unsigned sched_ctx_id)
{
	unsigned worker;
	float worker_ratio;
	unsigned best_worker = 0;
	/* Start from the lowest representable value: the metric can be negative */
	float best_ratio = -FLT_MAX;

	/* Don't try to play smart until we get
	 * enough information. */
	if (performed_total < calibration_value)
		return select_victim_round_robin(sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		worker_ratio = overload_metric(sched_ctx_id, worker);

		if (worker_ratio > best_ratio)
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}

/**
 * Return the most suitable worker on whose queue to put a task.
 * The number of previously processed tasks, total and local,
 * and the number of tasks currently awaiting processing in each
 * worker's queue are taken into account to select the most
 * suitable worker to add a task to.
 */
static unsigned select_worker_overload(unsigned sched_ctx_id)
{
	unsigned worker;
	float worker_ratio;
	unsigned best_worker = 0;
	float best_ratio = FLT_MAX;

	/* Don't try to play smart until we get
	 * enough information. */
	if (performed_total < calibration_value)
		return select_worker_round_robin(sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		worker_ratio = overload_metric(sched_ctx_id, worker);

		if (worker_ratio < best_ratio)
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}

#endif /* USE_OVERLOAD */

/**
 * Return a worker from which a task can be stolen.
 * This is a phony function used to call the right
 * function depending on the value of USE_OVERLOAD.
 */
static inline unsigned select_victim(unsigned sched_ctx_id,
				     int workerid STARPU_ATTRIBUTE_UNUSED)
{
#ifdef USE_OVERLOAD
	return select_victim_overload(sched_ctx_id);
#else
	return select_victim_round_robin(sched_ctx_id);
#endif /* USE_OVERLOAD */
}

/**
 * Return a worker on whose queue a task can be put.
 * This is a phony function used to call the right
 * function depending on the value of USE_OVERLOAD.
 */
static inline unsigned select_worker(unsigned sched_ctx_id)
{
#ifdef USE_OVERLOAD
	return select_worker_overload(sched_ctx_id);
#else
	return select_worker_round_robin(sched_ctx_id);
#endif /* USE_OVERLOAD */
}

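/*
 * Locking protocol for the stealing path below: the popping worker enters
 * with its own sched mutex held; it releases it before taking the victim's
 * mutex, so that two workers stealing from each other cannot deadlock, and
 * re-takes its own mutex before returning, restoring the state it was
 * called in.
 */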
/* Note: this is not scalable work stealing, use lws instead */
static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *task;
	int workerid = _starpu_worker_get_id_check();

	task = ws_pick_task(workerid, workerid, sched_ctx_id);
	if (task)
	{
		locality_popped_task(task, workerid, sched_ctx_id);
		/* there was a local task */
		return task;
	}

	starpu_pthread_mutex_t *worker_sched_mutex;
	starpu_pthread_cond_t *worker_sched_cond;
	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);

	/* we need to steal someone's job */
	unsigned victim = ws->select_victim(sched_ctx_id, workerid);

	/* Note: release this mutex before taking the victim mutex, to avoid deadlock */
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(worker_sched_mutex);

	starpu_pthread_mutex_t *victim_sched_mutex;
	starpu_pthread_cond_t *victim_sched_cond;
	starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(victim_sched_mutex);

	if (ws->per_worker[victim].queue_array != NULL && ws->per_worker[victim].queue_array->ntasks > 0)
		task = ws_pick_task(victim, workerid, sched_ctx_id);

	if (task)
	{
		_STARPU_TRACE_WORK_STEALING(workerid, victim);
		record_data_locality(task, workerid);
		record_worker_locality(task, workerid, sched_ctx_id);
		locality_popped_task(task, victim, sched_ctx_id);
	}

	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(worker_sched_mutex);

	if (!task)
	{
		/* last chance: a task may have landed on our own queue in the meantime */
		if (ws->per_worker[workerid].queue_array != NULL && ws->per_worker[workerid].queue_array->ntasks > 0)
			task = ws_pick_task(workerid, workerid, sched_ctx_id);
		if (task)
		{
			locality_popped_task(task, workerid, sched_ctx_id);
			/* there was a local task */
			return task;
		}
	}

	return task;
}

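/*
 * Push path: prefer the worker which last accessed the task's locality data
 * (when USE_LOCALITY is enabled), else the calling worker, else a worker
 * chosen by select_worker(). If the chosen worker was removed from the
 * context before we could take its lock, we simply pick again.
 */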
static
int ws_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int workerid = -1;

pick_worker:
#ifdef USE_LOCALITY
	workerid = select_worker_locality(task, sched_ctx_id);
#endif
	if (workerid == -1)
		workerid = starpu_worker_get_id();

	/* If the current thread is not a worker but
	 * the main thread (-1), we find the best worker
	 * on whose queue to put the task */
	if (workerid == -1)
		workerid = select_worker(sched_ctx_id);

	starpu_pthread_mutex_t *sched_mutex;
	starpu_pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);

	/* Maybe the worker we selected was removed before we took the mutex */
	if (ws->per_worker[workerid].queue_array == NULL)
	{
		/* release the now-useless lock before retrying, lest we leak it */
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
		goto pick_worker;
	}

	record_data_locality(task, workerid);

#ifdef HAVE_AYUDAME_H
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	if (AYU_event)
	{
		intptr_t id = workerid;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif

	_starpu_fifo_push_task(ws->per_worker[workerid].queue_array, task);
	locality_pushed_task(task, workerid, sched_ctx_id);

	starpu_push_task_end(task);
	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);

#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
	/* TODO: implement fine-grain signaling, similar to what eager does */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
		starpu_wake_worker(workers->get_next(workers, &it));
#endif

	return 0;
}

static void ws_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	int workerid;

	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
		ws->per_worker[workerid].queue_array = _starpu_create_fifo();

		/* Tell helgrind that we are fine with getting outdated values,
		 * this is just an estimation */
		STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].queue_array->ntasks);
	}
}

static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	int workerid;

	for (i = 0; i < nworkers; i++)
	{
		starpu_pthread_mutex_t *sched_mutex;
		starpu_pthread_cond_t *sched_cond;

		workerid = workerids[i];
		starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
		if (ws->per_worker[workerid].queue_array != NULL)
		{
			_starpu_destroy_fifo(ws->per_worker[workerid].queue_array);
			ws->per_worker[workerid].queue_array = NULL;
		}
		free(ws->per_worker[workerid].proxlist);
		ws->per_worker[workerid].proxlist = NULL;
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
	}
}

static void initialize_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)malloc(sizeof(struct _starpu_work_stealing_data));
	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)ws);

	ws->last_pop_worker = 0;
	ws->last_push_worker = 0;
	STARPU_HG_DISABLE_CHECKING(ws->last_pop_worker);
	STARPU_HG_DISABLE_CHECKING(ws->last_push_worker);
	ws->select_victim = select_victim;

	unsigned nw = starpu_worker_get_count();
	ws->per_worker = calloc(nw, sizeof(struct _starpu_work_stealing_data_per_worker));
}

static void deinit_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	free(ws->per_worker);
	free(ws);
}

struct starpu_sched_policy _starpu_sched_ws_policy =
{
	.init_sched = initialize_ws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = ws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "ws",
	.policy_description = "work stealing",
	.worker_type = STARPU_WORKER_LIST,
};

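/*
 * Usage sketch (application side; my_app is a placeholder): this policy is
 * selected by the name registered above, either through the environment,
 *
 *	$ STARPU_SCHED=ws ./my_app
 *
 * or programmatically before starpu_init():
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "ws";
 *	starpu_init(&conf);
 *
 * The same applies to the "lws" policy defined below.
 */
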
/* local work stealing policy */

/* Return a worker to steal a task from. The worker is selected according to
 * the proximity list built using the info on the architecture provided by hwloc
 */
static unsigned lws_select_victim(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	int neighbor;
	int i;

	for (i = 0; i < nworkers; i++)
	{
		neighbor = ws->per_worker[workerid].proxlist[i];
		/* if a worker was removed, then nothing tells us that the
		 * proxlist is correct: skip this stale entry */
		if (!starpu_sched_ctx_contains_worker(neighbor, sched_ctx_id))
			continue;
		int ntasks = ws->per_worker[neighbor].queue_array->ntasks;
		if (ntasks)
			return neighbor;
	}
	return workerid;
}

static void lws_add_workers(unsigned sched_ctx_id, int *workerids,
			    unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	int workerid;

	ws_add_workers(sched_ctx_id, workerids, nworkers);

#ifdef STARPU_HAVE_HWLOC
	/* Build a proximity list for every worker. It is cheaper to
	 * build this once and then use it for popping tasks rather
	 * than traversing the hwloc tree every time a task must be
	 * stolen */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_tree *tree = (struct starpu_tree*)workers->workerids;
	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		ws->per_worker[workerid].proxlist = (int*)malloc(nworkers*sizeof(int));
		int bindid;

		struct starpu_tree *neighbour = NULL;
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator(workers, &it);

		bindid = starpu_worker_get_bindid(workerid);
		it.value = starpu_tree_get(tree, bindid);
		int cnt = 0;
		for (;;)
		{
			neighbour = (struct starpu_tree*)it.value;
			int *neigh_workerids;
			int neigh_nworkers = starpu_bindid_get_workerids(neighbour->id, &neigh_workerids);
			int w;
			for (w = 0; w < neigh_nworkers; w++)
			{
				if (!it.visited[neigh_workerids[w]] && workers->present[neigh_workerids[w]])
				{
					ws->per_worker[workerid].proxlist[cnt++] = neigh_workerids[w];
					it.visited[neigh_workerids[w]] = 1;
				}
			}
			if (!workers->has_next(workers, &it))
				break;
			it.value = it.possible_value;
			it.possible_value = NULL;
		}
	}
#endif
}

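/*
 * Illustration (hypothetical topology): on a machine with two quad-core
 * sockets and one CPU worker per core, worker 0's proxlist would list the
 * workers sharing its caches and socket first (e.g. 1, 2, 3) and the remote
 * socket last (4-7), so lws_select_victim() probes the cheapest victims
 * first.
 */
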
static void initialize_lws_policy(unsigned sched_ctx_id)
{
	/* lws is loosely based on ws, except that it might use hwloc. */
	initialize_ws_policy(sched_ctx_id);

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	ws->select_victim = lws_select_victim;
#endif
}

struct starpu_sched_policy _starpu_sched_lws_policy =
{
	.init_sched = initialize_lws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = lws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "lws",
	.policy_description = "locality work stealing",
#ifdef STARPU_HAVE_HWLOC
	.worker_type = STARPU_WORKER_TREE,
#else
	.worker_type = STARPU_WORKER_LIST,
#endif
};