/* policy_tools.c */

#include <sched_ctx_hypervisor.h>
#include <pthread.h>
#include <stdlib.h> /* malloc(), free() */
#include <errno.h>  /* EBUSY */

#include "policy_tools.h"
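
/* sum the user-assigned priorities of all the workers currently in the context */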
static int _compute_priority(unsigned sched_ctx)
{
	struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
	int total_priority = 0;

	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
	int worker;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	while(workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		total_priority += config->priority[worker];
	}

	if(workers->init_cursor)
		workers->deinit_cursor(workers);

	return total_priority;
}

/* find the context (other than the requesting one) with the highest total
   worker priority that can still accommodate nworkers_to_move extra workers */
unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move)
{
	int i;
	int highest_priority = -1;
	int current_priority = 0;
	unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;
	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();

	struct policy_config *config = NULL;

	for(i = 0; i < nsched_ctxs; i++)
	{
		if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && sched_ctxs[i] != req_sched_ctx)
		{
			unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctxs[i]);
			config = sched_ctx_hypervisor_get_config(sched_ctxs[i]);
			if((nworkers + nworkers_to_move) <= config->max_nworkers)
			{
				current_priority = _compute_priority(sched_ctxs[i]);
				if(highest_priority < current_priority)
				{
					highest_priority = current_priority;
					sched_ctx = sched_ctxs[i];
				}
			}
		}
	}
	return sched_ctx;
}
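
/* from the first nall_workers entries of the `workers' list (or from all the
   workers of the machine if the list is NULL), collect at most *nworkers
   workers of the requested architecture; *nworkers is updated downwards if
   fewer matching workers are found */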
int* _get_first_workers_in_list(int *workers, int nall_workers, unsigned *nworkers, enum starpu_archtype arch)
{
	int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));

	int w, worker;
	int nfound_workers = 0;
	for(w = 0; w < nall_workers; w++)
	{
		worker = workers == NULL ? w : workers[w];
		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
		if(arch == STARPU_ALL || curr_arch == arch)
		{
			curr_workers[nfound_workers++] = worker;
		}
		if(nfound_workers == *nworkers)
			break;
	}
	if(nfound_workers < *nworkers)
		*nworkers = nfound_workers;
	return curr_workers;
}

/* pick the first nworkers workers that should leave the context: non-fixed
   workers with the lowest priority first, ties broken by the biggest idle time */
int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_archtype arch)
{
	struct sched_ctx_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
	struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);

	int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));
	int i;
	for(i = 0; i < *nworkers; i++)
		curr_workers[i] = -1;

	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
	int index;
	int worker;
	int considered = 0;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	for(index = 0; index < *nworkers; index++)
	{
		while(workers->has_next(workers))
		{
			considered = 0;
			worker = workers->get_next(workers);
			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
			if(arch == STARPU_ALL || curr_arch == arch)
			{
				if(!config->fixed_workers[worker])
				{
					/* skip workers already selected in a previous pass */
					for(i = 0; i < index; i++)
					{
						if(curr_workers[i] == worker)
						{
							considered = 1;
							break;
						}
					}

					if(!considered)
					{
						/* the first candidate of this pass */
						if(curr_workers[index] < 0)
							curr_workers[index] = worker;
						/* the worker with the smallest priority is the first to leave the ctx */
						else if(config->priority[worker] <
							config->priority[curr_workers[index]])
							curr_workers[index] = worker;
						/* if priorities are equal, prefer the worker
						   with the biggest idle time */
						else if(config->priority[worker] ==
							config->priority[curr_workers[index]])
						{
							double worker_idle_time = sc_w->current_idle_time[worker];
							double curr_worker_idle_time = sc_w->current_idle_time[curr_workers[index]];
							if(worker_idle_time > curr_worker_idle_time)
								curr_workers[index] = worker;
						}
					}
				}
			}
		}

		/* no more eligible workers: shrink the requested count */
		if(curr_workers[index] < 0)
		{
			*nworkers = index;
			break;
		}
	}

	if(workers->init_cursor)
		workers->deinit_cursor(workers);

	return curr_workers;
}

/* get the number of workers in the context that are allowed to be moved (i.e. that are not fixed) */
unsigned _get_potential_nworkers(struct policy_config *config, unsigned sched_ctx, enum starpu_archtype arch)
{
	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);

	unsigned potential_workers = 0;
	int worker;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	while(workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
		if(arch == STARPU_ALL || curr_arch == arch)
		{
			if(!config->fixed_workers[worker])
				potential_workers++;
		}
	}

	if(workers->init_cursor)
		workers->deinit_cursor(workers);

	return potential_workers;
}

/* compute the number of workers that should be moved depending on:
   - the min/max number of workers in a context imposed by the user,
   - the resource granularity imposed by the user for the resizing process */
unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
{
	struct policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(req_sched_ctx);
	unsigned nworkers_to_move = 0;

	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, STARPU_ALL);
	if(potential_moving_workers > 0)
	{
		if(potential_moving_workers <= config->min_nworkers)
			/* if we have to give more than min, better give it all */
			/* => an empty ctx will block until it has the required workers */
			nworkers_to_move = potential_moving_workers;
		else if(potential_moving_workers > config->max_nworkers)
		{
			if((potential_moving_workers - config->granularity) > config->max_nworkers)
				/* nworkers_to_move = config->granularity; */
				nworkers_to_move = potential_moving_workers;
			else
				nworkers_to_move = potential_moving_workers - config->max_nworkers;
		}
		else if(potential_moving_workers > config->granularity)
		{
			if((nworkers - config->granularity) > config->min_nworkers)
				nworkers_to_move = config->granularity;
			else
				nworkers_to_move = potential_moving_workers - config->min_nworkers;
		}
		else
		{
			int nfixed_workers = nworkers - potential_moving_workers;
			if(nfixed_workers >= config->min_nworkers)
				nworkers_to_move = potential_moving_workers;
			else
				nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);
		}

		if((nworkers - nworkers_to_move) > config->max_nworkers)
			nworkers_to_move = nworkers - config->max_nworkers;
	}
	return nworkers_to_move;
}
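
/* e.g. (hypothetical numbers): with nworkers = 5, potential_moving_workers = 4,
   granularity = 2, min_nworkers = 2 and max_nworkers = 5, the granularity
   branch applies: 5 - 2 = 3 > min_nworkers, so 2 workers are moved out */

/* move workers from sender_sched_ctx to receiver_sched_ctx (or to the most
   deserving context if the receiver is STARPU_NMAX_SCHED_CTXS); if
   force_resize is set, block on the hypervisor mutex instead of giving up
   when it is already taken */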
unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
{
	int ret = 1;
	if(force_resize)
		pthread_mutex_lock(&act_hypervisor_mutex);
	else
		ret = pthread_mutex_trylock(&act_hypervisor_mutex);

	if(ret != EBUSY)
	{
		unsigned nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
		if(nworkers_to_move > 0)
		{
			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
			if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
			{
				poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
			}
			else
			{
				poor_sched_ctx = receiver_sched_ctx;
				struct policy_config *config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
				unsigned nworkers = starpu_get_nworkers_of_sched_ctx(poor_sched_ctx);
				unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, poor_sched_ctx);
				if((nworkers + nworkers_to_move - nshared_workers) > config->max_nworkers)
					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers + nshared_workers);

				if(nworkers_to_move == 0)
					poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
			}

			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
			{
				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, STARPU_ALL);
				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);

				struct policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
				int i;
				for(i = 0; i < nworkers_to_move; i++)
					new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ?
						new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;

				free(workers_to_move);
			}
		}
		pthread_mutex_unlock(&act_hypervisor_mutex);
		return 1;
	}
	return 0;
}
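
/* trigger a resize of sender_sched_ctx, letting the hypervisor choose the
   receiver context; never blocks on the hypervisor mutex */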
unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx)
{
	return _resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 0);
}
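
/* sum the elapsed flops of the workers of the requested architecture in the
   context and count those workers in *npus */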
static double _get_elapsed_flops(struct sched_ctx_wrapper* sc_w, int *npus, enum starpu_archtype req_arch)
{
	double ret_val = 0.0;
	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sc_w->sched_ctx);
	int worker;

	if(workers->init_cursor)
		workers->init_cursor(workers);

	while(workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		enum starpu_archtype arch = starpu_worker_get_type(worker);
		if(arch == req_arch)
		{
			ret_val += sc_w->elapsed_flops[worker];
			(*npus)++;
		}
	}

	if(workers->init_cursor)
		workers->deinit_cursor(workers);

	return ret_val;
}
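
/* velocity of the context (elapsed flops over the time elapsed since the
   context started, in the units of starpu_timing_now()); returns 0.0 until a
   large enough sample of the submitted flops has been executed */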
double _get_ctx_velocity(struct sched_ctx_wrapper* sc_w)
{
	double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
	double total_elapsed_flops = sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(sc_w);
	double prc = elapsed_flops / sc_w->total_flops;
	unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sc_w->sched_ctx);
	double redim_sample = elapsed_flops == total_elapsed_flops ? HYPERVISOR_START_REDIM_SAMPLE * nworkers :
		HYPERVISOR_REDIM_SAMPLE * nworkers;
	if(prc >= redim_sample)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = curr_time - sc_w->start_time;
		return elapsed_flops / elapsed_time;
	}
	return 0.0;
}

/* compute the average velocity per worker of a given arch type in the context */
double _get_velocity_per_worker_type(struct sched_ctx_wrapper* sc_w, enum starpu_archtype arch)
{
	int npus = 0;
	double elapsed_flops = _get_elapsed_flops(sc_w, &npus, arch);

	if(elapsed_flops != 0.0)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = curr_time - sc_w->start_time;
		return (elapsed_flops / elapsed_time) / npus;
	}

	return -1.0;
}

/* check if there is a big velocity gap between the contexts */
int _velocity_gap_btw_ctxs(void)
{
	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
	int i = 0, j = 0;
	struct sched_ctx_wrapper* sc_w;
	struct sched_ctx_wrapper* other_sc_w;

	for(i = 0; i < nsched_ctxs; i++)
	{
		sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
		double ctx_v = _get_ctx_velocity(sc_w);
		if(ctx_v != 0.0)
		{
			for(j = 0; j < nsched_ctxs; j++)
			{
				if(sched_ctxs[i] != sched_ctxs[j])
				{
					other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
					double other_ctx_v = _get_ctx_velocity(other_sc_w);
					if(other_ctx_v != 0.0)
					{
						/* more than a 2x ratio between the two velocities counts as a big gap */
						double gap = ctx_v < other_ctx_v ? other_ctx_v / ctx_v : ctx_v / other_ctx_v;
						if(gap > 2)
							return 1;
					}
				}
			}
		}
	}
	return 0;
}
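
/* count, per type of worker, how many workers there are in the given list
   (or on the whole machine if the list is NULL) */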
void _get_total_nw(int *workers, int nworkers, int ntypes_of_workers, double total_nw[ntypes_of_workers])
{
	int current_nworkers = workers == NULL ? starpu_worker_get_count() : nworkers;
	int w;
	for(w = 0; w < ntypes_of_workers; w++)
		total_nw[w] = 0.0;

	for(w = 0; w < current_nworkers; w++)
	{
		enum starpu_archtype arch = workers == NULL ? starpu_worker_get_type(w) :
			starpu_worker_get_type(workers[w]);
		if(arch == STARPU_CPU_WORKER)
			total_nw[1]++; /* slot 1 counts the CPU workers */
		else
			total_nw[0]++; /* slot 0 counts every other type of worker */
	}
}
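
/* A minimal sketch (hypothetical, not part of this file) of how a resizing
   policy could combine the helpers above: when a worker has been idle for too
   long, try to move workers out of its context. The callback name and the way
   it would be registered with the hypervisor are assumptions made for
   illustration only. */
#if 0
static void my_policy_handle_idle(unsigned sched_ctx, int worker)
{
	struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
	struct sched_ctx_wrapper *sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);

	/* past the per-worker idle threshold, hand workers over to a more
	   deserving context chosen by _find_poor_sched_ctx() via _resize() */
	if(config && sc_w->current_idle_time[worker] > config->max_idle[worker])
		_resize_to_unknown_receiver(sched_ctx);
}
#endif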