simple_policy.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2012 INRIA
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <sched_ctx_hypervisor.h>
  17. #include <pthread.h>
  18. static int _compute_priority(unsigned sched_ctx)
  19. {
  20. struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
  21. int total_priority = 0;
  22. struct starpu_sched_ctx_worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
  23. int worker;
  24. if(workers->init_cursor)
  25. workers->init_cursor(workers);
  26. while(workers->has_next(workers))
  27. {
  28. worker = workers->get_next(workers);
  29. total_priority += config->priority[worker];
  30. }
  31. if(workers->init_cursor)
  32. workers->deinit_cursor(workers);
  33. return total_priority;
  34. }
  35. static unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move)
  36. {
  37. int i;
  38. int highest_priority = -1;
  39. int current_priority = 0;
  40. unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;
  41. int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
  42. int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
  43. struct policy_config *config = NULL;
  44. for(i = 0; i < nsched_ctxs; i++)
  45. {
  46. if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && sched_ctxs[i] != req_sched_ctx)
  47. {
  48. unsigned nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctxs[i]);
  49. config = sched_ctx_hypervisor_get_config(sched_ctxs[i]);
  50. if((nworkers + nworkers_to_move) <= config->max_nworkers)
  51. {
  52. current_priority = _compute_priority(sched_ctxs[i]);
  53. if (highest_priority < current_priority)
  54. {
  55. highest_priority = current_priority;
  56. sched_ctx = sched_ctxs[i];
  57. }
  58. }
  59. }
  60. }
  61. return sched_ctx;
  62. }
  63. int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_archtype arch)
  64. {
  65. struct policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
  66. int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));
  67. int i;
  68. for(i = 0; i < *nworkers; i++)
  69. curr_workers[i] = -1;
  70. struct starpu_sched_ctx_worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
  71. int index;
  72. int worker;
  73. int considered = 0;
  74. if(workers->init_cursor)
  75. workers->init_cursor(workers);
  76. for(index = 0; index < *nworkers; index++)
  77. {
  78. while(workers->has_next(workers))
  79. {
  80. considered = 0;
  81. worker = workers->get_next(workers);
  82. enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
  83. if(arch == 0 || curr_arch == arch)
  84. {
  85. if(!config->fixed_workers[worker])
  86. {
  87. for(i = 0; i < index; i++)
  88. {
  89. if(curr_workers[i] == worker)
  90. {
  91. considered = 1;
  92. break;
  93. }
  94. }
  95. if(!considered)
  96. {
  97. /* the first iteration*/
  98. if(curr_workers[index] < 0)
  99. curr_workers[index] = worker;
  100. /* small priority worker is the first to leave the ctx*/
  101. else if(config->priority[worker] <
  102. config->priority[curr_workers[index]])
  103. curr_workers[index] = worker;
  104. /* if we don't consider priorities check for the workers
  105. with the biggest idle time */
  106. else if(config->priority[worker] ==
  107. config->priority[curr_workers[index]])
  108. {
  109. double worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, worker);
  110. double curr_worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, curr_workers[index]);
  111. if(worker_idle_time > curr_worker_idle_time)
  112. curr_workers[index] = worker;
  113. }
  114. }
  115. }
  116. }
  117. }
  118. if(curr_workers[index] < 0)
  119. {
  120. *nworkers = index;
  121. break;
  122. }
  123. }
  124. if(workers->init_cursor)
  125. workers->deinit_cursor(workers);
  126. return curr_workers;
  127. }
  128. static unsigned _get_potential_nworkers(struct policy_config *config, unsigned sched_ctx, enum starpu_archtype arch)
  129. {
  130. struct starpu_sched_ctx_worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx);
  131. unsigned potential_workers = 0;
  132. int worker;
  133. if(workers->init_cursor)
  134. workers->init_cursor(workers);
  135. while(workers->has_next(workers))
  136. {
  137. worker = workers->get_next(workers);
  138. enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
  139. if(arch == 0 || curr_arch == arch)
  140. {
  141. if(!config->fixed_workers[worker])
  142. potential_workers++;
  143. }
  144. }
  145. if(workers->init_cursor)
  146. workers->deinit_cursor(workers);
  147. return potential_workers;
  148. }
  149. static unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
  150. {
  151. struct policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
  152. unsigned nworkers = starpu_get_nworkers_of_sched_ctx(req_sched_ctx);
  153. unsigned nworkers_to_move = 0;
  154. unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, 0);
  155. if(potential_moving_workers > 0)
  156. {
  157. if(potential_moving_workers <= config->min_nworkers)
  158. /* if we have to give more than min better give it all */
  159. /* => empty ctx will block until having the required workers */
  160. nworkers_to_move = potential_moving_workers;
  161. else if(potential_moving_workers > config->max_nworkers)
  162. {
  163. if((potential_moving_workers - config->granularity) > config->max_nworkers)
  164. nworkers_to_move = config->granularity;
  165. else
  166. nworkers_to_move = potential_moving_workers - config->max_nworkers;
  167. }
  168. else if(potential_moving_workers > config->granularity)
  169. {
  170. if((nworkers - config->granularity) > config->min_nworkers)
  171. nworkers_to_move = config->granularity;
  172. else
  173. nworkers_to_move = potential_moving_workers - config->min_nworkers;
  174. }
  175. else
  176. {
  177. int nfixed_workers = nworkers - potential_moving_workers;
  178. if(nfixed_workers >= config->min_nworkers)
  179. nworkers_to_move = potential_moving_workers;
  180. else
  181. nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);
  182. }
  183. if((nworkers - nworkers_to_move) > config->max_nworkers)
  184. nworkers_to_move = nworkers - config->max_nworkers;
  185. }
  186. return nworkers_to_move;
  187. }
  188. static unsigned _simple_resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
  189. {
  190. int ret = 1;
  191. if(force_resize)
  192. pthread_mutex_lock(&act_hypervisor_mutex);
  193. else
  194. ret = pthread_mutex_trylock(&act_hypervisor_mutex);
  195. if(ret != EBUSY)
  196. {
  197. unsigned nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
  198. if(nworkers_to_move > 0)
  199. {
  200. unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
  201. if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
  202. poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
  203. else
  204. {
  205. poor_sched_ctx = receiver_sched_ctx;
  206. struct policy_config *config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
  207. unsigned nworkers = starpu_get_nworkers_of_sched_ctx(poor_sched_ctx);
  208. unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, poor_sched_ctx);
  209. if((nworkers+nworkers_to_move-nshared_workers) > config->max_nworkers)
  210. nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers+nshared_workers);
  211. if(nworkers_to_move == 0) poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
  212. }
  213. if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
  214. {
  215. int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, 0);
  216. sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);
  217. struct policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
  218. int i;
  219. for(i = 0; i < nworkers_to_move; i++)
  220. new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;
  221. free(workers_to_move);
  222. }
  223. }
  224. pthread_mutex_unlock(&act_hypervisor_mutex);
  225. return 1;
  226. }
  227. return 0;
  228. }
  229. static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int *nworkers)
  230. {
  231. int *workers = NULL;
  232. double v_receiver = sched_ctx_hypervisor_get_ctx_velocity(receiver_sched_ctx);
  233. double receiver_remainig_flops = sched_ctx_hypervisor_get_flops_left(receiver_sched_ctx);
  234. double sender_exp_end = sched_ctx_hypervisor_get_exp_end(sender_sched_ctx);
  235. double sender_v_cpu = sched_ctx_hypervisor_get_cpu_velocity(sender_sched_ctx);
  236. // double v_gcpu = sched_ctx_hypervisor_get_gpu_velocity(sender_sched_ctx);
  237. double v_for_rctx = (receiver_remainig_flops/(sender_exp_end - starpu_timing_now())) - v_receiver;
  238. // v_for_rctx /= 2;
  239. int nworkers_needed = v_for_rctx/sender_v_cpu;
  240. /* printf("%d->%d: v_rec %lf v %lf v_cpu %lf w_needed %d \n", sender_sched_ctx, receiver_sched_ctx, */
  241. /* v_receiver, v_for_rctx, sender_v_cpu, nworkers_needed); */
  242. if(nworkers_needed > 0)
  243. {
  244. struct policy_config *sender_config = sched_ctx_hypervisor_get_config(sender_sched_ctx);
  245. unsigned potential_moving_cpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CPU_WORKER);
  246. unsigned potential_moving_gpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CUDA_WORKER);
  247. unsigned sender_nworkers = starpu_get_nworkers_of_sched_ctx(sender_sched_ctx);
  248. struct policy_config *config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
  249. unsigned nworkers_ctx = starpu_get_nworkers_of_sched_ctx(receiver_sched_ctx);
  250. if(nworkers_needed < (potential_moving_cpus + 5 * potential_moving_gpus))
  251. {
  252. if((sender_nworkers - nworkers_needed) >= sender_config->min_nworkers)
  253. {
  254. if((nworkers_ctx + nworkers_needed) > config->max_nworkers)
  255. nworkers_needed = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx);
  256. if(nworkers_needed > 0)
  257. {
  258. int ngpus = nworkers_needed / 5;
  259. int *gpus;
  260. gpus = _get_first_workers(sender_sched_ctx, &ngpus, STARPU_CUDA_WORKER);
  261. int ncpus = nworkers_needed - ngpus;
  262. int *cpus;
  263. cpus = _get_first_workers(sender_sched_ctx, &ncpus, STARPU_CPU_WORKER);
  264. workers = (int*)malloc(nworkers_needed*sizeof(int));
  265. int i;
  266. for(i = 0; i < ngpus; i++)
  267. workers[(*nworkers)++] = gpus[i];
  268. for(i = 0; i < ncpus; i++)
  269. workers[(*nworkers)++] = cpus[i];
  270. free(gpus);
  271. free(cpus);
  272. }
  273. }
  274. }
  275. else
  276. {
  277. int nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
  278. if(sender_nworkers - nworkers_to_move >= sender_config->min_nworkers)
  279. {
  280. unsigned nshared_workers = starpu_get_nshared_workers(sender_sched_ctx, receiver_sched_ctx);
  281. if((nworkers_ctx + nworkers_to_move - nshared_workers) > config->max_nworkers)
  282. nworkers_to_move = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx + nshared_workers);
  283. if(nworkers_to_move > 0)
  284. {
  285. workers = _get_first_workers(sender_sched_ctx, &nworkers_to_move, 0);
  286. *nworkers = nworkers_to_move;
  287. }
  288. }
  289. }
  290. }
  291. return workers;
  292. }
  293. static unsigned _simple_resize2(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
  294. {
  295. int ret = 1;
  296. if(force_resize)
  297. pthread_mutex_lock(&act_hypervisor_mutex);
  298. else
  299. ret = pthread_mutex_trylock(&act_hypervisor_mutex);
  300. if(ret != EBUSY)
  301. {
  302. int nworkers_to_move = 0;
  303. int *workers_to_move = _get_workers_to_move(sender_sched_ctx, receiver_sched_ctx, &nworkers_to_move);
  304. if(nworkers_to_move > 0)
  305. {
  306. sched_ctx_hypervisor_move_workers(sender_sched_ctx, receiver_sched_ctx, workers_to_move, nworkers_to_move);
  307. struct policy_config *new_config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
  308. int i;
  309. for(i = 0; i < nworkers_to_move; i++)
  310. new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] !=MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;
  311. free(workers_to_move);
  312. }
  313. pthread_mutex_unlock(&act_hypervisor_mutex);
  314. return 1;
  315. }
  316. return 0;
  317. }
  318. static unsigned simple_resize(unsigned sender_sched_ctx)
  319. {
  320. return _simple_resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 1);
  321. }
  322. static void simple_manage_idle_time(unsigned req_sched_ctx, int worker, double idle_time)
  323. {
  324. struct policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
  325. if(config != NULL && idle_time > config->max_idle[worker])
  326. simple_resize(req_sched_ctx);
  327. return;
  328. }
  /*
   * Find the context expected to finish first.
   *
   * Every registered context is asked for its expected end time; contexts
   * reporting -1.0 are ignored.  The context with the smallest remaining
   * value is returned (the earliest entry wins a tie).
   *
   * Returns the context id, or -1 when there are no contexts or none of them
   * reports an expected end time.
   */
  int _find_fastest_sched_ctx(void)
  {
      int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
      int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
      int fastest_sched_ctx = -1;
      double first_exp_end = -1.0;
      int found = 0;
      int i;

      for (i = 0; i < nsched_ctxs; i++)
      {
          double curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);

          /* No estimate for this context: it cannot be the fastest. */
          if (curr_exp_end == -1.0)
              continue;

          /* Seed the minimum with the first valid estimate rather than with
             entry 0, whose -1.0 would otherwise beat every real value. */
          if (!found || first_exp_end > curr_exp_end)
          {
              first_exp_end = curr_exp_end;
              fastest_sched_ctx = sched_ctxs[i];
              found = 1;
          }
      }

      return fastest_sched_ctx;
  }
  /*
   * Find the context expected to finish last.
   *
   * Contexts are scanned in array order.  The first one that reports an
   * expected end of -1.0 is returned immediately; otherwise the context with
   * the largest expected end time is returned (the earliest entry wins a
   * tie).  Returns -1 when no context qualifies.
   */
  int _find_slowest_sched_ctx()
  {
      int *ctx_ids = sched_ctx_hypervisor_get_sched_ctxs();
      int nctxs = sched_ctx_hypervisor_get_nsched_ctxs();
      int slowest = -1;
      double latest_end = -1.0;
      int idx = 0;

      while (idx < nctxs)
      {
          const int ctx = ctx_ids[idx++];
          const double end = sched_ctx_hypervisor_get_exp_end(ctx);

          /* if it hasn't started because of no resources give it priority */
          if (end == -1.0)
              return ctx;

          if (latest_end < end)
          {
              slowest = ctx;
              latest_end = end;
          }
      }

      return slowest;
  }
  /*
   * Find the context expected to finish last, ignoring sched_ctx itself.
   *
   * Contexts are scanned in array order.  The first one (other than
   * sched_ctx) that reports an expected end of -1.0 is returned immediately;
   * otherwise the one with the largest expected end time is returned (the
   * earliest entry wins a tie).  Returns -1 when no other context qualifies.
   */
  int _find_slowest_available_sched_ctx(unsigned sched_ctx)
  {
      int *ctx_ids = sched_ctx_hypervisor_get_sched_ctxs();
      int nctxs = sched_ctx_hypervisor_get_nsched_ctxs();
      int slowest = -1;
      double latest_end = -1.0;
      int idx;

      for (idx = 0; idx < nctxs; idx++)
      {
          /* The caller's own context is never a candidate. */
          if (ctx_ids[idx] == sched_ctx)
              continue;

          double end = sched_ctx_hypervisor_get_exp_end(ctx_ids[idx]);

          /* if it hasn't started because of no resources give it priority */
          if (end == -1.0)
              return ctx_ids[idx];

          if (latest_end < end)
          {
              slowest = ctx_ids[idx];
              latest_end = end;
          }
      }

      return slowest;
  }
  395. static void simple_manage_gflops_rate(unsigned sched_ctx)
  396. {
  397. double exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctx);
  398. double flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(sched_ctx);
  399. if(flops_left_pct == 0.0f)
  400. {
  401. int slowest_sched_ctx = _find_slowest_available_sched_ctx(sched_ctx);
  402. if(slowest_sched_ctx != -1)
  403. {
  404. double slowest_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(slowest_sched_ctx);
  405. printf("ctx %d finished & gives away the res to %d; slow_left %lf\n", sched_ctx, slowest_sched_ctx, slowest_flops_left_pct);
  406. if(slowest_flops_left_pct != 0.0f)
  407. {
  408. struct policy_config* config = sched_ctx_hypervisor_get_config(sched_ctx);
  409. config->min_nworkers = 0;
  410. config->max_nworkers = 0;
  411. _simple_resize(sched_ctx, slowest_sched_ctx, 1);
  412. sched_ctx_hypervisor_stop_resize(slowest_sched_ctx);
  413. }
  414. }
  415. }
  416. int fastest_sched_ctx = _find_fastest_sched_ctx();
  417. int slowest_sched_ctx = _find_slowest_sched_ctx();
  418. if(fastest_sched_ctx != -1 && slowest_sched_ctx != -1 && fastest_sched_ctx != slowest_sched_ctx)
  419. {
  420. double fastest_exp_end = sched_ctx_hypervisor_get_exp_end(fastest_sched_ctx);
  421. double slowest_exp_end = sched_ctx_hypervisor_get_exp_end(slowest_sched_ctx);
  422. double fastest_bef_res_exp_end = sched_ctx_hypervisor_get_bef_res_exp_end(fastest_sched_ctx);
  423. double slowest_bef_res_exp_end = sched_ctx_hypervisor_get_bef_res_exp_end(slowest_sched_ctx);
  424. // (fastest_bef_res_exp_end < slowest_bef_res_exp_end ||
  425. // fastest_bef_res_exp_end == 0.0 || slowest_bef_res_exp_end == 0)))
  426. if((slowest_exp_end == -1.0 && fastest_exp_end != -1.0) || ((fastest_exp_end + (fastest_exp_end*0.5)) < slowest_exp_end ))
  427. {
  428. double fast_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(fastest_sched_ctx);
  429. if(fast_flops_left_pct < 0.8)
  430. _simple_resize(fastest_sched_ctx, slowest_sched_ctx, 0);
  431. }
  432. }
  433. }
  434. struct hypervisor_policy idle_policy =
  435. {
  436. .manage_idle_time = simple_manage_idle_time,
  437. .manage_gflops_rate = simple_manage_gflops_rate,
  438. .resize = simple_resize,
  439. };
  440. struct hypervisor_policy app_driven_policy =
  441. {
  442. .manage_idle_time = simple_manage_idle_time,
  443. .manage_gflops_rate = simple_manage_gflops_rate,
  444. .resize = simple_resize,
  445. };
  446. struct hypervisor_policy gflops_rate_policy =
  447. {
  448. .manage_idle_time = simple_manage_idle_time,
  449. .manage_gflops_rate = simple_manage_gflops_rate,
  450. .resize = simple_resize,
  451. };