simple_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2012 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <sched_ctx_hypervisor.h>
#include <pthread.h>
#include <errno.h> /* EBUSY, returned by pthread_mutex_trylock() below */

/* Total priority of the workers currently assigned to the context */
static int _compute_priority(unsigned sched_ctx)
{
    struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
    int total_priority = 0;
    struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
    int worker;
    starpu_iterator it;
    if(workers->init_iterator)
        workers->init_iterator(workers, &it);
    while(workers->has_next(workers, &it))
    {
        worker = workers->get_next(workers, &it);
        total_priority += config->priority[worker];
    }
    return total_priority;
}

/* Find, among the other contexts, the one with the highest total worker
   priority that can still accept nworkers_to_move more workers */
static unsigned _find_poor_sched_ctx(unsigned req_sched_ctx, int nworkers_to_move)
{
    int i;
    int highest_priority = -1;
    int current_priority = 0;
    unsigned sched_ctx = STARPU_NMAX_SCHED_CTXS;
    int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
    int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
    struct sched_ctx_hypervisor_policy_config *config = NULL;
    for(i = 0; i < nsched_ctxs; i++)
    {
        if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && sched_ctxs[i] != req_sched_ctx)
        {
            unsigned nworkers = starpu_sched_ctx_get_nworkers(sched_ctxs[i]);
            config = sched_ctx_hypervisor_get_config(sched_ctxs[i]);
            if((nworkers + nworkers_to_move) <= config->max_nworkers)
            {
                current_priority = _compute_priority(sched_ctxs[i]);
                if (highest_priority < current_priority)
                {
                    highest_priority = current_priority;
                    sched_ctx = sched_ctxs[i];
                }
            }
        }
    }
    return sched_ctx;
}

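/* Select up to *nworkers workers of the given arch (0 = any arch) that the
   ctx can spare: only non-fixed workers are considered, the lowest-priority
   ones are chosen first, and among equal priorities the one that has been
   idle the longest wins. On return *nworkers is updated to the number of
   workers actually found; the caller frees the returned array. */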
int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_archtype arch)
{
    struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(sched_ctx);
    int *curr_workers = (int*)malloc((*nworkers) * sizeof(int));
    int i;
    for(i = 0; i < *nworkers; i++)
        curr_workers[i] = -1;
    struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
    int index;
    int worker;
    int considered = 0;
    starpu_iterator it;
    if(workers->init_iterator)
        workers->init_iterator(workers, &it);
    for(index = 0; index < *nworkers; index++)
    {
        while(workers->has_next(workers, &it))
        {
            considered = 0;
            worker = workers->get_next(workers, &it);
            enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
            if(arch == 0 || curr_arch == arch)
            {
                if(!config->fixed_workers[worker])
                {
                    for(i = 0; i < index; i++)
                    {
                        if(curr_workers[i] == worker)
                        {
                            considered = 1;
                            break;
                        }
                    }
                    if(!considered)
                    {
                        /* the first candidate for this slot */
                        if(curr_workers[index] < 0)
                            curr_workers[index] = worker;
                        /* the lowest-priority worker is the first to leave the ctx */
                        else if(config->priority[worker] <
                                config->priority[curr_workers[index]])
                            curr_workers[index] = worker;
                        /* if priorities are equal, pick the worker
                           with the biggest idle time */
                        else if(config->priority[worker] ==
                                config->priority[curr_workers[index]])
                        {
                            double worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, worker);
                            double curr_worker_idle_time = sched_ctx_hypervisor_get_idle_time(sched_ctx, curr_workers[index]);
                            if(worker_idle_time > curr_worker_idle_time)
                                curr_workers[index] = worker;
                        }
                    }
                }
            }
        }
        if(curr_workers[index] < 0)
        {
            *nworkers = index;
            break;
        }
    }
    return curr_workers;
}

/* Number of workers of the context (of the given arch, 0 = any) that the
   hypervisor is allowed to move, i.e. not marked as fixed */
static unsigned _get_potential_nworkers(struct sched_ctx_hypervisor_policy_config *config, unsigned sched_ctx, enum starpu_archtype arch)
{
    struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
    unsigned potential_workers = 0;
    int worker;
    starpu_iterator it;
    if(workers->init_iterator)
        workers->init_iterator(workers, &it);
    while(workers->has_next(workers, &it))
    {
        worker = workers->get_next(workers, &it);
        enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
        if(arch == 0 || curr_arch == arch)
        {
            if(!config->fixed_workers[worker])
                potential_workers++;
        }
    }
    return potential_workers;
}

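/* Decide how many workers the requesting ctx can give away: normally one
   "granularity" step, all of its movable workers when giving anything would
   already leave it under min_nworkers, and in any case enough that it does
   not stay above max_nworkers. */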
static unsigned _get_nworkers_to_move(unsigned req_sched_ctx)
{
    struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
    unsigned nworkers = starpu_sched_ctx_get_nworkers(req_sched_ctx);
    unsigned nworkers_to_move = 0;
    unsigned potential_moving_workers = _get_potential_nworkers(config, req_sched_ctx, 0);
    if(potential_moving_workers > 0)
    {
        if(potential_moving_workers <= config->min_nworkers)
            /* if we would have to give away more than the minimum, better give them all */
            /* => the emptied ctx will block until it gets the required workers back */
            nworkers_to_move = potential_moving_workers;
        else if(potential_moving_workers > config->max_nworkers)
        {
            if((potential_moving_workers - config->granularity) > config->max_nworkers)
                nworkers_to_move = config->granularity;
            else
                nworkers_to_move = potential_moving_workers - config->max_nworkers;
        }
        else if(potential_moving_workers > config->granularity)
        {
            if((nworkers - config->granularity) > config->min_nworkers)
                nworkers_to_move = config->granularity;
            else
                nworkers_to_move = potential_moving_workers - config->min_nworkers;
        }
        else
        {
            int nfixed_workers = nworkers - potential_moving_workers;
            if(nfixed_workers >= config->min_nworkers)
                nworkers_to_move = potential_moving_workers;
            else
                nworkers_to_move = potential_moving_workers - (config->min_nworkers - nfixed_workers);
        }
        if((nworkers - nworkers_to_move) > config->max_nworkers)
            nworkers_to_move = nworkers - config->max_nworkers;
    }
    return nworkers_to_move;
}

static unsigned _simple_resize(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
{
    int ret = 1;
    if(force_resize)
        pthread_mutex_lock(&act_hypervisor_mutex);
    else
        ret = pthread_mutex_trylock(&act_hypervisor_mutex);
    if(ret != EBUSY)
    {
        unsigned nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
        if(nworkers_to_move > 0)
        {
            unsigned poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
            if(receiver_sched_ctx == STARPU_NMAX_SCHED_CTXS)
                poor_sched_ctx = _find_poor_sched_ctx(sender_sched_ctx, nworkers_to_move);
            else
            {
                poor_sched_ctx = receiver_sched_ctx;
                struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
                unsigned nworkers = starpu_sched_ctx_get_nworkers(poor_sched_ctx);
                unsigned nshared_workers = starpu_sched_ctx_get_nshared_workers(sender_sched_ctx, poor_sched_ctx);
                if((nworkers + nworkers_to_move - nshared_workers) > config->max_nworkers)
                    nworkers_to_move = nworkers > config->max_nworkers ? 0 : (config->max_nworkers - nworkers + nshared_workers);
                if(nworkers_to_move == 0) poor_sched_ctx = STARPU_NMAX_SCHED_CTXS;
            }
            if(poor_sched_ctx != STARPU_NMAX_SCHED_CTXS)
            {
                int *workers_to_move = _get_first_workers(sender_sched_ctx, &nworkers_to_move, 0);
                sched_ctx_hypervisor_move_workers(sender_sched_ctx, poor_sched_ctx, workers_to_move, nworkers_to_move);
                struct sched_ctx_hypervisor_policy_config *new_config = sched_ctx_hypervisor_get_config(poor_sched_ctx);
                int i;
                for(i = 0; i < nworkers_to_move; i++)
                    new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;
                free(workers_to_move);
            }
        }
        pthread_mutex_unlock(&act_hypervisor_mutex);
        return 1;
    }
    return 0;
}

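/* Estimate how many of the sender's workers the receiver needs in order to
   finish by the sender's expected end time: the extra velocity required is
   the receiver's remaining flops divided by the time left, minus its current
   velocity; this is converted into a worker count using the sender's per-CPU
   velocity, a GPU being weighted as roughly 5 CPUs (the factor of 5 below). */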
static int* _get_workers_to_move(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int *nworkers)
{
    int *workers = NULL;
    double v_receiver = sched_ctx_hypervisor_get_ctx_velocity(receiver_sched_ctx);
    double receiver_remaining_flops = sched_ctx_hypervisor_get_flops_left(receiver_sched_ctx);
    double sender_exp_end = sched_ctx_hypervisor_get_exp_end(sender_sched_ctx);
    double sender_v_cpu = sched_ctx_hypervisor_get_cpu_velocity(sender_sched_ctx);
//  double v_gcpu = sched_ctx_hypervisor_get_gpu_velocity(sender_sched_ctx);
    double v_for_rctx = (receiver_remaining_flops / (sender_exp_end - starpu_timing_now())) - v_receiver;
//  v_for_rctx /= 2;
    int nworkers_needed = v_for_rctx / sender_v_cpu;
/*  printf("%d->%d: v_rec %lf v %lf v_cpu %lf w_needed %d \n", sender_sched_ctx, receiver_sched_ctx, */
/*         v_receiver, v_for_rctx, sender_v_cpu, nworkers_needed); */
    if(nworkers_needed > 0)
    {
        struct sched_ctx_hypervisor_policy_config *sender_config = sched_ctx_hypervisor_get_config(sender_sched_ctx);
        unsigned potential_moving_cpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CPU_WORKER);
        unsigned potential_moving_gpus = _get_potential_nworkers(sender_config, sender_sched_ctx, STARPU_CUDA_WORKER);
        unsigned sender_nworkers = starpu_sched_ctx_get_nworkers(sender_sched_ctx);
        struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
        unsigned nworkers_ctx = starpu_sched_ctx_get_nworkers(receiver_sched_ctx);
        if(nworkers_needed < (potential_moving_cpus + 5 * potential_moving_gpus))
        {
            if((sender_nworkers - nworkers_needed) >= sender_config->min_nworkers)
            {
                if((nworkers_ctx + nworkers_needed) > config->max_nworkers)
                    nworkers_needed = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx);
                if(nworkers_needed > 0)
                {
                    int ngpus = nworkers_needed / 5;
                    int *gpus;
                    gpus = _get_first_workers(sender_sched_ctx, &ngpus, STARPU_CUDA_WORKER);
                    int ncpus = nworkers_needed - ngpus;
                    int *cpus;
                    cpus = _get_first_workers(sender_sched_ctx, &ncpus, STARPU_CPU_WORKER);
                    workers = (int*)malloc(nworkers_needed * sizeof(int));
                    int i;
                    for(i = 0; i < ngpus; i++)
                        workers[(*nworkers)++] = gpus[i];
                    for(i = 0; i < ncpus; i++)
                        workers[(*nworkers)++] = cpus[i];
                    free(gpus);
                    free(cpus);
                }
            }
        }
        else
        {
            int nworkers_to_move = _get_nworkers_to_move(sender_sched_ctx);
            if(sender_nworkers - nworkers_to_move >= sender_config->min_nworkers)
            {
                unsigned nshared_workers = starpu_sched_ctx_get_nshared_workers(sender_sched_ctx, receiver_sched_ctx);
                if((nworkers_ctx + nworkers_to_move - nshared_workers) > config->max_nworkers)
                    nworkers_to_move = nworkers_ctx > config->max_nworkers ? 0 : (config->max_nworkers - nworkers_ctx + nshared_workers);
                if(nworkers_to_move > 0)
                {
                    workers = _get_first_workers(sender_sched_ctx, &nworkers_to_move, 0);
                    *nworkers = nworkers_to_move;
                }
            }
        }
    }
    return workers;
}

static unsigned _simple_resize2(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, unsigned force_resize)
{
    int ret = 1;
    if(force_resize)
        pthread_mutex_lock(&act_hypervisor_mutex);
    else
        ret = pthread_mutex_trylock(&act_hypervisor_mutex);
    if(ret != EBUSY)
    {
        int nworkers_to_move = 0;
        int *workers_to_move = _get_workers_to_move(sender_sched_ctx, receiver_sched_ctx, &nworkers_to_move);
        if(nworkers_to_move > 0)
        {
            sched_ctx_hypervisor_move_workers(sender_sched_ctx, receiver_sched_ctx, workers_to_move, nworkers_to_move);
            struct sched_ctx_hypervisor_policy_config *new_config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
            int i;
            for(i = 0; i < nworkers_to_move; i++)
                new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;
            free(workers_to_move);
        }
        pthread_mutex_unlock(&act_hypervisor_mutex);
        return 1;
    }
    return 0;
}

static unsigned simple_resize(unsigned sender_sched_ctx)
{
    return _simple_resize(sender_sched_ctx, STARPU_NMAX_SCHED_CTXS, 1);
}

/* Policy hook: resize the context when one of its workers has been idle
   longer than the configured max_idle */
static void simple_manage_idle_time(unsigned req_sched_ctx, int worker, double idle_time)
{
    struct sched_ctx_hypervisor_policy_config *config = sched_ctx_hypervisor_get_config(req_sched_ctx);
    if(config != NULL && idle_time > config->max_idle[worker])
        simple_resize(req_sched_ctx);
    return;
}

/* The fastest ctx is the one with the smallest expected end time */
int _find_fastest_sched_ctx()
{
    int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
    int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
    double first_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[0]);
    int fastest_sched_ctx = first_exp_end == -1.0 ? -1 : sched_ctxs[0];
    double curr_exp_end = 0.0;
    int i;
    for(i = 1; i < nsched_ctxs; i++)
    {
        curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
        if(first_exp_end > curr_exp_end && curr_exp_end != -1.0)
        {
            first_exp_end = curr_exp_end;
            fastest_sched_ctx = sched_ctxs[i];
        }
    }
    return fastest_sched_ctx;
}

/* The slowest ctx is the one with the largest expected end time */
int _find_slowest_sched_ctx()
{
    int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
    int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
    int slowest_sched_ctx = -1;
    double curr_exp_end = 0.0;
    double last_exp_end = -1.0;
    int i;
    for(i = 0; i < nsched_ctxs; i++)
    {
        curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
        /* if it hasn't started yet because it has no resources, give it priority */
        if(curr_exp_end == -1.0)
            return sched_ctxs[i];
        if(last_exp_end < curr_exp_end)
        {
            slowest_sched_ctx = sched_ctxs[i];
            last_exp_end = curr_exp_end;
        }
    }
    return slowest_sched_ctx;
}

/* Same as _find_slowest_sched_ctx, but excluding the given ctx */
int _find_slowest_available_sched_ctx(unsigned sched_ctx)
{
    int *sched_ctxs = sched_ctx_hypervisor_get_sched_ctxs();
    int nsched_ctxs = sched_ctx_hypervisor_get_nsched_ctxs();
    int slowest_sched_ctx = -1;
    double curr_exp_end = 0.0;
    double last_exp_end = -1.0;
    int i;
    for(i = 0; i < nsched_ctxs; i++)
    {
        if(sched_ctxs[i] != sched_ctx)
        {
            curr_exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctxs[i]);
            /* if it hasn't started yet because it has no resources, give it priority */
            if(curr_exp_end == -1.0)
                return sched_ctxs[i];
            if(last_exp_end < curr_exp_end)
            {
                slowest_sched_ctx = sched_ctxs[i];
                last_exp_end = curr_exp_end;
            }
        }
    }
    return slowest_sched_ctx;
}

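/* Policy hook driven by the gflops rate: a ctx that has finished its flops
   hands all of its workers to the slowest ctx still running; otherwise, when
   the slowest ctx's expected end time exceeds the fastest one's by more than
   50% and the fastest has already completed more than 20% of its flops, some
   of the fastest ctx's workers are moved to the slowest one. */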
static void simple_manage_gflops_rate(unsigned sched_ctx)
{
    double exp_end = sched_ctx_hypervisor_get_exp_end(sched_ctx);
    double flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(sched_ctx);
    if(flops_left_pct == 0.0f)
    {
        int slowest_sched_ctx = _find_slowest_available_sched_ctx(sched_ctx);
        if(slowest_sched_ctx != -1)
        {
            double slowest_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(slowest_sched_ctx);
            printf("ctx %d finished & gives away the res to %d; slow_left %lf\n", sched_ctx, slowest_sched_ctx, slowest_flops_left_pct);
            if(slowest_flops_left_pct != 0.0f)
            {
                struct sched_ctx_hypervisor_policy_config* config = sched_ctx_hypervisor_get_config(sched_ctx);
                config->min_nworkers = 0;
                config->max_nworkers = 0;
                _simple_resize(sched_ctx, slowest_sched_ctx, 1);
                sched_ctx_hypervisor_stop_resize(slowest_sched_ctx);
            }
        }
    }
    int fastest_sched_ctx = _find_fastest_sched_ctx();
    int slowest_sched_ctx = _find_slowest_sched_ctx();
    if(fastest_sched_ctx != -1 && slowest_sched_ctx != -1 && fastest_sched_ctx != slowest_sched_ctx)
    {
        double fastest_exp_end = sched_ctx_hypervisor_get_exp_end(fastest_sched_ctx);
        double slowest_exp_end = sched_ctx_hypervisor_get_exp_end(slowest_sched_ctx);
        double fastest_bef_res_exp_end = sched_ctx_hypervisor_get_bef_res_exp_end(fastest_sched_ctx);
        double slowest_bef_res_exp_end = sched_ctx_hypervisor_get_bef_res_exp_end(slowest_sched_ctx);
//      (fastest_bef_res_exp_end < slowest_bef_res_exp_end ||
//       fastest_bef_res_exp_end == 0.0 || slowest_bef_res_exp_end == 0)))
        if((slowest_exp_end == -1.0 && fastest_exp_end != -1.0) || ((fastest_exp_end + (fastest_exp_end * 0.5)) < slowest_exp_end))
        {
            double fast_flops_left_pct = sched_ctx_hypervisor_get_flops_left_pct(fastest_sched_ctx);
            if(fast_flops_left_pct < 0.8)
                _simple_resize(fastest_sched_ctx, slowest_sched_ctx, 0);
        }
    }
}

struct sched_ctx_hypervisor_policy idle_policy =
{
    .manage_idle_time = simple_manage_idle_time,
    .manage_gflops_rate = simple_manage_gflops_rate,
    .resize = simple_resize,
};

struct sched_ctx_hypervisor_policy app_driven_policy =
{
    .manage_idle_time = simple_manage_idle_time,
    .manage_gflops_rate = simple_manage_gflops_rate,
    .resize = simple_resize,
};

struct sched_ctx_hypervisor_policy gflops_rate_policy =
{
    .manage_idle_time = simple_manage_idle_time,
    .manage_gflops_rate = simple_manage_gflops_rate,
    .resize = simple_resize,
};