work_stealing_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2015  Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013  CNRS
 * Copyright (C) 2011, 2012  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Work stealing policy */

#include <float.h>

#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <core/debug.h>
#include <starpu_scheduler.h>

#ifdef HAVE_AYUDAME_H
#include <Ayudame.h>
#endif

/* Experimental (dead) code which needs to be tested, fixed... */
/* #define USE_OVERLOAD */

struct _starpu_work_stealing_data
{
	unsigned (*select_victim)(unsigned, int);
	struct _starpu_fifo_taskq **queue_array;
	int **proxlist;
	/* Keep track of the work performed since the beginning of the
	 * algorithm, to make better decisions about which queue to select
	 * when stealing or deferring work. */
	unsigned last_pop_worker;
	unsigned last_push_worker;
};

#ifdef USE_OVERLOAD

/**
 * Minimum number of tasks we wait to be processed before we start guessing
 * on which worker the computation would be faster.
 */
static int calibration_value = 0;

#endif /* USE_OVERLOAD */

/**
 * Return a worker from which a task can be stolen.
 * The worker is selected in a round-robin fashion, unless the previously
 * selected worker does not own any task, in which case we return the first
 * non-empty worker.
 */
static unsigned select_victim_round_robin(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned worker = ws->last_pop_worker;
	unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);

	/* If the worker's queue is empty, let's try
	 * the next ones */
	while (1)
	{
		unsigned ntasks;

		/* Here Helgrind would shout that this is unprotected, but we
		 * are fine with getting outdated values: this is just an
		 * estimation. */
		ntasks = ws->queue_array[worker]->ntasks;
		if (ntasks)
			break;

		worker = (worker + 1) % nworkers;
		if (worker == ws->last_pop_worker)
		{
			/* We got back to the first worker,
			 * don't go into an infinite loop */
			break;
		}
	}

	ws->last_pop_worker = (worker + 1) % nworkers;

	return worker;
}

/**
 * Return a worker to which a task can be pushed.
 * The worker is selected in a round-robin fashion.
 */
static unsigned select_worker_round_robin(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned worker = ws->last_push_worker;
	unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);

	ws->last_push_worker = (ws->last_push_worker + 1) % nworkers;

	return worker;
}

#ifdef USE_OVERLOAD

/**
 * Return a ratio helpful to determine whether a worker is suitable to steal
 * tasks from or to put some tasks in its queue.
 *
 * \return a positive or negative ratio describing the current state of the worker:
 * a smaller value implies a faster worker with a relatively emptier queue
 * (more suitable to push tasks to), a bigger value implies a slower worker
 * with a relatively fuller queue (more suitable to steal tasks from).
 */
static float overload_metric(unsigned sched_ctx_id, unsigned id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	float execution_ratio = 0.0f;
	float current_ratio = 0.0f;

	int nprocessed = _starpu_get_deque_nprocessed(ws->queue_array[id]);
	unsigned njobs = _starpu_get_deque_njobs(ws->queue_array[id]);

	/* Did we get enough information? */
	if (performed_total > 0 && nprocessed > 0)
	{
		/* How fast or slow is the worker compared to the other workers */
		execution_ratio = (float) nprocessed / performed_total;
		/* How full is its queue */
		current_ratio = (float) njobs / nprocessed;
	}
	else
	{
		return 0.0f;
	}

	return (current_ratio - execution_ratio);
}
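
/* Illustrative sketch of the metric (hypothetical numbers, not computed
 * anywhere in this file): with performed_total = 100, a worker that has
 * processed nprocessed = 40 tasks and has njobs = 10 tasks waiting gets
 * execution_ratio = 40/100 = 0.4 and current_ratio = 10/40 = 0.25, hence a
 * metric of 0.25 - 0.4 = -0.15: a fast worker with a relatively empty
 * queue, i.e. a good target for pushing rather than stealing. */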
/**
 * Return the most suitable worker from which a task can be stolen.
 * The number of previously processed tasks, total and local, and the
 * number of tasks currently awaiting processing on each worker are taken
 * into account to select the most suitable worker to steal a task from.
 */
static unsigned select_victim_overload(unsigned sched_ctx_id)
{
	unsigned worker;
	float worker_ratio;
	unsigned best_worker = 0;
	float best_ratio = FLT_MIN;

	/* Don't try to play smart until we get
	 * enough information. */
	if (performed_total < calibration_value)
		return select_victim_round_robin(sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);

	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		worker_ratio = overload_metric(sched_ctx_id, worker);

		if (worker_ratio > best_ratio)
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}

/**
 * Return the most suitable worker to which a task can be pushed.
 * The number of previously processed tasks, total and local, and the
 * number of tasks currently awaiting processing on each worker are taken
 * into account to select the most suitable worker to push a task to.
 */
static unsigned select_worker_overload(unsigned sched_ctx_id)
{
	unsigned worker;
	float worker_ratio;
	unsigned best_worker = 0;
	float best_ratio = FLT_MAX;

	/* Don't try to play smart until we get
	 * enough information. */
	if (performed_total < calibration_value)
		return select_worker_round_robin(sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);

	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		worker_ratio = overload_metric(sched_ctx_id, worker);

		if (worker_ratio < best_ratio)
		{
			best_worker = worker;
			best_ratio = worker_ratio;
		}
	}

	return best_worker;
}

#endif /* USE_OVERLOAD */

/**
 * Return a worker from which a task can be stolen.
 * This is a trivial wrapper which calls the right
 * function depending on the value of USE_OVERLOAD.
 */
static inline unsigned select_victim(unsigned sched_ctx_id,
				     int workerid STARPU_ATTRIBUTE_UNUSED)
{
#ifdef USE_OVERLOAD
	return select_victim_overload(sched_ctx_id);
#else
	return select_victim_round_robin(sched_ctx_id);
#endif /* USE_OVERLOAD */
}

/**
 * Return a worker to which a task can be pushed.
 * This is a trivial wrapper which calls the right
 * function depending on the value of USE_OVERLOAD.
 */
static inline unsigned select_worker(unsigned sched_ctx_id)
{
#ifdef USE_OVERLOAD
	return select_worker_overload(sched_ctx_id);
#else
	return select_worker_round_robin(sched_ctx_id);
#endif /* USE_OVERLOAD */
}

/* Note: this is not scalable work stealing, use lws instead */
static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *task;

	int workerid = starpu_worker_get_id();

	STARPU_ASSERT(workerid != -1);

	task = _starpu_fifo_pop_task(ws->queue_array[workerid], workerid);
	if (task)
	{
		/* there was a local task */
		return task;
	}

	starpu_pthread_mutex_t *worker_sched_mutex;
	starpu_pthread_cond_t *worker_sched_cond;
	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);

	/* Note: release this mutex before taking the victim mutex, to avoid deadlock */
	STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);

	/* we need to steal someone's job */
	unsigned victim = ws->select_victim(sched_ctx_id, workerid);

	starpu_pthread_mutex_t *victim_sched_mutex;
	starpu_pthread_cond_t *victim_sched_cond;
	starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);

	STARPU_PTHREAD_MUTEX_LOCK(victim_sched_mutex);
	task = _starpu_fifo_pop_task(ws->queue_array[victim], workerid);
	if (task)
	{
		_STARPU_TRACE_WORK_STEALING(workerid, victim);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(victim_sched_mutex);

	STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
	if (!task)
	{
		task = _starpu_fifo_pop_task(ws->queue_array[workerid], workerid);
		if (task)
		{
			/* there was a local task */
			return task;
		}
	}

	return task;
}
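
/* Summary of the locking protocol above (descriptive only, the behaviour is
 * unchanged): the caller holds this worker's sched mutex on entry and
 * expects it held on return, so the steal path is
 *
 *   unlock(own mutex);          -- avoid lock-order deadlock with the victim
 *   lock(victim mutex);
 *   task = pop(victim queue);
 *   unlock(victim mutex);
 *   lock(own mutex);
 *   if (!task) retry own queue; -- a task may have been pushed meanwhile
 */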
static
int ws_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	int workerid = starpu_worker_get_id();

	/* If the current thread is not a worker but
	 * the main thread (-1), we find the best worker
	 * to put the task on its queue */
	if (workerid == -1)
		workerid = select_worker(sched_ctx_id);

	starpu_pthread_mutex_t *sched_mutex;
	starpu_pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);

#ifdef HAVE_AYUDAME_H
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	if (AYU_event)
	{
		intptr_t id = workerid;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif

	_starpu_fifo_push_task(ws->queue_array[workerid], task);

	starpu_push_task_end(task);

	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);

#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
	/* TODO: implement fine-grain signaling, similar to what eager does */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
		starpu_wake_worker(workers->get_next(workers, &it));
#endif

	return 0;
}

static void ws_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	unsigned i;
	int workerid;

	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		starpu_sched_ctx_worker_shares_tasks_lists(workerid, sched_ctx_id);
		ws->queue_array[workerid] = _starpu_create_fifo();

		/* Tell Helgrind that we are fine with getting outdated
		 * values: this is just an estimation */
		STARPU_HG_DISABLE_CHECKING(ws->queue_array[workerid]->ntasks);
	}
}

static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	unsigned i;
	int workerid;

	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		_starpu_destroy_fifo(ws->queue_array[workerid]);
		if (ws->proxlist)
			free(ws->proxlist[workerid]);
	}
}

static void initialize_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)malloc(sizeof(struct _starpu_work_stealing_data));
	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)ws);

	ws->last_pop_worker = 0;
	ws->last_push_worker = 0;
	ws->proxlist = NULL;
	ws->select_victim = select_victim;

	unsigned nw = starpu_worker_get_count();
	ws->queue_array = (struct _starpu_fifo_taskq**)malloc(nw*sizeof(struct _starpu_fifo_taskq*));
}

static void deinit_ws_policy(unsigned sched_ctx_id)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	free(ws->queue_array);
	free(ws->proxlist);
	free(ws);
}

struct starpu_sched_policy _starpu_sched_ws_policy =
{
	.init_sched = initialize_ws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = ws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "ws",
	.policy_description = "work stealing",
	.worker_type = STARPU_WORKER_LIST,
};
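
/* Usage sketch (not part of this file): like any built-in policy, "ws" is
 * selected by name, either with the STARPU_SCHED environment variable
 * (STARPU_SCHED=ws) or programmatically through starpu_conf:
 *
 *   struct starpu_conf conf;
 *   starpu_conf_init(&conf);
 *   conf.sched_policy_name = "ws";
 *   starpu_init(&conf);
 */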
/* local work stealing policy */

/* Return a worker to steal a task from. The worker is selected according to
 * the proximity list built using the info on the architecture provided by
 * hwloc. */
static unsigned lws_select_victim(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	int neighbor;
	int i;

	for (i = 0; i < nworkers; i++)
	{
		neighbor = ws->proxlist[workerid][i];
		int ntasks = ws->queue_array[neighbor]->ntasks;
		if (ntasks)
			return neighbor;
	}

	return workerid;
}
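
/* Illustrative example (hypothetical proxlist contents): if worker 0's
 * proximity list is {1, 2, 4, ...}, worker 0 first probes the queue of its
 * closest hwloc neighbour 1, then 2, and so on, falling back to its own
 * queue (the last resort, already known to be empty on the steal path) when
 * no neighbour has work. */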
static void lws_add_workers(unsigned sched_ctx_id, int *workerids,
			    unsigned nworkers)
{
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	unsigned i;
	int workerid;

	ws_add_workers(sched_ctx_id, workerids, nworkers);

#ifdef STARPU_HAVE_HWLOC
	/* Build a proximity list for every worker. It is cheaper to
	 * build this once and then use it when popping tasks rather
	 * than traversing the hwloc tree every time a task must be
	 * stolen */
	ws->proxlist = (int**)malloc(nworkers*sizeof(int*));
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_tree *tree = (struct starpu_tree*)workers->workerids;
	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		ws->proxlist[workerid] = (int*)malloc(nworkers*sizeof(int));
		int bindid;

		struct starpu_tree *neighbour = NULL;
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator(workers, &it);

		bindid = starpu_worker_get_bindid(workerid);
		it.value = starpu_tree_get(tree, bindid);
		int cnt = 0;
		for(;;)
		{
			neighbour = (struct starpu_tree*)it.value;
			int neigh_workerids[STARPU_NMAXWORKERS];
			int neigh_nworkers = starpu_worker_get_workerids(neighbour->id, neigh_workerids);
			int w;
			for(w = 0; w < neigh_nworkers; w++)
			{
				if(!it.visited[neigh_workerids[w]] && workers->present[neigh_workerids[w]])
				{
					ws->proxlist[workerid][cnt++] = neigh_workerids[w];
					it.visited[neigh_workerids[w]] = 1;
				}
			}
			if(!workers->has_next(workers, &it))
				break;
			it.value = it.possible_value;
			it.possible_value = NULL;
		}
	}
#endif
}

static void initialize_lws_policy(unsigned sched_ctx_id)
{
	/* lws is loosely based on ws, except that it may use hwloc. */
	initialize_ws_policy(sched_ctx_id);

#ifdef STARPU_HAVE_HWLOC
	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data *)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	ws->select_victim = lws_select_victim;
#endif
}

struct starpu_sched_policy _starpu_sched_lws_policy =
{
	.init_sched = initialize_lws_policy,
	.deinit_sched = deinit_ws_policy,
	.add_workers = lws_add_workers,
	.remove_workers = ws_remove_workers,
	.push_task = ws_push_task,
	.pop_task = ws_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "lws",
	.policy_description = "locality work stealing",
#ifdef STARPU_HAVE_HWLOC
	.worker_type = STARPU_WORKER_TREE,
#else
	.worker_type = STARPU_WORKER_LIST,
#endif
};
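
/* As for "ws" above, this policy can be selected with STARPU_SCHED=lws or
 * conf.sched_policy_name = "lws". Without hwloc support, no proximity list
 * is built and select_victim stays at the default, so lws behaves like the
 * plain round-robin "ws" policy. */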