policy_tools.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2012 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* #include <sched_ctx_hypervisor.h> */
#include <pthread.h> /* pthread_mutex_lock()/pthread_mutex_trylock(), used below */
#include <errno.h>   /* EBUSY, returned by pthread_mutex_trylock() */
#include "policy_tools.h"
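
/* sum of the priorities of the workers currently assigned to this context */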
static int _compute_priority(unsigned sched_ctx)
{
	struct starpu_sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
	int total_priority = 0;

	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
	int worker;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	while(workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		total_priority += config->priority[worker];
	}

	if(workers->deinit_cursor)
		workers->deinit_cursor(workers);

	return total_priority;
}

/* find the context with the highest total worker priority that can still accept nworkers_to_move more workers */
unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move)
{
	int i;
	int highest_priority = -1;
	int current_priority = 0;
	unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;

	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();

	struct starpu_sched_ctx_hypervisor_policy_config *config = NULL;

	for(i = 0; i < nsched_ctxs; i++)
	{
		if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && sched_ctxs[i] != req_sched_ctx)
		{
			unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctxs[i]);
			config = sched_ctx_hypervisor_get_config(sched_ctxs[i]);
			if((nworkers + nworkers_to_move) <= config->max_nworkers)
			{
				current_priority = _compute_priority(sched_ctxs[i]);
				if(highest_priority < current_priority)
				{
					highest_priority = current_priority;
					sched_ctx = sched_ctxs[i];
				}
			}
		}
	}
	return sched_ctx;
}
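
/* return (at most) the first *nworkers workers of the requested architecture found in the
   given list (or among all workers when the list is NULL); *nworkers is updated with the
   number of workers actually found */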
int* _get_first_workers_in_list(int *workers, int nall_workers, unsigned *nworkers, enum starpu_archtype arch)
{
	int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));

	int w, worker;
	int nfound_workers = 0;
	for(w = 0; w < nall_workers; w++)
	{
		worker = workers == NULL ? w : workers[w];
		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
		if(arch == STARPU_ANY_WORKER || curr_arch == arch)
		{
			curr_workers[nfound_workers++] = worker;
		}
		if(nfound_workers == *nworkers)
			break;
	}
	if(nfound_workers < *nworkers)
		*nworkers = nfound_workers;
	return curr_workers;
}

/* get the first *nworkers workers to move out of the context: the lowest-priority workers first, ties broken by the highest idle time */
int* _get_first_workers(unsigned sched_ctx, int *nworkers, enum starpu_archtype arch)
{
	struct starpu_sched_ctx_hypervisor_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
	struct starpu_sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);

	int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));
	int i;
	for(i = 0; i < *nworkers; i++)
		curr_workers[i] = -1;

	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
	int index;
	int worker;
	int considered = 0;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	for(index = 0; index < *nworkers; index++)
	{
		while(workers->has_next(workers))
		{
			considered = 0;
			worker = workers->get_next(workers);
			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
			if(arch == STARPU_ANY_WORKER || curr_arch == arch)
			{
				if(!config->fixed_workers[worker])
				{
					/* skip workers already selected in a previous iteration */
					for(i = 0; i < index; i++)
					{
						if(curr_workers[i] == worker)
						{
							considered = 1;
							break;
						}
					}
					if(!considered)
					{
						/* the first candidate */
						if(curr_workers[index] < 0)
							curr_workers[index] = worker;
						/* the smallest-priority worker is the first to leave the ctx */
						else if(config->priority[worker] < config->priority[curr_workers[index]])
							curr_workers[index] = worker;
						/* equal priorities: prefer the worker with the biggest idle time */
						else if(config->priority[worker] == config->priority[curr_workers[index]])
						{
							double worker_idle_time = sc_w->current_idle_time[worker];
							double curr_worker_idle_time = sc_w->current_idle_time[curr_workers[index]];
							if(worker_idle_time > curr_worker_idle_time)
								curr_workers[index] = worker;
						}
					}
				}
			}
		}

		if(curr_workers[index] < 0)
		{
			*nworkers = index;
			break;
		}
	}

	if(workers->deinit_cursor)
		workers->deinit_cursor(workers);

	return curr_workers;
}

/* get the number of workers in the context that are allowed to be moved (that are not fixed) */
unsigned _get_potential_nworkers(struct starpu_sched_ctx_hypervisor_policy_config *config, unsigned sched_ctx, enum starpu_archtype arch)
{
	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);

	unsigned potential_workers = 0;
	int worker;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	while(workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
		if(arch == STARPU_ANY_WORKER || curr_arch == arch)
		{
			if(!config->fixed_workers[worker])
				potential_workers++;
		}
	}

	if(workers->deinit_cursor)
		workers->deinit_cursor(workers);

	return potential_workers;
}

/* compute the number of workers that should be moved depending:
 * - on the min/max number of workers in a context imposed by the user,
 * - on the resource granularity imposed by the user for the resizing process */
int _get_nworkers_to_move(unsigned req_sched_ctx)
{
	struct starpu_sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
	unsigned nworkers = starpu_sched_ctx_get_nworkers(req_sched_ctx);
	unsigned nworkers_to_move = 0;

	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, STARPU_ANY_WORKER);
	if(potential_moving_workers > 0)
	{
		if(potential_moving_workers <= config->min_nworkers)
			/* if we have to give more than min better give it all */
			/* => empty ctx will block until having the required workers */
			nworkers_to_move = potential_moving_workers;
		else if(potential_moving_workers > config->max_nworkers)
		{
			if((potential_moving_workers - config->granularity) > config->max_nworkers)
/*				nworkers_to_move = config->granularity; */
				nworkers_to_move = potential_moving_workers;
			else
				nworkers_to_move = potential_moving_workers - config->max_nworkers;
		}
		else if(potential_moving_workers > config->granularity)
		{
			if((nworkers - config->granularity) > config->min_nworkers)
				nworkers_to_move = config->granularity;
			else
				nworkers_to_move = potential_moving_workers - config->min_nworkers;
		}
		else
		{
			int nfixed_workers = nworkers - potential_moving_workers;
			if(nfixed_workers >= config->min_nworkers)
				nworkers_to_move = potential_moving_workers;
			else
				nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);
		}

		if((nworkers - nworkers_to_move) > config->max_nworkers)
			nworkers_to_move = nworkers - config->max_nworkers;
	}
	return nworkers_to_move;
}
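
/* move workers from sender_sched_ctx to receiver_sched_ctx (or, when the receiver is
   STARPU_NMAX_SCHED_CTXS, to the context chosen by _find_poor_sched_ctx()); returns 1 if
   the hypervisor lock could be taken and the resize was attempted, 0 otherwise */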
unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize, unsigned now)
{
	int ret = 1;
	if(force_resize)
		pthread_mutex_lock(&act_hypervisor_mutex);
	else
		ret = pthread_mutex_trylock(&act_hypervisor_mutex);

	if(ret != EBUSY)
	{
		int nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
		if(nworkers_to_move > 0)
		{
			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
			if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
			{
				poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
			}
			else
			{
				poor_sched_ctx = receiver_sched_ctx;
				struct starpu_sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
				unsigned nworkers = starpu_sched_ctx_get_nworkers(poor_sched_ctx);
				unsigned nshared_workers = starpu_sched_ctx_get_nshared_workers(sender_sched_ctx, poor_sched_ctx);

				if((nworkers + nworkers_to_move - nshared_workers) > config->max_nworkers)
					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers + nshared_workers);

				if(nworkers_to_move == 0)
					poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
			}
			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
			{
				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, STARPU_ANY_WORKER);
				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move, now);

				struct starpu_sched_ctx_hypervisor_policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
				int i;
				for(i = 0; i < nworkers_to_move; i++)
					new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ?
						new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;

				free(workers_to_move);
			}
		}
		pthread_mutex_unlock(&act_hypervisor_mutex);
		return 1;
	}
	return 0;
}
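
/* non-blocking resize towards whichever context _find_poor_sched_ctx() designates */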
unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx, unsigned now)
{
	return _resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 0, now);
}
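
/* sum the flops executed since the last resize by the workers of the requested architecture,
   counting those workers in *npus */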
static double _get_elapsed_flops(struct starpu_sched_ctx_hypervisor_wrapper* sc_w, int *npus, enum starpu_archtype req_arch)
{
	double ret_val = 0.0;
	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
	int worker;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	while(workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		enum starpu_archtype arch = starpu_worker_get_type(worker);
		if(arch == req_arch)
		{
			ret_val += sc_w->elapsed_flops[worker];
			(*npus)++;
		}
	}

	if(workers->deinit_cursor)
		workers->deinit_cursor(workers);

	return ret_val;
}
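
/* velocity of a context: flops executed since the last resize divided by the elapsed time;
   returns 0.0 as long as the executed fraction of the context's total flops is below the
   resizing sample threshold */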
double _get_ctx_velocity(struct starpu_sched_ctx_hypervisor_wrapper* sc_w)
{
	double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
	double total_elapsed_flops = sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(sc_w);
	double prc = elapsed_flops / sc_w->total_flops;
	unsigned nworkers = starpu_sched_ctx_get_nworkers(sc_w->sched_ctx);
	double redim_sample = elapsed_flops == total_elapsed_flops ?
		HYPERVISOR_START_REDIM_SAMPLE * nworkers : HYPERVISOR_REDIM_SAMPLE * nworkers;

	if(prc >= redim_sample)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = curr_time - sc_w->start_time;
		return elapsed_flops / elapsed_time;
	}
	return 0.0;
}

/* compute the average velocity per worker of a given architecture in the context */
double _get_velocity_per_worker_type(struct starpu_sched_ctx_hypervisor_wrapper* sc_w, enum starpu_archtype arch)
{
	int npus = 0;
	double elapsed_flops = _get_elapsed_flops(sc_w, &npus, arch);

	if(elapsed_flops != 0.0)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = curr_time - sc_w->start_time;
		return (elapsed_flops / elapsed_time) / npus;
	}

	return -1.0;
}

/* check if there is a big velocity gap between the contexts */
int _velocity_gap_btw_ctxs(void)
{
	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
	int i = 0, j = 0;
	struct starpu_sched_ctx_hypervisor_wrapper* sc_w;
	struct starpu_sched_ctx_hypervisor_wrapper* other_sc_w;

	for(i = 0; i < nsched_ctxs; i++)
	{
		sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
		double ctx_v = _get_ctx_velocity(sc_w);
		if(ctx_v != 0.0)
		{
			for(j = 0; j < nsched_ctxs; j++)
			{
				if(sched_ctxs[i] != sched_ctxs[j])
				{
					other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
					double other_ctx_v = _get_ctx_velocity(other_sc_w);
					if(other_ctx_v != 0.0)
					{
						double gap = ctx_v < other_ctx_v ? other_ctx_v / ctx_v : ctx_v / other_ctx_v;
						if(gap > 1.5)
							return 1;
					}
					else
						return 1;
				}
			}
		}
	}
	return 0;
}
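
/* split the given worker list (or all workers when the list is NULL) by type:
   total_nw[1] counts the CPU workers, total_nw[0] all the other (accelerator) workers */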
void _get_total_nw(int *workers, int nworkers, int ntypes_of_workers, int total_nw[ntypes_of_workers])
{
	int current_nworkers = workers == NULL ? starpu_worker_get_count() : nworkers;
	int w;

	for(w = 0; w < ntypes_of_workers; w++)
		total_nw[w] = 0;

	for(w = 0; w < current_nworkers; w++)
	{
		enum starpu_archtype arch = workers == NULL ? starpu_worker_get_type(w) :
			starpu_worker_get_type(workers[w]);
		if(arch == STARPU_CPU_WORKER)
			total_nw[1]++; /* CPU workers */
		else
			total_nw[0]++; /* non-CPU (accelerator) workers */
	}
}
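
/* Sketch of how a resizing policy might chain these helpers (illustrative only, not part
 * of this file's API):
 *
 *	if(_velocity_gap_btw_ctxs())
 *		_resize_to_unknown_receiver(sender_sched_ctx, 0);
 *
 * _resize() then picks how many workers to move with _get_nworkers_to_move(), the
 * destination context with _find_poor_sched_ctx() and the workers themselves with
 * _get_first_workers(). */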