sched_ctx_hypervisor.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011, 2012 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <sched_ctx_hypervisor_intern.h>
#include <common/uthash.h>
#include <starpu_config.h>

unsigned imposed_resize = 0;
struct starpu_sched_ctx_performance_counters* perf_counters = NULL;

static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time);
static void notify_pushed_task(unsigned sched_ctx, int worker);
static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task *task, size_t data_size, uint32_t footprint);
static void notify_post_exec_hook(unsigned sched_ctx, int task_tag);
static void notify_idle_end(unsigned sched_ctx, int worker);
static void notify_submitted_job(struct starpu_task *task, uint32_t footprint);
static void notify_delete_context(unsigned sched_ctx);

extern struct sched_ctx_hypervisor_policy idle_policy;
extern struct sched_ctx_hypervisor_policy app_driven_policy;
extern struct sched_ctx_hypervisor_policy gflops_rate_policy;
#ifdef STARPU_HAVE_GLPK_H
extern struct sched_ctx_hypervisor_policy lp_policy;
extern struct sched_ctx_hypervisor_policy lp2_policy;
extern struct sched_ctx_hypervisor_policy ispeed_lp_policy;
extern struct sched_ctx_hypervisor_policy debit_lp_policy;
#endif // STARPU_HAVE_GLPK_H
extern struct sched_ctx_hypervisor_policy ispeed_policy;

static struct sched_ctx_hypervisor_policy *predefined_policies[] =
{
	&idle_policy,
	&app_driven_policy,
#ifdef STARPU_HAVE_GLPK_H
	&lp_policy,
	&lp2_policy,
	&ispeed_lp_policy,
	&debit_lp_policy,
#endif // STARPU_HAVE_GLPK_H
	&gflops_rate_policy,
	&ispeed_policy
};
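
/* install the callbacks of the selected policy as the hypervisor's active policy */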
static void _load_hypervisor_policy(struct sched_ctx_hypervisor_policy *policy)
{
	STARPU_ASSERT(policy);
	hypervisor.policy.name = policy->name;
	hypervisor.policy.size_ctxs = policy->size_ctxs;
	hypervisor.policy.handle_poped_task = policy->handle_poped_task;
	hypervisor.policy.handle_pushed_task = policy->handle_pushed_task;
	hypervisor.policy.handle_idle_cycle = policy->handle_idle_cycle;
	hypervisor.policy.handle_idle_end = policy->handle_idle_end;
	hypervisor.policy.handle_post_exec_hook = policy->handle_post_exec_hook;
	hypervisor.policy.handle_submitted_job = policy->handle_submitted_job;
	hypervisor.policy.end_ctx = policy->end_ctx;
}
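
/* look up a predefined policy by name; return NULL (with a warning) when no policy matches */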
static struct sched_ctx_hypervisor_policy *_find_hypervisor_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	unsigned i;
	for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
	{
		struct sched_ctx_hypervisor_policy *p;
		p = predefined_policies[i];
		if (p->name)
		{
			if (strcmp(policy_name, p->name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	fprintf(stderr, "Warning: hypervisor policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}
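
/* pick the policy to load: a custom policy provided by the application, a predefined one
   selected by name (or by the HYPERVISOR_POLICY environment variable), or the idle policy by default */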
static struct sched_ctx_hypervisor_policy *_select_hypervisor_policy(struct sched_ctx_hypervisor_policy* hypervisor_policy)
{
	struct sched_ctx_hypervisor_policy *selected_policy = NULL;

	if(hypervisor_policy && hypervisor_policy->custom)
		return hypervisor_policy;

	/* check whether the application specified the name of a policy to load */
	const char *policy_name;
	if (hypervisor_policy && hypervisor_policy->name)
	{
		policy_name = hypervisor_policy->name;
	}
	else
	{
		policy_name = getenv("HYPERVISOR_POLICY");
	}

	if (policy_name)
		selected_policy = _find_hypervisor_policy_from_name(policy_name);

	/* perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* if no policy was specified, use the idle policy as a default */
	return &idle_policy;
}

/* initializes the performance counters that StarPU will use to retrieve hints for resizing */
struct starpu_sched_ctx_performance_counters* sched_ctx_hypervisor_init(struct sched_ctx_hypervisor_policy *hypervisor_policy)
{
	hypervisor.min_tasks = 0;
	hypervisor.nsched_ctxs = 0;
	starpu_pthread_mutex_init(&act_hypervisor_mutex, NULL);
	hypervisor.start_executing_time = starpu_timing_now();

	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		hypervisor.resize[i] = 0;
		hypervisor.allow_remove[i] = 1;
		hypervisor.configurations[i] = NULL;
		hypervisor.sr = NULL;
		hypervisor.check_min_tasks[i] = 1;
		hypervisor.sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
		hypervisor.sched_ctx_w[i].sched_ctx = STARPU_NMAX_SCHED_CTXS;
		hypervisor.sched_ctx_w[i].config = NULL;
		hypervisor.sched_ctx_w[i].total_flops = 0.0;
		hypervisor.sched_ctx_w[i].submitted_flops = 0.0;
		hypervisor.sched_ctx_w[i].remaining_flops = 0.0;
		hypervisor.sched_ctx_w[i].start_time = 0.0;
		hypervisor.sched_ctx_w[i].real_start_time = 0.0;
		hypervisor.sched_ctx_w[i].resize_ack.receiver_sched_ctx = -1;
		hypervisor.sched_ctx_w[i].resize_ack.moved_workers = NULL;
		hypervisor.sched_ctx_w[i].resize_ack.nmoved_workers = 0;
		hypervisor.sched_ctx_w[i].resize_ack.acked_workers = NULL;
		starpu_pthread_mutex_init(&hypervisor.sched_ctx_w[i].mutex, NULL);

		int j;
		for(j = 0; j < STARPU_NMAXWORKERS; j++)
		{
			hypervisor.sched_ctx_w[i].current_idle_time[j] = 0.0;
			hypervisor.sched_ctx_w[i].pushed_tasks[j] = 0;
			hypervisor.sched_ctx_w[i].poped_tasks[j] = 0;
			hypervisor.sched_ctx_w[i].elapsed_flops[j] = 0.0;
			hypervisor.sched_ctx_w[i].elapsed_data[j] = 0;
			hypervisor.sched_ctx_w[i].elapsed_tasks[j] = 0;
			hypervisor.sched_ctx_w[i].total_elapsed_flops[j] = 0.0;
			hypervisor.sched_ctx_w[i].worker_to_be_removed[j] = 0;
			hypervisor.sched_ctx_w[i].ref_velocity[j] = -1.0;
		}
	}

	struct sched_ctx_hypervisor_policy *selected_hypervisor_policy = _select_hypervisor_policy(hypervisor_policy);
	_load_hypervisor_policy(selected_hypervisor_policy);

	perf_counters = (struct starpu_sched_ctx_performance_counters*)malloc(sizeof(struct starpu_sched_ctx_performance_counters));
	perf_counters->notify_idle_cycle = notify_idle_cycle;
	perf_counters->notify_pushed_task = notify_pushed_task;
	perf_counters->notify_poped_task = notify_poped_task;
	perf_counters->notify_post_exec_hook = notify_post_exec_hook;
	perf_counters->notify_idle_end = notify_idle_end;
	perf_counters->notify_submitted_job = notify_submitted_job;
	perf_counters->notify_delete_context = notify_delete_context;

	starpu_sched_ctx_notify_hypervisor_exists();

	return perf_counters;
}
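
/* A minimal usage sketch (illustrative only; the exact calls depend on the application):
 *     sched_ctx_hypervisor_init(NULL);     // NULL selects the HYPERVISOR_POLICY env. var. or the idle policy
 *     sched_ctx_hypervisor_register_ctx(sched_ctx, total_flops);
 *     ... submit tasks ...
 *     sched_ctx_hypervisor_shutdown();
 */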

const char* sched_ctx_hypervisor_get_policy()
{
	return hypervisor.policy.name;
}

/* the user can forbid the resizing process */
void sched_ctx_hypervisor_stop_resize(unsigned sched_ctx)
{
	imposed_resize = 1;
	hypervisor.resize[sched_ctx] = 0;
}

/* the user can restart the resizing process */
void sched_ctx_hypervisor_start_resize(unsigned sched_ctx)
{
	imposed_resize = 1;
	hypervisor.resize[sched_ctx] = 1;
}
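
/* print the elapsed time and, for each registered context, its current cpu/cuda velocity and number of workers */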
static void _print_current_time()
{
	double curr_time = starpu_timing_now();
	double elapsed_time = (curr_time - hypervisor.start_executing_time) / 1000000.0; /* in seconds */
	fprintf(stdout, "Time: %lf\n", elapsed_time);

	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		if(hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
		{
			struct sched_ctx_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[hypervisor.sched_ctxs[i]];

			double cpu_speed = sched_ctx_hypervisor_get_velocity(sc_w, STARPU_CPU_WORKER);
			double cuda_speed = sched_ctx_hypervisor_get_velocity(sc_w, STARPU_CUDA_WORKER);
			int ncpus = sched_ctx_hypervisor_get_nworkers_ctx(sc_w->sched_ctx, STARPU_CPU_WORKER);
			int ncuda = sched_ctx_hypervisor_get_nworkers_ctx(sc_w->sched_ctx, STARPU_CUDA_WORKER);
			fprintf(stdout, "%d: cpu_v = %lf cuda_v = %lf ncpus = %d ncuda = %d\n", hypervisor.sched_ctxs[i], cpu_speed, cuda_speed, ncpus, ncuda);
		}
	}
	return;
}
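
/* stop resizing and unregister every context still known to the hypervisor, then release the performance counters */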
void sched_ctx_hypervisor_shutdown(void)
{
	// printf("shutdown\n");
	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		if(hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && hypervisor.nsched_ctxs > 0)
		{
			sched_ctx_hypervisor_stop_resize(hypervisor.sched_ctxs[i]);
			sched_ctx_hypervisor_unregister_ctx(hypervisor.sched_ctxs[i]);
			starpu_pthread_mutex_destroy(&hypervisor.sched_ctx_w[i].mutex);
		}
	}
	perf_counters->notify_idle_cycle = NULL;
	perf_counters->notify_pushed_task = NULL;
	perf_counters->notify_poped_task = NULL;
	perf_counters->notify_post_exec_hook = NULL;
	perf_counters->notify_idle_end = NULL;
	perf_counters->notify_delete_context = NULL;

	free(perf_counters);
	perf_counters = NULL;

	starpu_pthread_mutex_destroy(&act_hypervisor_mutex);
}

/* the hypervisor is in charge only of the contexts registered to it */
void sched_ctx_hypervisor_register_ctx(unsigned sched_ctx, double total_flops)
{
	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
	hypervisor.configurations[sched_ctx] = NULL;
	hypervisor.resize_requests[sched_ctx] = NULL;
	starpu_pthread_mutex_init(&hypervisor.conf_mut[sched_ctx], NULL);
	starpu_pthread_mutex_init(&hypervisor.resize_mut[sched_ctx], NULL);

	_add_config(sched_ctx);
	hypervisor.sched_ctx_w[sched_ctx].sched_ctx = sched_ctx;
	hypervisor.sched_ctxs[hypervisor.nsched_ctxs++] = sched_ctx;

	hypervisor.sched_ctx_w[sched_ctx].total_flops = total_flops;
	hypervisor.sched_ctx_w[sched_ctx].remaining_flops = total_flops;
	if(strcmp(hypervisor.policy.name, "app_driven") == 0)
		hypervisor.resize[sched_ctx] = 1;
	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
}
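
/* return the index of the first free slot in the sched_ctxs array, or STARPU_NMAX_SCHED_CTXS if there is none */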
static int _get_first_free_sched_ctx(int *sched_ctxs, int nsched_ctxs)
{
	int i;
	for(i = 0; i < nsched_ctxs; i++)
		if(sched_ctxs[i] == STARPU_NMAX_SCHED_CTXS)
			return i;

	return STARPU_NMAX_SCHED_CTXS;
}

/* rearrange the array of sched_ctxs in order not to have {MAXVAL, MAXVAL, 5, MAXVAL, 7}
   but {5, 7, MAXVAL, MAXVAL, MAXVAL} instead:
   it is easier afterwards to iterate over the array */
static void _rearange_sched_ctxs(int *sched_ctxs, int old_nsched_ctxs)
{
	int first_free_id = STARPU_NMAX_SCHED_CTXS;
	int i;
	for(i = 0; i < old_nsched_ctxs; i++)
	{
		if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
		{
			first_free_id = _get_first_free_sched_ctx(sched_ctxs, old_nsched_ctxs);
			if(first_free_id != STARPU_NMAX_SCHED_CTXS)
			{
				sched_ctxs[first_free_id] = sched_ctxs[i];
				sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
			}
		}
	}
}

/* unregistered contexts will no longer be resized */
void sched_ctx_hypervisor_unregister_ctx(unsigned sched_ctx)
{
	if(hypervisor.policy.end_ctx)
		hypervisor.policy.end_ctx(sched_ctx);

	starpu_pthread_mutex_lock(&act_hypervisor_mutex);

	unsigned i;
	for(i = 0; i < hypervisor.nsched_ctxs; i++)
	{
		if(hypervisor.sched_ctxs[i] == (int)sched_ctx)
		{
			hypervisor.sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
			break;
		}
	}

	_rearange_sched_ctxs(hypervisor.sched_ctxs, hypervisor.nsched_ctxs);
	hypervisor.nsched_ctxs--;
	hypervisor.sched_ctx_w[sched_ctx].sched_ctx = STARPU_NMAX_SCHED_CTXS;
	_remove_config(sched_ctx);

	/* free(hypervisor.configurations[sched_ctx]); */
	/* free(hypervisor.resize_requests[sched_ctx]); */
	starpu_pthread_mutex_destroy(&hypervisor.conf_mut[sched_ctx]);
	starpu_pthread_mutex_destroy(&hypervisor.resize_mut[sched_ctx]);
	if(hypervisor.nsched_ctxs == 1)
		sched_ctx_hypervisor_stop_resize(hypervisor.sched_ctxs[0]);

	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
}
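
/* return the largest total_elapsed_flops among the workers of the requested architecture,
   and count those workers in *npus */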
static double _get_best_total_elapsed_flops(struct sched_ctx_hypervisor_wrapper* sc_w, int *npus, enum starpu_archtype req_arch)
{
	double ret_val = 0.0;
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
	int worker;

	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		enum starpu_archtype arch = starpu_worker_get_type(worker);
		if(arch == req_arch)
		{
			if(sc_w->total_elapsed_flops[worker] > ret_val)
				ret_val = sc_w->total_elapsed_flops[worker];
			(*npus)++;
		}
	}

	return ret_val;
}

/* compute an average value of the cpu/cuda velocity */
double sched_ctx_hypervisor_get_velocity_per_worker_type(struct sched_ctx_hypervisor_wrapper* sc_w, enum starpu_archtype arch)
{
	int npus = 0;
	double elapsed_flops = _get_best_total_elapsed_flops(sc_w, &npus, arch) / 1000000000.0; /* in gflops */
	if(npus == 0)
		return -1.0;

	if(elapsed_flops != 0.0)
	{
		double curr_time = starpu_timing_now();
		double elapsed_time = (curr_time - sc_w->real_start_time) / 1000000.0; /* in seconds */
		double velocity = elapsed_flops / elapsed_time; /* in GFlop/s */
		return velocity;
	}

	return -1.0;
}

/* compute an average value of the cpu/cuda reference (old) velocity */
double _get_ref_velocity_per_worker_type(struct sched_ctx_hypervisor_wrapper* sc_w, enum starpu_archtype arch)
{
	double ref_velocity = 0.0;
	unsigned nw = 0;

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
	int worker;

	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		if(sc_w->ref_velocity[worker] > 1.0)
		{
			ref_velocity += sc_w->ref_velocity[worker];
			nw++;
		}
	}

	if(nw > 0)
		return ref_velocity / nw;

	return -1.0;
}
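
/* sum the per-worker task counters */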
static int get_ntasks(int *tasks)
{
	int ntasks = 0;
	int j;
	for(j = 0; j < STARPU_NMAXWORKERS; j++)
	{
		ntasks += tasks[j];
	}
	return ntasks;
}
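
/* keep only the CPU workers of the 'workers' array in 'cpus' */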
static void _get_cpus(int *workers, int nworkers, int *cpus, int *ncpus)
{
	int i, worker;
	*ncpus = 0;

	for(i = 0; i < nworkers; i++)
	{
		worker = workers[i];
		enum starpu_archtype arch = starpu_worker_get_type(worker);
		if(arch == STARPU_CPU_WORKER)
			cpus[(*ncpus)++] = worker;
	}
}
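
/* count the workers of a given architecture (or of any architecture) currently assigned to the context */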
int sched_ctx_hypervisor_get_nworkers_ctx(unsigned sched_ctx, enum starpu_archtype arch)
{
	int nworkers_ctx = 0;
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
	int worker;

	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch || arch == STARPU_ANY_WORKER)
			nworkers_ctx++;
	}
	return nworkers_ctx;
}
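
/* set the elapsed flops of every worker of the context to 'val'; resetting to 0 also clears the elapsed data and task counters */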
static void _set_elapsed_flops_per_sched_ctx(unsigned sched_ctx, double val)
{
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
	{
		hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[i] = val;
		if(val == 0)
		{
			hypervisor.sched_ctx_w[sched_ctx].elapsed_data[i] = 0;
			hypervisor.sched_ctx_w[sched_ctx].elapsed_tasks[i] = 0;
		}
	}
}
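
/* accumulate the per-worker flops counters of a context: since the last resize sample, and since the beginning of the execution respectively */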
double sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(struct sched_ctx_hypervisor_wrapper* sc_w)
{
	double ret_val = 0.0;
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		ret_val += sc_w->elapsed_flops[i];
	return ret_val;
}

double sched_ctx_hypervisor_get_total_elapsed_flops_per_sched_ctx(struct sched_ctx_hypervisor_wrapper* sc_w)
{
	double ret_val = 0.0;
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		ret_val += sc_w->total_elapsed_flops[i];
	return ret_val;
}
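
/* restart the velocity sampling of the two contexts involved in a resize: both get a fresh start time and zeroed elapsed counters */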
void _reset_resize_sample_info(unsigned sender_sched_ctx, unsigned receiver_sched_ctx)
{
	/* info concerning only the gflops_rate strategy */
	struct sched_ctx_hypervisor_wrapper *sender_sc_w = &hypervisor.sched_ctx_w[sender_sched_ctx];
	struct sched_ctx_hypervisor_wrapper *receiver_sc_w = &hypervisor.sched_ctx_w[receiver_sched_ctx];

	double start_time = starpu_timing_now();
	sender_sc_w->start_time = start_time;
	// sender_sc_w->remaining_flops = sender_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(sender_sc_w);
	_set_elapsed_flops_per_sched_ctx(sender_sched_ctx, 0.0);

	receiver_sc_w->start_time = start_time;
	// receiver_sc_w->remaining_flops = receiver_sc_w->remaining_flops - sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(receiver_sc_w);
	_set_elapsed_flops_per_sched_ctx(receiver_sched_ctx, 0.0);
}

/* actually move the workers: the cpus are moved, the gpus are only shared */
/* forbid another resize request until this one is taken into account */
void sched_ctx_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move, unsigned now)
{
	if(nworkers_to_move > 0 && hypervisor.resize[sender_sched_ctx])// && hypervisor.resize[receiver_sched_ctx])
	{
		_print_current_time();
		unsigned j;
		printf("resize ctx %d with %d workers", sender_sched_ctx, nworkers_to_move);
		for(j = 0; j < nworkers_to_move; j++)
			printf(" %d", workers_to_move[j]);
		printf("\n");

		hypervisor.allow_remove[receiver_sched_ctx] = 0;
		starpu_sched_ctx_add_workers(workers_to_move, nworkers_to_move, receiver_sched_ctx);

		if(now)
		{
			unsigned j;
			printf("remove now from ctx %d:", sender_sched_ctx);
			for(j = 0; j < nworkers_to_move; j++)
				printf(" %d", workers_to_move[j]);
			printf("\n");

			starpu_sched_ctx_remove_workers(workers_to_move, nworkers_to_move, sender_sched_ctx);
			hypervisor.allow_remove[receiver_sched_ctx] = 1;
			_reset_resize_sample_info(sender_sched_ctx, receiver_sched_ctx);
		}
		else
		{
			int ret = starpu_pthread_mutex_trylock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
			if(ret != EBUSY)
			{
				hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.receiver_sched_ctx = receiver_sched_ctx;
				hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_move * sizeof(int));
				hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.nmoved_workers = nworkers_to_move;
				hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers = (int*)malloc(nworkers_to_move * sizeof(int));

				unsigned i;
				for(i = 0; i < nworkers_to_move; i++)
				{
					hypervisor.sched_ctx_w[sender_sched_ctx].current_idle_time[workers_to_move[i]] = 0.0;
					hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers[i] = workers_to_move[i];
					hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers[i] = 0;
				}
				hypervisor.resize[sender_sched_ctx] = 0;
				// hypervisor.resize[receiver_sched_ctx] = 0;

				starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
			}
		}

		struct sched_ctx_hypervisor_policy_config *new_config = sched_ctx_hypervisor_get_config(receiver_sched_ctx);
		unsigned i;
		for(i = 0; i < nworkers_to_move; i++)
			new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;
	}
	return;
}
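
/* add workers to a context (if resizing is allowed) and give the newcomers the max_idle time reserved for new workers */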
void sched_ctx_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx)
{
	if(nworkers_to_add > 0 && hypervisor.resize[sched_ctx])
	{
		_print_current_time();
		unsigned j;
		printf("add to ctx %d:", sched_ctx);
		for(j = 0; j < nworkers_to_add; j++)
			printf(" %d", workers_to_add[j]);
		printf("\n");

		starpu_sched_ctx_add_workers(workers_to_add, nworkers_to_add, sched_ctx);

		struct sched_ctx_hypervisor_policy_config *new_config = sched_ctx_hypervisor_get_config(sched_ctx);
		unsigned i;
		for(i = 0; i < nworkers_to_add; i++)
			new_config->max_idle[workers_to_add[i]] = new_config->max_idle[workers_to_add[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_add[i]] : new_config->new_workers_max_idle;
	}
	return;
}
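
/* tell whether the hypervisor is currently allowed to resize this context */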
unsigned sched_ctx_hypervisor_can_resize(unsigned sched_ctx)
{
	return hypervisor.resize[sched_ctx];
}
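
/* remove workers from a context, either immediately or lazily, i.e. once each of them has acknowledged the resize */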
void sched_ctx_hypervisor_remove_workers_from_sched_ctx(int* workers_to_remove, unsigned nworkers_to_remove, unsigned sched_ctx, unsigned now)
{
	if(nworkers_to_remove > 0 && hypervisor.resize[sched_ctx] && hypervisor.allow_remove[sched_ctx])
	{
		_print_current_time();
		unsigned nworkers = 0;
		int workers[nworkers_to_remove];

		if(now)
		{
			unsigned j;
			printf("remove explicitly now from ctx %d:", sched_ctx);
			for(j = 0; j < nworkers_to_remove; j++)
				printf(" %d", workers_to_remove[j]);
			printf("\n");

			starpu_sched_ctx_remove_workers(workers_to_remove, nworkers_to_remove, sched_ctx);
		}
		else
		{
			printf("try to remove from ctx %d: ", sched_ctx);
			unsigned j;
			for(j = 0; j < nworkers_to_remove; j++)
				printf(" %d", workers_to_remove[j]);
			printf("\n");

			int ret = starpu_pthread_mutex_trylock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
			if(ret != EBUSY)
			{
				unsigned i;
				for(i = 0; i < nworkers_to_remove; i++)
					if(starpu_sched_ctx_contains_worker(workers_to_remove[i], sched_ctx))
						workers[nworkers++] = workers_to_remove[i];

				hypervisor.sched_ctx_w[sched_ctx].resize_ack.receiver_sched_ctx = -1;
				hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_remove * sizeof(int));
				hypervisor.sched_ctx_w[sched_ctx].resize_ack.nmoved_workers = (int)nworkers;
				hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers = (int*)malloc(nworkers_to_remove * sizeof(int));

				for(i = 0; i < nworkers; i++)
				{
					hypervisor.sched_ctx_w[sched_ctx].current_idle_time[workers[i]] = 0.0;
					hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers[i] = workers[i];
					hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers[i] = 0;
				}

				hypervisor.resize[sched_ctx] = 0;
				starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
			}
		}
	}
	return;
}
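
/* record the acknowledgement of 'worker' for a pending resize; once all moved workers have acked,
   actually remove them from the sender context and re-enable resizing */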
static unsigned _ack_resize_completed(unsigned sched_ctx, int worker)
{
	if(worker != -1 && !starpu_sched_ctx_contains_worker(worker, sched_ctx))
		return 0;

	struct sched_ctx_hypervisor_resize_ack *resize_ack = NULL;
	unsigned sender_sched_ctx = STARPU_NMAX_SCHED_CTXS;

	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		if(hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
		{
			struct sched_ctx_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[hypervisor.sched_ctxs[i]];
			starpu_pthread_mutex_lock(&sc_w->mutex);
			unsigned only_remove = 0;
			if(sc_w->resize_ack.receiver_sched_ctx == -1 && hypervisor.sched_ctxs[i] != (int)sched_ctx &&
			   sc_w->resize_ack.nmoved_workers > 0 && starpu_sched_ctx_contains_worker(worker, hypervisor.sched_ctxs[i]))
			{
				int j;
				for(j = 0; j < sc_w->resize_ack.nmoved_workers; j++)
					if(sc_w->resize_ack.moved_workers[j] == worker)
					{
						only_remove = 1;
						break;
					}
			}
			if(only_remove ||
			   (sc_w->resize_ack.receiver_sched_ctx != -1 && sc_w->resize_ack.receiver_sched_ctx == (int)sched_ctx))
			{
				resize_ack = &sc_w->resize_ack;
				sender_sched_ctx = hypervisor.sched_ctxs[i];
				starpu_pthread_mutex_unlock(&sc_w->mutex);
				break;
			}
			starpu_pthread_mutex_unlock(&sc_w->mutex);
		}
	}

	/* if there is no ctx waiting for its ack, return 1 */
	if(resize_ack == NULL)
		return 1;

	int ret = starpu_pthread_mutex_trylock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
	if(ret != EBUSY)
	{
		int *moved_workers = resize_ack->moved_workers;
		int nmoved_workers = resize_ack->nmoved_workers;
		int *acked_workers = resize_ack->acked_workers;

		if(worker != -1)
		{
			for(i = 0; i < nmoved_workers; i++)
			{
				int moved_worker = moved_workers[i];
				if(moved_worker == worker && acked_workers[i] == 0)
				{
					acked_workers[i] = 1;
				}
			}
		}

		int nacked_workers = 0;
		for(i = 0; i < nmoved_workers; i++)
		{
			nacked_workers += (acked_workers[i] == 1);
		}

		unsigned resize_completed = (nacked_workers == nmoved_workers);
		int receiver_sched_ctx = sched_ctx;
		if(resize_completed)
		{
			/* if the user explicitly forbade the resizing, don't do it no matter
			   what the application says */
			if(!((hypervisor.resize[sender_sched_ctx] == 0 || hypervisor.resize[receiver_sched_ctx] == 0) && imposed_resize))
			{
				/* int j; */
				/* printf("remove after ack from ctx %d:", sender_sched_ctx); */
				/* for(j = 0; j < nmoved_workers; j++) */
				/*	printf(" %d", moved_workers[j]); */
				/* printf("\n"); */

				starpu_sched_ctx_remove_workers(moved_workers, nmoved_workers, sender_sched_ctx);

				_reset_resize_sample_info(sender_sched_ctx, receiver_sched_ctx);

				hypervisor.resize[sender_sched_ctx] = 1;
				hypervisor.allow_remove[receiver_sched_ctx] = 1;
				// hypervisor.resize[receiver_sched_ctx] = 1;

				/* if the user allowed the resizing, leave the decisions to the application */
				if(imposed_resize) imposed_resize = 0;

				resize_ack->receiver_sched_ctx = -1;
				resize_ack->nmoved_workers = 0;
				free(resize_ack->moved_workers);
				free(resize_ack->acked_workers);
			}
			starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
			return resize_completed;
		}
		starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
	}
	return 0;
}

/* Enqueue a resize request for 'sched_ctx', to be executed when the
 * 'task_tag' tasks of 'sched_ctx' complete. */
void sched_ctx_hypervisor_resize(unsigned sched_ctx, int task_tag)
{
	struct resize_request_entry *entry;

	entry = malloc(sizeof *entry);
	STARPU_ASSERT(entry != NULL);

	entry->sched_ctx = sched_ctx;
	entry->task_tag = task_tag;

	starpu_pthread_mutex_lock(&hypervisor.resize_mut[sched_ctx]);
	HASH_ADD_INT(hypervisor.resize_requests[sched_ctx], task_tag, entry);
	starpu_pthread_mutex_unlock(&hypervisor.resize_mut[sched_ctx]);
}
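
/* for instance (sketch): sched_ctx_hypervisor_resize(sched_ctx, 42) records a request that is
 * acted upon from notify_post_exec_hook() once the task tagged 42 has been executed */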

/* notifies the hypervisor that the worker is no longer idle and a new task was pushed on its queue */
static void notify_idle_end(unsigned sched_ctx, int worker)
{
	if(hypervisor.resize[sched_ctx])
		hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] = 0.0;

	if(hypervisor.policy.handle_idle_end)
		hypervisor.policy.handle_idle_end(sched_ctx, worker);
}

/* notifies the hypervisor that the worker spent another cycle in idle time */
static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
{
	if(hypervisor.resize[sched_ctx])
	{
		struct sched_ctx_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
		sc_w->current_idle_time[worker] += idle_time;
		if(hypervisor.policy.handle_idle_cycle)
		{
			hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
		}
	}
	return;
}

/* notifies the hypervisor that a new task was pushed on the queue of the worker */
static void notify_pushed_task(unsigned sched_ctx, int worker)
{
	hypervisor.sched_ctx_w[sched_ctx].pushed_tasks[worker]++;
	if(hypervisor.sched_ctx_w[sched_ctx].total_flops != 0.0 && hypervisor.sched_ctx_w[sched_ctx].start_time == 0.0)
		hypervisor.sched_ctx_w[sched_ctx].start_time = starpu_timing_now();

	if(hypervisor.sched_ctx_w[sched_ctx].total_flops != 0.0 && hypervisor.sched_ctx_w[sched_ctx].real_start_time == 0.0)
		hypervisor.sched_ctx_w[sched_ctx].real_start_time = starpu_timing_now();

	int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);

	if((hypervisor.min_tasks == 0 || (!(hypervisor.resize[sched_ctx] == 0 && imposed_resize) && ntasks == hypervisor.min_tasks)) && hypervisor.check_min_tasks[sched_ctx])
	{
		hypervisor.resize[sched_ctx] = 1;
		if(imposed_resize) imposed_resize = 0;
		hypervisor.check_min_tasks[sched_ctx] = 0;
	}
	if(hypervisor.policy.handle_pushed_task)
		hypervisor.policy.handle_pushed_task(sched_ctx, worker);
}

/* notifies the hypervisor that a task was popped from the queue of the worker */
static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task *task, size_t data_size, uint32_t footprint)
{
	hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++;
	hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[worker] += task->flops;
	hypervisor.sched_ctx_w[sched_ctx].elapsed_data[worker] += data_size;
	hypervisor.sched_ctx_w[sched_ctx].elapsed_tasks[worker]++;
	hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += task->flops;
	hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= task->flops; //sched_ctx_hypervisor_get_elapsed_flops_per_sched_ctx(&hypervisor.sched_ctx_w[sched_ctx]);

	if(hypervisor.resize[sched_ctx])
	{
		if(hypervisor.policy.handle_poped_task)
			hypervisor.policy.handle_poped_task(sched_ctx, worker, task, footprint);
	}
	_ack_resize_completed(sched_ctx, worker);

	if(hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker] % 200 == 0)
		_print_current_time();
}

/* notifies the hypervisor that a tagged task has just been executed */
static void notify_post_exec_hook(unsigned sched_ctx, int task_tag)
{
	STARPU_ASSERT(task_tag > 0);

	unsigned conf_sched_ctx;
	unsigned i;
	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
	unsigned ns = hypervisor.nsched_ctxs;
	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);

	for(i = 0; i < ns; i++)
	{
		struct configuration_entry *entry;

		conf_sched_ctx = hypervisor.sched_ctxs[i];
		starpu_pthread_mutex_lock(&hypervisor.conf_mut[conf_sched_ctx]);

		HASH_FIND_INT(hypervisor.configurations[conf_sched_ctx], &task_tag, entry);

		if (entry != NULL)
		{
			struct sched_ctx_hypervisor_policy_config *config = entry->configuration;

			sched_ctx_hypervisor_set_config(conf_sched_ctx, config);
			HASH_DEL(hypervisor.configurations[conf_sched_ctx], entry);
			free(config);
		}
		starpu_pthread_mutex_unlock(&hypervisor.conf_mut[conf_sched_ctx]);
	}

	if(hypervisor.resize[sched_ctx])
	{
		starpu_pthread_mutex_lock(&hypervisor.resize_mut[sched_ctx]);

		if(hypervisor.policy.handle_post_exec_hook)
		{
			/* check whether 'task_tag' is in the 'resize_requests' set */
			struct resize_request_entry *entry;
			HASH_FIND_INT(hypervisor.resize_requests[sched_ctx], &task_tag, entry);
			if (entry != NULL)
			{
				hypervisor.policy.handle_post_exec_hook(sched_ctx, task_tag);
				HASH_DEL(hypervisor.resize_requests[sched_ctx], entry);
				free(entry);
			}
		}
		starpu_pthread_mutex_unlock(&hypervisor.resize_mut[sched_ctx]);
	}
	return;
}

static void notify_submitted_job(struct starpu_task *task, uint32_t footprint)
{
	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
	hypervisor.sched_ctx_w[task->sched_ctx].submitted_flops += task->flops;
	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);

	if(hypervisor.policy.handle_submitted_job)
		hypervisor.policy.handle_submitted_job(task, footprint);
}

static void notify_delete_context(unsigned sched_ctx)
{
	_print_current_time();
	sched_ctx_hypervisor_unregister_ctx(sched_ctx);
}
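
/* enable resizing for the given contexts (or for all the registered ones when sched_ctxs is NULL) and let the policy distribute the workers among them */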
void sched_ctx_hypervisor_size_ctxs(int *sched_ctxs, int nsched_ctxs, int *workers, int nworkers)
{
	starpu_pthread_mutex_lock(&act_hypervisor_mutex);
	unsigned curr_nsched_ctxs = sched_ctxs == NULL ? hypervisor.nsched_ctxs : nsched_ctxs;
	int *curr_sched_ctxs = sched_ctxs == NULL ? hypervisor.sched_ctxs : sched_ctxs;
	starpu_pthread_mutex_unlock(&act_hypervisor_mutex);

	unsigned s;
	for(s = 0; s < curr_nsched_ctxs; s++)
		hypervisor.resize[curr_sched_ctxs[s]] = 1;

	if(hypervisor.policy.size_ctxs)
		hypervisor.policy.size_ctxs(curr_sched_ctxs, curr_nsched_ctxs, workers, nworkers);
}
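
/* accessors for the per-context wrapper and for the list/number of contexts registered to the hypervisor */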
struct sched_ctx_hypervisor_wrapper* sched_ctx_hypervisor_get_wrapper(unsigned sched_ctx)
{
	return &hypervisor.sched_ctx_w[sched_ctx];
}

int* sched_ctx_hypervisor_get_sched_ctxs()
{
	return hypervisor.sched_ctxs;
}

int sched_ctx_hypervisor_get_nsched_ctxs()
{
	int ns;
	ns = hypervisor.nsched_ctxs;
	return ns;
}
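
/* store a sizing request so that the policy can retrieve and apply it later */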
void sched_ctx_hypervisor_save_size_req(int *sched_ctxs, int nsched_ctxs, int *workers, int nworkers)
{
	hypervisor.sr = (struct size_request*)malloc(sizeof(struct size_request));
	hypervisor.sr->sched_ctxs = sched_ctxs;
	hypervisor.sr->nsched_ctxs = nsched_ctxs;
	hypervisor.sr->workers = workers;
	hypervisor.sr->nworkers = nworkers;
}

unsigned sched_ctx_hypervisor_get_size_req(int **sched_ctxs, int* nsched_ctxs, int **workers, int *nworkers)
{
	if(hypervisor.sr != NULL)
	{
		*sched_ctxs = hypervisor.sr->sched_ctxs;
		*nsched_ctxs = hypervisor.sr->nsched_ctxs;
		*workers = hypervisor.sr->workers;
		*nworkers = hypervisor.sr->nworkers;
		return 1;
	}
	return 0;
}

void sched_ctx_hypervisor_free_size_req(void)
{
	if(hypervisor.sr != NULL)
	{
		free(hypervisor.sr);
		hypervisor.sr = NULL;
	}
}
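
/* return the velocity of the given architecture in the context: the currently measured one if available,
   otherwise the reference velocity, otherwise a rough default guess (5 GFlop/s for a CPU worker, 100 GFlop/s otherwise) */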
double sched_ctx_hypervisor_get_velocity(struct sched_ctx_hypervisor_wrapper *sc_w, enum starpu_archtype arch)
{
	double velocity = sched_ctx_hypervisor_get_velocity_per_worker_type(sc_w, arch);
	if(velocity == -1.0)
		velocity = _get_ref_velocity_per_worker_type(sc_w, arch);
	if(velocity == -1.0)
		velocity = arch == STARPU_CPU_WORKER ? 5.0 : 100.0;
	return velocity;
}