policy_tools.c
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2012 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* #include <sched_ctx_hypervisor.h> */
/* #include <pthread.h> */

#include "policy_tools.h"
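
/* sum the priorities of all workers currently assigned to the context */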
static int _compute_priority(unsigned sched_ctx)
{
	struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);

	int total_priority = 0;

	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
	int worker;

	struct starpu_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		total_priority += config->priority[worker];
	}

	return total_priority;
}

/* find a context (other than the requesting one) that can accept nworkers_to_move more
 * workers; among the candidates, the one with the highest total priority is returned */
unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move)
{
	int i;
	int highest_priority = -1;
	int current_priority = 0;
	unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;

	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();

	struct sched_ctx_hypervisor_policy_config *config = NULL;

	for(i = 0; i < nsched_ctxs; i++)
	{
		if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && sched_ctxs[i] != req_sched_ctx)
		{
			unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctxs[i]);
			config = sched_ctx_hypervisor_get_config(sched_ctxs[i]);
			if((nworkers + nworkers_to_move) <= config->max_nworkers)
			{
				current_priority = _compute_priority(sched_ctxs[i]);
				if(highest_priority < current_priority)
				{
					highest_priority = current_priority;
					sched_ctx = sched_ctxs[i];
				}
			}
		}
	}
	return sched_ctx;
}
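
/* pick the first *nworkers workers of the requested architecture from the given list
 * (or from all workers when the list is NULL); *nworkers is lowered if fewer are found */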
int* _get_first_workers_in_list(int *workers, int nall_workers, unsigned *nworkers, enum starpu_archtype arch)
{
	int *curr_workers = (int*)malloc((*nworkers)*sizeof(int));

	int w, worker;
	int nfound_workers = 0;
	for(w = 0; w < nall_workers; w++)
	{
		worker = workers == NULL ? w : workers[w];
		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
		if(arch == STARPU_ANY_WORKER || curr_arch == arch)
		{
			curr_workers[nfound_workers++] = worker;
		}
		if(nfound_workers == *nworkers)
			break;
	}
	if(nfound_workers < *nworkers)
		*nworkers = nfound_workers;
	return curr_workers;
}

/* get first nworkers with the highest idle time in the context */
int* _get_first_workers(unsigned sched_ctx, int *nworkers, enum starpu_archtype arch)
{
	struct sched_ctx_hypervisor_wrapper* sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctx);
	struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);

	int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));
	int i;
	for(i = 0; i < *nworkers; i++)
		curr_workers[i] = -1;

	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
	int index;
	int worker;
	int considered = 0;

	struct starpu_iterator it;

	for(index = 0; index < *nworkers; index++)
	{
		/* restart the iteration over the whole collection for each slot,
		   otherwise only the first slot could ever be filled */
		if(workers->init_iterator)
			workers->init_iterator(workers, &it);

		while(workers->has_next(workers, &it))
		{
			considered = 0;
			worker = workers->get_next(workers, &it);
			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
			if(arch == STARPU_ANY_WORKER || curr_arch == arch)
			{
				if(!config->fixed_workers[worker])
				{
					/* skip workers already selected for a previous slot */
					for(i = 0; i < index; i++)
					{
						if(curr_workers[i] == worker)
						{
							considered = 1;
							break;
						}
					}

					if(!considered)
					{
						/* the first candidate for this slot */
						if(curr_workers[index] < 0)
							curr_workers[index] = worker;
						/* the smallest-priority worker is the first to leave the ctx */
						else if(config->priority[worker] <
							config->priority[curr_workers[index]])
							curr_workers[index] = worker;
						/* if priorities are equal, prefer the worker
						   with the biggest idle time */
						else if(config->priority[worker] ==
							config->priority[curr_workers[index]])
						{
							double worker_idle_time = sc_w->current_idle_time[worker];
							double curr_worker_idle_time = sc_w->current_idle_time[curr_workers[index]];
							if(worker_idle_time > curr_worker_idle_time)
								curr_workers[index] = worker;
						}
					}
				}
			}
		}

		if(curr_workers[index] < 0)
		{
			*nworkers = index;
			break;
		}
	}

	return curr_workers;
}

/* get the number of workers in the context that are allowed to be moved (that are not fixed) */
unsigned _get_potential_nworkers(struct sched_ctx_hypervisor_policy_config *config, unsigned sched_ctx, enum starpu_archtype arch)
{
	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);

	unsigned potential_workers = 0;
	int worker;

	struct starpu_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
		if(arch == STARPU_ANY_WORKER || curr_arch == arch)
		{
			if(!config->fixed_workers[worker])
				potential_workers++;
		}
	}

	return potential_workers;
}

/* compute the number of workers that should be moved depending:
   - on the min/max number of workers in a context imposed by the user,
   - on the resource granularity imposed by the user for the resizing process */
int _get_nworkers_to_move(unsigned req_sched_ctx)
{
	struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
	unsigned nworkers = starpu_sched_ctx_get_nworkers(req_sched_ctx);
	unsigned nworkers_to_move = 0;

	unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, STARPU_ANY_WORKER);
	if(potential_moving_workers > 0)
	{
		if(potential_moving_workers <= config->min_nworkers)
			/* if we have to give more than min better give it all */
			/* => empty ctx will block until having the required workers */
			nworkers_to_move = potential_moving_workers;
		else if(potential_moving_workers > config->max_nworkers)
		{
			if((potential_moving_workers - config->granularity) > config->max_nworkers)
//				nworkers_to_move = config->granularity;
				nworkers_to_move = potential_moving_workers;
			else
				nworkers_to_move = potential_moving_workers - config->max_nworkers;
		}
		else if(potential_moving_workers > config->granularity)
		{
			if((nworkers - config->granularity) > config->min_nworkers)
				nworkers_to_move = config->granularity;
			else
				nworkers_to_move = potential_moving_workers - config->min_nworkers;
		}
		else
		{
			int nfixed_workers = nworkers - potential_moving_workers;
			if(nfixed_workers >= config->min_nworkers)
				nworkers_to_move = potential_moving_workers;
			else
				nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);
		}

		if((nworkers - nworkers_to_move) > config->max_nworkers)
			nworkers_to_move = nworkers - config->max_nworkers;
	}
	return nworkers_to_move;
}
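
/* move workers from the sender context to a receiver context; when force_resize is set
 * the hypervisor lock is taken unconditionally, otherwise the resizing is skipped if the
 * lock is already held; returns 1 if the lock was acquired (resizing attempted), 0 otherwise */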
unsigned _resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize, unsigned now)
{
	int ret = 1;
	if(force_resize)
		pthread_mutex_lock(&act_hypervisor_mutex);
	else
		ret = pthread_mutex_trylock(&act_hypervisor_mutex);

	if(ret != EBUSY)
	{
		int nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
		if(nworkers_to_move > 0)
		{
			unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
			if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
			{
				poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
			}
			else
			{
				poor_sched_ctx = receiver_sched_ctx;
				struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
				unsigned nworkers = starpu_sched_ctx_get_nworkers(poor_sched_ctx);
				unsigned nshared_workers = starpu_sched_ctx_get_nshared_workers(sender_sched_ctx, poor_sched_ctx);

				if((nworkers + nworkers_to_move - nshared_workers) > config->max_nworkers)
					nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers + nshared_workers);

				if(nworkers_to_move == 0)
					poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
			}

			if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
			{
				int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, STARPU_ANY_WORKER);
				sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move, now);

				struct sched_ctx_hypervisor_policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
				int i;
				for(i = 0; i < nworkers_to_move; i++)
					new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;

				free(workers_to_move);
			}
		}
		pthread_mutex_unlock(&act_hypervisor_mutex);
		return 1;
	}
	return 0;
}
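
/* resize without a designated receiver: the hypervisor picks the receiving context itself */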
unsigned _resize_to_unknown_receiver(unsigned sender_sched_ctx, unsigned now)
{
	return _resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 0, now);
}
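
/* sum the elapsed flops of the workers of the requested architecture in the context,
 * and count those workers in *npus */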
static double _get_elapsed_flops(struct sched_ctx_hypervisor_wrapper* sc_w, int *npus, enum starpu_archtype req_arch)
{
	double ret_val = 0.0;
	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
	int worker;

	struct starpu_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		enum starpu_archtype arch = starpu_worker_get_type(worker);
		if(arch == req_arch)
		{
			ret_val += sc_w->elapsed_flops[worker];
			(*npus)++;
		}
	}

	return ret_val;
}
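
/* velocity of the whole context (elapsed flops per unit of elapsed time since start_time);
 * returns 0.0 until a sufficient fraction of the context's total flops has been executed */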
double _get_ctx_velocity(struct sched_ctx_hypervisor_wrapper* sc_w)
{
	double elapsed_flops = sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
	double total_elapsed_flops = sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(sc_w);
	double prc = elapsed_flops / sc_w->total_flops;
	double redim_sample = elapsed_flops == total_elapsed_flops ? HYPERVISOR_START_REDIM_SAMPLE : HYPERVISOR_REDIM_SAMPLE;
	if(prc >= redim_sample)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = curr_time - sc_w->start_time;
		return elapsed_flops / elapsed_time;
	}
	return 0.0;
}

/* compute the average velocity per worker of the given architecture in the context */
double _get_velocity_per_worker_type(struct sched_ctx_hypervisor_wrapper* sc_w, enum starpu_archtype arch)
{
	int npus = 0;
	double elapsed_flops = _get_elapsed_flops(sc_w, &npus, arch);

	if(elapsed_flops != 0.0)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = curr_time - sc_w->start_time;
		return (elapsed_flops / elapsed_time) / npus;
	}

	return -1.0;
}

/* check if there is a big velocity gap between the contexts */
int _velocity_gap_btw_ctxs()
{
	int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
	int i = 0, j = 0;
	struct sched_ctx_hypervisor_wrapper* sc_w;
	struct sched_ctx_hypervisor_wrapper* other_sc_w;

	for(i = 0; i < nsched_ctxs; i++)
	{
		sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[i]);
		double ctx_v = _get_ctx_velocity(sc_w);
		if(ctx_v != 0.0)
		{
			for(j = 0; j < nsched_ctxs; j++)
			{
				if(sched_ctxs[i] != sched_ctxs[j])
				{
					unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctxs[j]);
					if(nworkers == 0)
						return 1;

					other_sc_w = sched_ctx_hypervisor_get_wrapper(sched_ctxs[j]);
					double other_ctx_v = _get_ctx_velocity(other_sc_w);
					if(other_ctx_v != 0.0)
					{
						double gap = ctx_v < other_ctx_v ? other_ctx_v / ctx_v : ctx_v / other_ctx_v;
						if(gap > 1.5)
							return 1;
					}
				}
			}
		}
	}
	return 0;
}
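
/* count the workers of each type in the given list (or among all workers when the list is NULL);
 * total_nw[1] receives the number of CPU workers, total_nw[0] all the others (e.g. GPUs) */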
void _get_total_nw(int *workers, int nworkers, int ntypes_of_workers, int total_nw[ntypes_of_workers])
{
	int current_nworkers = workers == NULL ? starpu_worker_get_count() : nworkers;
	int w;
	for(w = 0; w < ntypes_of_workers; w++)
		total_nw[w] = 0;

	for(w = 0; w < current_nworkers; w++)
	{
		enum starpu_archtype arch = workers == NULL ? starpu_worker_get_type(w) :
			starpu_worker_get_type(workers[w]);
		if(arch == STARPU_CPU_WORKER)
			total_nw[1]++;
		else
			total_nw[0]++;
	}
}