gflops_rate_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011-2013,2015 Inria
 * Copyright (C) 2012-2013,2016-2017 CNRS
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include "sc_hypervisor_policy.h"
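
/* "gflops_rate" resizing policy: each time a task is popped, the expected
 * end times of the scheduling contexts are compared and workers are moved
 * from the fastest context to the slowest one, so that all contexts finish
 * the flops they were assigned at roughly the same time. */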

/* Sum the per-worker total_elapsed_flops counters of the context */
static double _get_total_elapsed_flops_per_sched_ctx(unsigned sched_ctx)
{
	struct sc_hypervisor_wrapper *sc_w = sc_hypervisor_get_wrapper(sched_ctx);
	double ret_val = 0.0;
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		ret_val += sc_w->total_elapsed_flops[i];
	return ret_val;
}
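
/* Estimate the time at which a context is expected to finish, by linear
 * extrapolation of the speed observed so far: with
 *   speed = elapsed_flops / elapsed_time,
 * the remaining flops take remaining_flops / speed, hence
 *   exp_end = now + elapsed_time * remaining_flops / elapsed_flops.
 * Returns -1.0 while the context has not executed any flops yet. */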
double _get_exp_end(unsigned sched_ctx)
{
	struct sc_hypervisor_wrapper *sc_w = sc_hypervisor_get_wrapper(sched_ctx);
	double elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);

	if(elapsed_flops >= 1.0)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = curr_time - sc_w->start_time;
		double exp_end = (elapsed_time * sc_w->remaining_flops / elapsed_flops) + curr_time;
		return exp_end;
	}
	return -1.0;
}

/* Compute the fraction of flops left to be executed, out of the total
   flops the context has to execute */
double _get_flops_left_pct(unsigned sched_ctx)
{
	struct sc_hypervisor_wrapper *wrapper = sc_hypervisor_get_wrapper(sched_ctx);
	double total_elapsed_flops = _get_total_elapsed_flops_per_sched_ctx(sched_ctx);

	if(total_elapsed_flops >= wrapper->total_flops)
		return 0.0;

	return (wrapper->total_flops - total_elapsed_flops) / wrapper->total_flops;
}

/* Select the workers that should be moved so that the sender and the
   receiver context finish simultaneously */
static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int *nworkers)
{
	struct sc_hypervisor_wrapper *sender_sc_w = sc_hypervisor_get_wrapper(sender_sched_ctx);
	struct sc_hypervisor_wrapper *receiver_sc_w = sc_hypervisor_get_wrapper(receiver_sched_ctx);
	int *workers = NULL;

	double v_receiver = sc_hypervisor_get_ctx_speed(receiver_sc_w);
	double receiver_remaining_flops = receiver_sc_w->remaining_flops;
	double sender_exp_end = _get_exp_end(sender_sched_ctx);
	double sender_v_cpu = sc_hypervisor_get_speed_per_worker_type(sender_sc_w, STARPU_CPU_WORKER);
	/* additional speed the receiver needs in order to finish when the sender does */
	double v_for_rctx = (receiver_remaining_flops / (sender_exp_end - starpu_timing_now())) - v_receiver;

	/* convert the speed deficit into a worker count, in CPU-worker units */
	int nworkers_needed = v_for_rctx / sender_v_cpu;
	/* printf("%d->%d: v_rec %lf v %lf v_cpu %lf w_needed %d \n", sender_sched_ctx, receiver_sched_ctx, */
	/*        v_receiver, v_for_rctx, sender_v_cpu, nworkers_needed); */
	if(nworkers_needed > 0)
	{
		struct sc_hypervisor_policy_config *sender_config = sc_hypervisor_get_config(sender_sched_ctx);
		int potential_moving_cpus = sc_hypervisor_get_movable_nworkers(sender_config, sender_sched_ctx, STARPU_CPU_WORKER);
		int potential_moving_gpus = sc_hypervisor_get_movable_nworkers(sender_config, sender_sched_ctx, STARPU_CUDA_WORKER);
		int sender_nworkers = (int)starpu_sched_ctx_get_nworkers(sender_sched_ctx);
		struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(receiver_sched_ctx);
		int nworkers_ctx = (int)starpu_sched_ctx_get_nworkers(receiver_sched_ctx);

		/* a CUDA worker is weighted as roughly 5 CPU workers here */
		if(nworkers_needed < (potential_moving_cpus + 5 * potential_moving_gpus))
		{
			if((sender_nworkers - nworkers_needed) >= sender_config->min_nworkers)
			{
				if((nworkers_ctx + nworkers_needed) > config->max_nworkers)
					nworkers_needed = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx);

				if(nworkers_needed > 0)
				{
					int ngpus = nworkers_needed / 5;
					int *gpus;
					gpus = sc_hypervisor_get_idlest_workers(sender_sched_ctx, &ngpus, STARPU_CUDA_WORKER);
					int ncpus = nworkers_needed - ngpus;
					int *cpus;
					cpus = sc_hypervisor_get_idlest_workers(sender_sched_ctx, &ncpus, STARPU_CPU_WORKER);
					workers = (int*)malloc(nworkers_needed * sizeof(int));
					int i;
					printf("%d: gpus: ", nworkers_needed);
					for(i = 0; i < ngpus; i++)
					{
						workers[(*nworkers)++] = gpus[i];
						printf("%d ", gpus[i]);
					}
					printf(" cpus:");
					for(i = 0; i < ncpus; i++)
					{
						workers[(*nworkers)++] = cpus[i];
						printf("%d ", cpus[i]);
					}
					printf("\n");
					free(gpus);
					free(cpus);
				}
			}
		}
		else
		{
			/* if the needed number of workers is too big, only move the
			   number of workers corresponding to the granularity set by
			   the user */
			int nworkers_to_move = sc_hypervisor_compute_nworkers_to_move(sender_sched_ctx);
			if(sender_nworkers - nworkers_to_move >= sender_config->min_nworkers)
			{
				int nshared_workers = (int)starpu_sched_ctx_get_nshared_workers(sender_sched_ctx, receiver_sched_ctx);
				if((nworkers_ctx + nworkers_to_move - nshared_workers) > config->max_nworkers)
					nworkers_to_move = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx + nshared_workers);

				if(nworkers_to_move > 0)
				{
					workers = sc_hypervisor_get_idlest_workers(sender_sched_ctx, &nworkers_to_move, STARPU_ANY_WORKER);
					*nworkers = nworkers_to_move;
				}
			}
		}
	}
	return workers;
}
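
/* Move the selected workers from the sender to the receiver context.
 * With force_resize set, the hypervisor mutex is taken unconditionally;
 * otherwise the resize is skipped when another resize is already in
 * progress (trylock). Returns 1 if a resize was attempted, 0 otherwise. */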
static unsigned _gflops_rate_resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
{
	int ret = 1;
	if(force_resize)
		STARPU_PTHREAD_MUTEX_LOCK(&act_hypervisor_mutex);
	else
		ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
	if(ret != EBUSY)
	{
		int nworkers_to_move = 0;
		int *workers_to_move = _get_workers_to_move(sender_sched_ctx, receiver_sched_ctx, &nworkers_to_move);
		if(nworkers_to_move > 0)
		{
			sc_hypervisor_move_workers(sender_sched_ctx, receiver_sched_ctx, workers_to_move, nworkers_to_move, 0);

			/* moved workers get the receiver's max_idle for new workers,
			   unless a specific max_idle was already configured for them */
			struct sc_hypervisor_policy_config *new_config = sc_hypervisor_get_config(receiver_sched_ctx);
			int i;
			for(i = 0; i < nworkers_to_move; i++)
				new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;

			free(workers_to_move);
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&act_hypervisor_mutex);
		return 1;
	}
	return 0;
}
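
/* Return the context expected to finish first (smallest expected end),
 * or -1 if no context has a speed estimate yet */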
static int _find_fastest_sched_ctx(void)
{
	unsigned *sched_ctxs = sc_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sc_hypervisor_get_nsched_ctxs();

	double first_exp_end = _get_exp_end(sched_ctxs[0]);
	int fastest_sched_ctx = first_exp_end == -1.0 ? -1 : (int)sched_ctxs[0];
	double curr_exp_end = 0.0;
	int i;
	for(i = 1; i < nsched_ctxs; i++)
	{
		curr_exp_end = _get_exp_end(sched_ctxs[i]);
		if((curr_exp_end < first_exp_end || first_exp_end == -1.0) && curr_exp_end != -1.0)
		{
			first_exp_end = curr_exp_end;
			fastest_sched_ctx = (int)sched_ctxs[i];
		}
	}
	return fastest_sched_ctx;
}
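
/* Return the context expected to finish last; a context without an
 * estimate yet is returned right away, since it needs workers most */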
static int _find_slowest_sched_ctx(void)
{
	unsigned *sched_ctxs = sc_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sc_hypervisor_get_nsched_ctxs();

	int slowest_sched_ctx = -1;
	double curr_exp_end = 0.0;
	double last_exp_end = -1.0;
	int i;
	for(i = 0; i < nsched_ctxs; i++)
	{
		curr_exp_end = _get_exp_end(sched_ctxs[i]);
		/* if it hasn't started yet because of lack of resources, give it priority */
		if(curr_exp_end == -1.0)
			return (int)sched_ctxs[i];

		if(curr_exp_end > last_exp_end)
		{
			slowest_sched_ctx = (int)sched_ctxs[i];
			last_exp_end = curr_exp_end;
		}
	}
	return slowest_sched_ctx;
}
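
/* Same as _find_slowest_sched_ctx(), but excluding the given context, so
 * that a finished context can find a beneficiary for its workers */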
static int _find_slowest_available_sched_ctx(unsigned sched_ctx)
{
	unsigned *sched_ctxs = sc_hypervisor_get_sched_ctxs();
	int nsched_ctxs = sc_hypervisor_get_nsched_ctxs();

	int slowest_sched_ctx = -1;
	double curr_exp_end = 0.0;
	double last_exp_end = -1.0;
	int i;
	for(i = 0; i < nsched_ctxs; i++)
	{
		if(sched_ctxs[i] != sched_ctx)
		{
			curr_exp_end = _get_exp_end(sched_ctxs[i]);
			/* if it hasn't started yet because of lack of resources, give it priority */
			if(curr_exp_end == -1.0)
				return (int)sched_ctxs[i];

			if(last_exp_end < curr_exp_end)
			{
				slowest_sched_ctx = (int)sched_ctxs[i];
				last_exp_end = curr_exp_end;
			}
		}
	}
	return slowest_sched_ctx;
}
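
/* Core resizing step, run on every popped task:
 * 1. a context that has executed all its flops gives all its workers to
 *    the slowest context that still has work left;
 * 2. otherwise, when the slowest context is expected to finish more than
 *    50% later than the fastest one, workers are moved from the fastest
 *    to the slowest context, provided the fastest has already completed
 *    at least 20% of its flops and the slowest at least 10% of its own,
 *    so that both speed estimates are meaningful. */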
static void gflops_rate_resize(unsigned sched_ctx)
{
	_get_exp_end(sched_ctx);
	double flops_left_pct = _get_flops_left_pct(sched_ctx);

	/* if the context has executed all the flops it had to execute,
	   move all its resources to the slowest context */
	if(flops_left_pct == 0.0f)
	{
		int slowest_sched_ctx = _find_slowest_available_sched_ctx(sched_ctx);
		if(slowest_sched_ctx != -1)
		{
			double slowest_flops_left_pct = _get_flops_left_pct(slowest_sched_ctx);
			if(slowest_flops_left_pct != 0.0f)
			{
				struct sc_hypervisor_policy_config *config = sc_hypervisor_get_config(sched_ctx);
				config->min_nworkers = 0;
				config->max_nworkers = 0;
				printf("ctx %u finished & gives away the res to %d; slow_left %lf\n", sched_ctx, slowest_sched_ctx, slowest_flops_left_pct);
				sc_hypervisor_policy_resize(sched_ctx, slowest_sched_ctx, 1, 1);
				sc_hypervisor_stop_resize(slowest_sched_ctx);
			}
		}
	}

	int fastest_sched_ctx = _find_fastest_sched_ctx();
	int slowest_sched_ctx = _find_slowest_sched_ctx();

	if(fastest_sched_ctx != -1 && slowest_sched_ctx != -1 && fastest_sched_ctx != slowest_sched_ctx)
	{
		double fastest_exp_end = _get_exp_end(fastest_sched_ctx);
		double slowest_exp_end = _get_exp_end(slowest_sched_ctx);

		if((slowest_exp_end == -1.0 && fastest_exp_end != -1.0) || ((fastest_exp_end + (fastest_exp_end * 0.5)) < slowest_exp_end))
		{
			double fast_flops_left_pct = _get_flops_left_pct(fastest_sched_ctx);
			if(fast_flops_left_pct < 0.8)
			{
				struct sc_hypervisor_wrapper *sc_w = sc_hypervisor_get_wrapper(slowest_sched_ctx);
				double elapsed_flops = sc_hypervisor_get_elapsed_flops_per_sched_ctx(sc_w);
				if((elapsed_flops / sc_w->total_flops) > 0.1)
					_gflops_rate_resize(fastest_sched_ctx, slowest_sched_ctx, 0);
			}
		}
	}
}
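
/* Hook called by the hypervisor each time a task is popped from a
 * context; it simply triggers the resizing logic above */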
static void gflops_rate_handle_poped_task(unsigned sched_ctx, __attribute__((unused)) int worker,
					  __attribute__((unused)) struct starpu_task *task, __attribute__((unused)) uint32_t footprint)
{
	gflops_rate_resize(sched_ctx);
}
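
/* Policy descriptor exported to the hypervisor; only the poped-task hook
 * is implemented, all other callbacks are left unset */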
struct sc_hypervisor_policy gflops_rate_policy =
{
	.size_ctxs = NULL,
	.resize_ctxs = NULL,
	.handle_poped_task = gflops_rate_handle_poped_task,
	.handle_pushed_task = NULL,
	.handle_idle_cycle = NULL,
	.handle_idle_end = NULL,
	.handle_post_exec_hook = NULL,
	.handle_submitted_job = NULL,
	.end_ctx = NULL,
	.init_worker = NULL,
	.custom = 0,
	.name = "gflops_rate"
};
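
/* Usage sketch (a minimal example, assuming the sc_hypervisor_init()
 * entry point declared in sc_hypervisor.h): an application activates
 * this policy when starting the hypervisor, e.g.
 *
 *	void *perf_counters = sc_hypervisor_init(&gflops_rate_policy);
 *
 * The policy can presumably also be selected by its .name "gflops_rate"
 * through the SC_HYPERVISOR_POLICY environment variable. */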