/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011, 2012 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <sc_hypervisor_intern.h>
#include <common/uthash.h>
#include <starpu_config.h>

unsigned imposed_resize = 0;
unsigned type_of_tasks_known = 0;
struct starpu_sched_ctx_performance_counters* perf_counters = NULL;

static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time);
static void notify_pushed_task(unsigned sched_ctx, int worker);
static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task *task, size_t data_size, uint32_t footprint);
static void notify_post_exec_hook(unsigned sched_ctx, int taskid);
static void notify_idle_end(unsigned sched_ctx, int worker);
static void notify_submitted_job(struct starpu_task *task, uint32_t footprint);
static void notify_delete_context(unsigned sched_ctx);

extern struct sc_hypervisor_policy idle_policy;
extern struct sc_hypervisor_policy app_driven_policy;
extern struct sc_hypervisor_policy gflops_rate_policy;
#ifdef STARPU_HAVE_GLPK_H
extern struct sc_hypervisor_policy feft_lp_policy;
extern struct sc_hypervisor_policy teft_lp_policy;
extern struct sc_hypervisor_policy ispeed_lp_policy;
extern struct sc_hypervisor_policy debit_lp_policy;
#endif // STARPU_HAVE_GLPK_H
extern struct sc_hypervisor_policy ispeed_policy;

static struct sc_hypervisor_policy *predefined_policies[] =
{
        &idle_policy,
        &app_driven_policy,
#ifdef STARPU_HAVE_GLPK_H
        &feft_lp_policy,
        &teft_lp_policy,
        &ispeed_lp_policy,
        &debit_lp_policy,
#endif // STARPU_HAVE_GLPK_H
        &gflops_rate_policy,
        &ispeed_policy
};

static void _load_hypervisor_policy(struct sc_hypervisor_policy *policy)
{
        STARPU_ASSERT(policy);
        hypervisor.policy.name = policy->name;
        hypervisor.policy.size_ctxs = policy->size_ctxs;
        hypervisor.policy.handle_poped_task = policy->handle_poped_task;
        hypervisor.policy.handle_pushed_task = policy->handle_pushed_task;
        hypervisor.policy.handle_idle_cycle = policy->handle_idle_cycle;
        hypervisor.policy.handle_idle_end = policy->handle_idle_end;
        hypervisor.policy.handle_post_exec_hook = policy->handle_post_exec_hook;
        hypervisor.policy.handle_submitted_job = policy->handle_submitted_job;
        hypervisor.policy.end_ctx = policy->end_ctx;
}

static struct sc_hypervisor_policy *_find_hypervisor_policy_from_name(const char *policy_name)
{
        if (!policy_name)
                return NULL;

        unsigned i;
        for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
        {
                struct sc_hypervisor_policy *p;
                p = predefined_policies[i];
                if (p->name)
                {
                        if (strcmp(policy_name, p->name) == 0)
                        {
                                /* we found a policy with the requested name */
                                return p;
                        }
                }
        }
        fprintf(stderr, "Warning: hypervisor policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

        /* nothing was found */
        return NULL;
}

static struct sc_hypervisor_policy *_select_hypervisor_policy(struct sc_hypervisor_policy* hypervisor_policy)
{
        struct sc_hypervisor_policy *selected_policy = NULL;

        if(hypervisor_policy && hypervisor_policy->custom)
                return hypervisor_policy;

        /* we check whether the application specified the name of a policy to load */
        const char *policy_name;
        if (hypervisor_policy && hypervisor_policy->name)
        {
                policy_name = hypervisor_policy->name;
        }
        else
        {
                policy_name = getenv("HYPERVISOR_POLICY");
        }

        if (policy_name)
                selected_policy = _find_hypervisor_policy_from_name(policy_name);

        /* Perhaps there was no policy that matched the name */
        if (selected_policy)
                return selected_policy;

        /* If no policy was specified, we use the idle policy as a default */
        return &idle_policy;
}

/* initializes the performance counters that StarPU will use to retrieve hints for resizing */
struct starpu_sched_ctx_performance_counters* sc_hypervisor_init(struct sc_hypervisor_policy *hypervisor_policy)
{
        hypervisor.min_tasks = 0;
        hypervisor.nsched_ctxs = 0;
        starpu_pthread_mutex_init(&act_hypervisor_mutex, NULL);
        hypervisor.start_executing_time = starpu_timing_now();

        int i;
        for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
        {
                hypervisor.resize[i] = 0;
                hypervisor.allow_remove[i] = 1;
                hypervisor.configurations[i] = NULL;
                hypervisor.sr = NULL;
                hypervisor.check_min_tasks[i] = 1;
                hypervisor.sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
                hypervisor.sched_ctx_w[i].sched_ctx = STARPU_NMAX_SCHED_CTXS;
                hypervisor.sched_ctx_w[i].config = NULL;
                hypervisor.sched_ctx_w[i].total_flops = 0.0;
                hypervisor.sched_ctx_w[i].submitted_flops = 0.0;
                hypervisor.sched_ctx_w[i].remaining_flops = 0.0;
                hypervisor.sched_ctx_w[i].start_time = 0.0;
                hypervisor.sched_ctx_w[i].real_start_time = 0.0;
                hypervisor.sched_ctx_w[i].resize_ack.receiver_sched_ctx = -1;
                hypervisor.sched_ctx_w[i].resize_ack.moved_workers = NULL;
                hypervisor.sched_ctx_w[i].resize_ack.nmoved_workers = 0;
                hypervisor.sched_ctx_w[i].resize_ack.acked_workers = NULL;
                starpu_pthread_mutex_init(&hypervisor.sched_ctx_w[i].mutex, NULL);

                int j;
                for(j = 0; j < STARPU_NMAXWORKERS; j++)
                {
                        hypervisor.sched_ctx_w[i].current_idle_time[j] = 0.0;
                        hypervisor.sched_ctx_w[i].pushed_tasks[j] = 0;
                        hypervisor.sched_ctx_w[i].poped_tasks[j] = 0;
                        hypervisor.sched_ctx_w[i].elapsed_flops[j] = 0.0;
                        hypervisor.sched_ctx_w[i].elapsed_data[j] = 0;
                        hypervisor.sched_ctx_w[i].elapsed_tasks[j] = 0;
                        hypervisor.sched_ctx_w[i].total_elapsed_flops[j] = 0.0;
                        hypervisor.sched_ctx_w[i].worker_to_be_removed[j] = 0;
                        hypervisor.sched_ctx_w[i].ref_velocity[j] = -1.0;
                }
        }

        struct sc_hypervisor_policy *selected_hypervisor_policy = _select_hypervisor_policy(hypervisor_policy);
        _load_hypervisor_policy(selected_hypervisor_policy);

        perf_counters = (struct starpu_sched_ctx_performance_counters*)malloc(sizeof(struct starpu_sched_ctx_performance_counters));
        perf_counters->notify_idle_cycle = notify_idle_cycle;
        perf_counters->notify_pushed_task = notify_pushed_task;
        perf_counters->notify_poped_task = notify_poped_task;
        perf_counters->notify_post_exec_hook = notify_post_exec_hook;
        perf_counters->notify_idle_end = notify_idle_end;
        perf_counters->notify_submitted_job = notify_submitted_job;
        perf_counters->notify_delete_context = notify_delete_context;
        starpu_sched_ctx_notify_hypervisor_exists();

        return perf_counters;
}
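
/* Illustrative usage sketch (not part of this file's logic): a typical
 * application initializes the hypervisor before creating scheduling contexts,
 * registers each context together with an estimate of its total flops, and
 * shuts the hypervisor down at the end. The context creation call and the
 * flop estimate below are hypothetical placeholders; only the sc_hypervisor_*
 * calls are defined in this module.
 *
 *      // select the policy through the HYPERVISOR_POLICY environment
 *      // variable, or fall back to the default idle policy
 *      sc_hypervisor_init(NULL);
 *
 *      unsigned ctx1 = starpu_sched_ctx_create(...);  // created by the application
 *      sc_hypervisor_register_ctx(ctx1, 1e9);         // roughly 1 Gflop expected in ctx1
 *
 *      // ... submit tasks ...
 *
 *      sc_hypervisor_unregister_ctx(ctx1);
 *      sc_hypervisor_shutdown();
 */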
const char* sc_hypervisor_get_policy()
{
        return hypervisor.policy.name;
}

/* the user can forbid the resizing process */
void sc_hypervisor_stop_resize(unsigned sched_ctx)
{
        imposed_resize = 1;
        hypervisor.resize[sched_ctx] = 0;
}

/* the user can restart the resizing process */
void sc_hypervisor_start_resize(unsigned sched_ctx)
{
        imposed_resize = 1;
        hypervisor.resize[sched_ctx] = 1;
}
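
/* Illustrative sketch: an application that wants a stable worker assignment
 * during a given phase can bracket that phase with the two calls above; the
 * context id is a placeholder.
 *
 *      sc_hypervisor_stop_resize(ctx1);
 *      // ... phase during which ctx1 must keep its current workers ...
 *      sc_hypervisor_start_resize(ctx1);
 */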
static void _print_current_time()
{
        double curr_time = starpu_timing_now();
        double elapsed_time = (curr_time - hypervisor.start_executing_time) / 1000000.0; /* in seconds */
        fprintf(stdout, "Time: %lf\n", elapsed_time);

        int i;
        for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
        {
                if(hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
                {
                        struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[hypervisor.sched_ctxs[i]];

                        double cpu_speed = sc_hypervisor_get_velocity(sc_w, STARPU_CPU_WORKER);
                        double cuda_speed = sc_hypervisor_get_velocity(sc_w, STARPU_CUDA_WORKER);
                        int ncpus = sc_hypervisor_get_nworkers_ctx(sc_w->sched_ctx, STARPU_CPU_WORKER);
                        int ncuda = sc_hypervisor_get_nworkers_ctx(sc_w->sched_ctx, STARPU_CUDA_WORKER);
                        fprintf(stdout, "%d: cpu_v = %lf cuda_v = %lf ncpus = %d ncuda = %d\n", hypervisor.sched_ctxs[i], cpu_speed, cuda_speed, ncpus, ncuda);
                }
        }
        return;
}

void sc_hypervisor_shutdown(void)
{
        // printf("shutdown\n");
        int i;
        for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
        {
                if(hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS && hypervisor.nsched_ctxs > 0)
                {
                        sc_hypervisor_stop_resize(hypervisor.sched_ctxs[i]);
                        sc_hypervisor_unregister_ctx(hypervisor.sched_ctxs[i]);
                        starpu_pthread_mutex_destroy(&hypervisor.sched_ctx_w[i].mutex);
                }
        }
        perf_counters->notify_idle_cycle = NULL;
        perf_counters->notify_pushed_task = NULL;
        perf_counters->notify_poped_task = NULL;
        perf_counters->notify_post_exec_hook = NULL;
        perf_counters->notify_idle_end = NULL;
        perf_counters->notify_delete_context = NULL;

        free(perf_counters);
        perf_counters = NULL;

        starpu_pthread_mutex_destroy(&act_hypervisor_mutex);
}

/* the hypervisor is in charge only of the contexts registered to it */
void sc_hypervisor_register_ctx(unsigned sched_ctx, double total_flops)
{
        starpu_pthread_mutex_lock(&act_hypervisor_mutex);
        hypervisor.configurations[sched_ctx] = NULL;
        hypervisor.resize_requests[sched_ctx] = NULL;
        starpu_pthread_mutex_init(&hypervisor.conf_mut[sched_ctx], NULL);
        starpu_pthread_mutex_init(&hypervisor.resize_mut[sched_ctx], NULL);

        _add_config(sched_ctx);
        hypervisor.sched_ctx_w[sched_ctx].sched_ctx = sched_ctx;
        hypervisor.sched_ctxs[hypervisor.nsched_ctxs++] = sched_ctx;

        hypervisor.sched_ctx_w[sched_ctx].total_flops = total_flops;
        hypervisor.sched_ctx_w[sched_ctx].remaining_flops = total_flops;
        if(strcmp(hypervisor.policy.name, "app_driven") == 0)
                hypervisor.resize[sched_ctx] = 1;
        starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
}

static int _get_first_free_sched_ctx(int *sched_ctxs, int nsched_ctxs)
{
        int i;
        for(i = 0; i < nsched_ctxs; i++)
                if(sched_ctxs[i] == STARPU_NMAX_SCHED_CTXS)
                        return i;

        return STARPU_NMAX_SCHED_CTXS;
}

/* rearrange the array of sched_ctxs in order not to have {MAXVAL, MAXVAL, 5, MAXVAL, 7}
   but {5, 7, MAXVAL, MAXVAL, MAXVAL} instead;
   it is easier afterwards to iterate over the array
*/
static void _rearange_sched_ctxs(int *sched_ctxs, int old_nsched_ctxs)
{
        int first_free_id = STARPU_NMAX_SCHED_CTXS;
        int i;
        for(i = 0; i < old_nsched_ctxs; i++)
        {
                if(sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
                {
                        first_free_id = _get_first_free_sched_ctx(sched_ctxs, old_nsched_ctxs);
                        if(first_free_id != STARPU_NMAX_SCHED_CTXS)
                        {
                                sched_ctxs[first_free_id] = sched_ctxs[i];
                                sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
                        }
                }
        }
}

/* unregistered contexts will no longer be resized */
void sc_hypervisor_unregister_ctx(unsigned sched_ctx)
{
        if(hypervisor.policy.end_ctx)
                hypervisor.policy.end_ctx(sched_ctx);

        starpu_pthread_mutex_lock(&act_hypervisor_mutex);
        unsigned i;
        for(i = 0; i < hypervisor.nsched_ctxs; i++)
        {
                if(hypervisor.sched_ctxs[i] == (int)sched_ctx)
                {
                        hypervisor.sched_ctxs[i] = STARPU_NMAX_SCHED_CTXS;
                        break;
                }
        }

        _rearange_sched_ctxs(hypervisor.sched_ctxs, hypervisor.nsched_ctxs);
        hypervisor.nsched_ctxs--;
        hypervisor.sched_ctx_w[sched_ctx].sched_ctx = STARPU_NMAX_SCHED_CTXS;
        _remove_config(sched_ctx);

        /* free(hypervisor.configurations[sched_ctx]); */
        /* free(hypervisor.resize_requests[sched_ctx]); */
        starpu_pthread_mutex_destroy(&hypervisor.conf_mut[sched_ctx]);
        starpu_pthread_mutex_destroy(&hypervisor.resize_mut[sched_ctx]);
        if(hypervisor.nsched_ctxs == 1)
                sc_hypervisor_stop_resize(hypervisor.sched_ctxs[0]);

        starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
}

static double _get_best_total_elapsed_flops(struct sc_hypervisor_wrapper* sc_w, int *npus, enum starpu_worker_archtype req_arch)
{
        double ret_val = 0.0;
        struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
        int worker;

        struct starpu_sched_ctx_iterator it;
        if(workers->init_iterator)
                workers->init_iterator(workers, &it);

        while(workers->has_next(workers, &it))
        {
                worker = workers->get_next(workers, &it);
                enum starpu_worker_archtype arch = starpu_worker_get_type(worker);
                if(arch == req_arch)
                {
                        if(sc_w->total_elapsed_flops[worker] > ret_val)
                                ret_val = sc_w->total_elapsed_flops[worker];
                        (*npus)++;
                }
        }

        return ret_val;
}

/* compute an average value of the cpu/cuda velocity */
double sc_hypervisor_get_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
{
        int npus = 0;
        double elapsed_flops = _get_best_total_elapsed_flops(sc_w, &npus, arch) / 1000000000.0; /* in gflops */
        if(npus == 0)
                return -1.0;

        if(elapsed_flops != 0.0)
        {
                double curr_time = starpu_timing_now();
                double elapsed_time = (curr_time - sc_w->real_start_time) / 1000000.0; /* in seconds */
                double velocity = (elapsed_flops/elapsed_time); /* in Gflops/s */
                return velocity;
        }

        return -1.0;
}
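
/* Worked example of the computation above (illustrative numbers): if the
 * fastest worker of the requested architecture has accumulated
 * total_elapsed_flops = 12e9 flops (i.e. 12 Gflops) and 3 seconds of
 * wall-clock time have passed since real_start_time, the returned velocity is
 * 12 / 3 = 4 Gflops/s. If no worker of that architecture belongs to the
 * context, or nothing has been executed yet, -1.0 is returned instead. */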
/* compute an average value of the cpu/cuda old velocity */
double sc_hypervisor_get_ref_velocity_per_worker_type(struct sc_hypervisor_wrapper* sc_w, enum starpu_worker_archtype arch)
{
        double ref_velocity = 0.0;
        unsigned nw = 0;

        struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
        int worker;

        struct starpu_sched_ctx_iterator it;
        if(workers->init_iterator)
                workers->init_iterator(workers, &it);

        while(workers->has_next(workers, &it))
        {
                worker = workers->get_next(workers, &it);
                if(sc_w->ref_velocity[worker] > 1.0)
                {
                        ref_velocity += sc_w->ref_velocity[worker];
                        nw++;
                }
        }

        if(nw > 0)
                return ref_velocity / nw;

        return -1.0;
}

static int get_ntasks(int *tasks)
{
        int ntasks = 0;
        int j;
        for(j = 0; j < STARPU_NMAXWORKERS; j++)
        {
                ntasks += tasks[j];
        }
        return ntasks;
}

static void _get_cpus(int *workers, int nworkers, int *cpus, int *ncpus)
{
        int i, worker;
        *ncpus = 0;

        for(i = 0; i < nworkers; i++)
        {
                worker = workers[i];
                enum starpu_worker_archtype arch = starpu_worker_get_type(worker);
                if(arch == STARPU_CPU_WORKER)
                        cpus[(*ncpus)++] = worker;
        }
}

int sc_hypervisor_get_nworkers_ctx(unsigned sched_ctx, enum starpu_worker_archtype arch)
{
        int nworkers_ctx = 0;
        struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
        int worker;

        struct starpu_sched_ctx_iterator it;
        if(workers->init_iterator)
                workers->init_iterator(workers, &it);

        while(workers->has_next(workers, &it))
        {
                worker = workers->get_next(workers, &it);
                enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
                if(curr_arch == arch || arch == STARPU_ANY_WORKER)
                        nworkers_ctx++;
        }
        return nworkers_ctx;
}

static void _set_elapsed_flops_per_sched_ctx(unsigned sched_ctx, double val)
{
        int i;
        for(i = 0; i < STARPU_NMAXWORKERS; i++)
        {
                hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[i] = val;
                if(val == 0)
                {
                        hypervisor.sched_ctx_w[sched_ctx].elapsed_data[i] = 0;
                        hypervisor.sched_ctx_w[sched_ctx].elapsed_tasks[i] = 0;
                }
        }
}

double sc_hypervisor_get_elapsed_flops_per_sched_ctx(struct sc_hypervisor_wrapper* sc_w)
{
        double ret_val = 0.0;
        int i;
        for(i = 0; i < STARPU_NMAXWORKERS; i++)
                ret_val += sc_w->elapsed_flops[i];
        return ret_val;
}

double sc_hypervisor_get_total_elapsed_flops_per_sched_ctx(struct sc_hypervisor_wrapper* sc_w)
{
        double ret_val = 0.0;
        int i;
        for(i = 0; i < STARPU_NMAXWORKERS; i++)
                ret_val += sc_w->total_elapsed_flops[i];
        return ret_val;
}

void _reset_resize_sample_info(unsigned sender_sched_ctx, unsigned receiver_sched_ctx)
{
        /* info concerning only the gflops_rate strategy */
        struct sc_hypervisor_wrapper *sender_sc_w = &hypervisor.sched_ctx_w[sender_sched_ctx];
        struct sc_hypervisor_wrapper *receiver_sc_w = &hypervisor.sched_ctx_w[receiver_sched_ctx];

        double start_time = starpu_timing_now();
        sender_sc_w->start_time = start_time;
        _set_elapsed_flops_per_sched_ctx(sender_sched_ctx, 0.0);

        receiver_sc_w->start_time = start_time;
        _set_elapsed_flops_per_sched_ctx(receiver_sched_ctx, 0.0);
}

/* actually move the workers: the cpus are moved, the gpus are only shared */
/* forbid another resize request before this one is taken into account */
void sc_hypervisor_move_workers(unsigned sender_sched_ctx, unsigned receiver_sched_ctx, int* workers_to_move, unsigned nworkers_to_move, unsigned now)
{
        if(nworkers_to_move > 0 && hypervisor.resize[sender_sched_ctx])
        {
                _print_current_time();
                unsigned j;
                printf("resize ctx %d with %d workers", sender_sched_ctx, nworkers_to_move);
                for(j = 0; j < nworkers_to_move; j++)
                        printf(" %d", workers_to_move[j]);
                printf("\n");

                hypervisor.allow_remove[receiver_sched_ctx] = 0;
                starpu_sched_ctx_add_workers(workers_to_move, nworkers_to_move, receiver_sched_ctx);

                if(now)
                {
                        unsigned j;
                        printf("remove now from ctx %d:", sender_sched_ctx);
                        for(j = 0; j < nworkers_to_move; j++)
                                printf(" %d", workers_to_move[j]);
                        printf("\n");

                        starpu_sched_ctx_remove_workers(workers_to_move, nworkers_to_move, sender_sched_ctx);
                        hypervisor.allow_remove[receiver_sched_ctx] = 1;
                        _reset_resize_sample_info(sender_sched_ctx, receiver_sched_ctx);
                }
                else
                {
                        int ret = starpu_pthread_mutex_trylock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
                        if(ret != EBUSY)
                        {
                                hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.receiver_sched_ctx = receiver_sched_ctx;
                                hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_move * sizeof(int));
                                hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.nmoved_workers = nworkers_to_move;
                                hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers = (int*)malloc(nworkers_to_move * sizeof(int));

                                unsigned i;
                                for(i = 0; i < nworkers_to_move; i++)
                                {
                                        hypervisor.sched_ctx_w[sender_sched_ctx].current_idle_time[workers_to_move[i]] = 0.0;
                                        hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.moved_workers[i] = workers_to_move[i];
                                        hypervisor.sched_ctx_w[sender_sched_ctx].resize_ack.acked_workers[i] = 0;
                                }

                                hypervisor.resize[sender_sched_ctx] = 0;
                                starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
                        }
                }

                struct sc_hypervisor_policy_config *new_config = sc_hypervisor_get_config(receiver_sched_ctx);
                unsigned i;
                for(i = 0; i < nworkers_to_move; i++)
                        new_config->max_idle[workers_to_move[i]] = new_config->max_idle[workers_to_move[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_move[i]] : new_config->new_workers_max_idle;
        }
        return;
}
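
/* Illustrative sketch: a resizing policy that decided to hand two CPU workers
 * over to another context could call the function above as follows (the
 * context ids and worker ids are placeholders). With now = 0 the workers are
 * added to the receiver immediately but only removed from the sender once
 * every moved worker has acknowledged the resize (see _ack_resize_completed).
 *
 *      int workers[2] = {2, 3};
 *      sc_hypervisor_move_workers(sender_ctx, receiver_ctx, workers, 2, 0);
 */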
void sc_hypervisor_add_workers_to_sched_ctx(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx)
{
        if(nworkers_to_add > 0 && hypervisor.resize[sched_ctx])
        {
                _print_current_time();
                unsigned j;
                printf("add to ctx %d:", sched_ctx);
                for(j = 0; j < nworkers_to_add; j++)
                        printf(" %d", workers_to_add[j]);
                printf("\n");

                starpu_sched_ctx_add_workers(workers_to_add, nworkers_to_add, sched_ctx);
                struct sc_hypervisor_policy_config *new_config = sc_hypervisor_get_config(sched_ctx);
                unsigned i;
                for(i = 0; i < nworkers_to_add; i++)
                        new_config->max_idle[workers_to_add[i]] = new_config->max_idle[workers_to_add[i]] != MAX_IDLE_TIME ? new_config->max_idle[workers_to_add[i]] : new_config->new_workers_max_idle;
        }
        return;
}

unsigned sc_hypervisor_can_resize(unsigned sched_ctx)
{
        return hypervisor.resize[sched_ctx];
}

void sc_hypervisor_remove_workers_from_sched_ctx(int* workers_to_remove, unsigned nworkers_to_remove, unsigned sched_ctx, unsigned now)
{
        if(nworkers_to_remove > 0 && hypervisor.resize[sched_ctx] && hypervisor.allow_remove[sched_ctx])
        {
                _print_current_time();
                unsigned nworkers = 0;
                int workers[nworkers_to_remove];

                if(now)
                {
                        unsigned j;
                        printf("remove explicitly now from ctx %d:", sched_ctx);
                        for(j = 0; j < nworkers_to_remove; j++)
                                printf(" %d", workers_to_remove[j]);
                        printf("\n");

                        starpu_sched_ctx_remove_workers(workers_to_remove, nworkers_to_remove, sched_ctx);
                }
                else
                {
                        printf("try to remove from ctx %d: ", sched_ctx);
                        unsigned j;
                        for(j = 0; j < nworkers_to_remove; j++)
                                printf(" %d", workers_to_remove[j]);
                        printf("\n");

                        int ret = starpu_pthread_mutex_trylock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
                        if(ret != EBUSY)
                        {
                                unsigned i;
                                for(i = 0; i < nworkers_to_remove; i++)
                                        if(starpu_sched_ctx_contains_worker(workers_to_remove[i], sched_ctx))
                                                workers[nworkers++] = workers_to_remove[i];

                                hypervisor.sched_ctx_w[sched_ctx].resize_ack.receiver_sched_ctx = -1;
                                hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers = (int*)malloc(nworkers_to_remove * sizeof(int));
                                hypervisor.sched_ctx_w[sched_ctx].resize_ack.nmoved_workers = (int)nworkers;
                                hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers = (int*)malloc(nworkers_to_remove * sizeof(int));
                                for(i = 0; i < nworkers; i++)
                                {
                                        hypervisor.sched_ctx_w[sched_ctx].current_idle_time[workers[i]] = 0.0;
                                        hypervisor.sched_ctx_w[sched_ctx].resize_ack.moved_workers[i] = workers[i];
                                        hypervisor.sched_ctx_w[sched_ctx].resize_ack.acked_workers[i] = 0;
                                }

                                hypervisor.resize[sched_ctx] = 0;
                                starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sched_ctx].mutex);
                        }
                }
        }
        return;
}

static unsigned _ack_resize_completed(unsigned sched_ctx, int worker)
{
        if(worker != -1 && !starpu_sched_ctx_contains_worker(worker, sched_ctx))
                return 0;

        struct sc_hypervisor_resize_ack *resize_ack = NULL;
        unsigned sender_sched_ctx = STARPU_NMAX_SCHED_CTXS;

        int i;
        for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
        {
                if(hypervisor.sched_ctxs[i] != STARPU_NMAX_SCHED_CTXS)
                {
                        struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[hypervisor.sched_ctxs[i]];
                        starpu_pthread_mutex_lock(&sc_w->mutex);
                        unsigned only_remove = 0;
                        if(sc_w->resize_ack.receiver_sched_ctx == -1 && hypervisor.sched_ctxs[i] != (int)sched_ctx &&
                           sc_w->resize_ack.nmoved_workers > 0 && starpu_sched_ctx_contains_worker(worker, hypervisor.sched_ctxs[i]))
                        {
                                int j;
                                for(j = 0; j < sc_w->resize_ack.nmoved_workers; j++)
                                        if(sc_w->resize_ack.moved_workers[j] == worker)
                                        {
                                                only_remove = 1;
                                                break;
                                        }
                        }
                        if(only_remove ||
                           (sc_w->resize_ack.receiver_sched_ctx != -1 && sc_w->resize_ack.receiver_sched_ctx == (int)sched_ctx))
                        {
                                resize_ack = &sc_w->resize_ack;
                                sender_sched_ctx = hypervisor.sched_ctxs[i];
                                starpu_pthread_mutex_unlock(&sc_w->mutex);
                                break;
                        }
                        starpu_pthread_mutex_unlock(&sc_w->mutex);
                }
        }

        /* if there is no ctx waiting for its ack return 1 */
        if(resize_ack == NULL)
                return 1;

        int ret = starpu_pthread_mutex_trylock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
        if(ret != EBUSY)
        {
                int *moved_workers = resize_ack->moved_workers;
                int nmoved_workers = resize_ack->nmoved_workers;
                int *acked_workers = resize_ack->acked_workers;

                if(worker != -1)
                {
                        for(i = 0; i < nmoved_workers; i++)
                        {
                                int moved_worker = moved_workers[i];
                                if(moved_worker == worker && acked_workers[i] == 0)
                                {
                                        acked_workers[i] = 1;
                                }
                        }
                }

                int nacked_workers = 0;
                for(i = 0; i < nmoved_workers; i++)
                {
                        nacked_workers += (acked_workers[i] == 1);
                }

                unsigned resize_completed = (nacked_workers == nmoved_workers);
                int receiver_sched_ctx = sched_ctx;
                if(resize_completed)
                {
                        /* if the user did not grant permission to resize, don't do it,
                           whatever the application says */
                        if(!((hypervisor.resize[sender_sched_ctx] == 0 || hypervisor.resize[receiver_sched_ctx] == 0) && imposed_resize))
                        {
                                /* int j; */
                                /* printf("remove after ack from ctx %d:", sender_sched_ctx); */
                                /* for(j = 0; j < nmoved_workers; j++) */
                                /*      printf(" %d", moved_workers[j]); */
                                /* printf("\n"); */
                                starpu_sched_ctx_remove_workers(moved_workers, nmoved_workers, sender_sched_ctx);

                                _reset_resize_sample_info(sender_sched_ctx, receiver_sched_ctx);

                                hypervisor.resize[sender_sched_ctx] = 1;
                                hypervisor.allow_remove[receiver_sched_ctx] = 1;
                                /* if the user allowed resizing, leave the decisions to the application */
                                if(imposed_resize) imposed_resize = 0;

                                resize_ack->receiver_sched_ctx = -1;
                                resize_ack->nmoved_workers = 0;
                                free(resize_ack->moved_workers);
                                free(resize_ack->acked_workers);
                        }
                        starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
                        return resize_completed;
                }
                starpu_pthread_mutex_unlock(&hypervisor.sched_ctx_w[sender_sched_ctx].mutex);
        }
        return 0;
}

/* Enqueue a resize request for 'sched_ctx', to be executed when the
 * 'task_tag' tasks of 'sched_ctx' complete. */
void sc_hypervisor_resize(unsigned sched_ctx, int task_tag)
{
        struct resize_request_entry *entry;

        entry = malloc(sizeof *entry);
        STARPU_ASSERT(entry != NULL);

        entry->sched_ctx = sched_ctx;
        entry->task_tag = task_tag;

        starpu_pthread_mutex_lock(&hypervisor.resize_mut[sched_ctx]);
        HASH_ADD_INT(hypervisor.resize_requests[sched_ctx], task_tag, entry);
        starpu_pthread_mutex_unlock(&hypervisor.resize_mut[sched_ctx]);
}
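
/* Illustrative sketch: an application-driven resize is requested by enqueuing
 * a request keyed by a task tag; when a task carrying that tag goes through
 * the post-exec hook (see notify_post_exec_hook below), the pending request
 * is looked up and the policy's handle_post_exec_hook is invoked. The context
 * id and tag value are placeholders, and the way the tag is attached to the
 * task is not shown here.
 *
 *      sc_hypervisor_resize(ctx1, 42);
 *      // ... submit the task carrying tag 42 ...
 */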
/* notifies the hypervisor that the worker is no longer idle and a new task was pushed on its queue */
static void notify_idle_end(unsigned sched_ctx, int worker)
{
        if(hypervisor.resize[sched_ctx])
                hypervisor.sched_ctx_w[sched_ctx].current_idle_time[worker] = 0.0;

        if(hypervisor.policy.handle_idle_end)
                hypervisor.policy.handle_idle_end(sched_ctx, worker);
}

/* notifies the hypervisor that the worker spent another cycle in idle time */
static void notify_idle_cycle(unsigned sched_ctx, int worker, double idle_time)
{
        if(hypervisor.resize[sched_ctx])
        {
                struct sc_hypervisor_wrapper *sc_w = &hypervisor.sched_ctx_w[sched_ctx];
                sc_w->current_idle_time[worker] += idle_time;
                if(hypervisor.policy.handle_idle_cycle)
                {
                        hypervisor.policy.handle_idle_cycle(sched_ctx, worker);
                }
        }
        return;
}

/* notifies the hypervisor that a new task was pushed on the queue of the worker */
static void notify_pushed_task(unsigned sched_ctx, int worker)
{
        hypervisor.sched_ctx_w[sched_ctx].pushed_tasks[worker]++;
        if(hypervisor.sched_ctx_w[sched_ctx].total_flops != 0.0 && hypervisor.sched_ctx_w[sched_ctx].start_time == 0.0)
                hypervisor.sched_ctx_w[sched_ctx].start_time = starpu_timing_now();

        if(hypervisor.sched_ctx_w[sched_ctx].total_flops != 0.0 && hypervisor.sched_ctx_w[sched_ctx].real_start_time == 0.0)
                hypervisor.sched_ctx_w[sched_ctx].real_start_time = starpu_timing_now();

        int ntasks = get_ntasks(hypervisor.sched_ctx_w[sched_ctx].pushed_tasks);

        if((hypervisor.min_tasks == 0 || (!(hypervisor.resize[sched_ctx] == 0 && imposed_resize) && ntasks == hypervisor.min_tasks)) && hypervisor.check_min_tasks[sched_ctx])
        {
                hypervisor.resize[sched_ctx] = 1;
                if(imposed_resize) imposed_resize = 0;
                hypervisor.check_min_tasks[sched_ctx] = 0;
        }
        if(hypervisor.policy.handle_pushed_task)
                hypervisor.policy.handle_pushed_task(sched_ctx, worker);
}

/* notifies the hypervisor that a task was popped from the queue of the worker */
static void notify_poped_task(unsigned sched_ctx, int worker, struct starpu_task *task, size_t data_size, uint32_t footprint)
{
        hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker]++;
        hypervisor.sched_ctx_w[sched_ctx].elapsed_flops[worker] += task->flops;
        hypervisor.sched_ctx_w[sched_ctx].elapsed_data[worker] += data_size;
        hypervisor.sched_ctx_w[sched_ctx].elapsed_tasks[worker]++;
        hypervisor.sched_ctx_w[sched_ctx].total_elapsed_flops[worker] += task->flops;
        hypervisor.sched_ctx_w[sched_ctx].remaining_flops -= task->flops; //sc_hypervisor_get_elapsed_flops_per_sched_ctx(&hypervisor.sched_ctx_w[sched_ctx]);

        if(hypervisor.resize[sched_ctx])
        {
                if(hypervisor.policy.handle_poped_task)
                        hypervisor.policy.handle_poped_task(sched_ctx, worker, task, footprint);
        }
        _ack_resize_completed(sched_ctx, worker);
        if(hypervisor.sched_ctx_w[sched_ctx].poped_tasks[worker] % 200 == 0)
                _print_current_time();
}

/* notifies the hypervisor that a tagged task has just been executed */
static void notify_post_exec_hook(unsigned sched_ctx, int task_tag)
{
        STARPU_ASSERT(task_tag > 0);

        unsigned conf_sched_ctx;
        unsigned i;
        starpu_pthread_mutex_lock(&act_hypervisor_mutex);
        unsigned ns = hypervisor.nsched_ctxs;
        starpu_pthread_mutex_unlock(&act_hypervisor_mutex);

        for(i = 0; i < ns; i++)
        {
                struct configuration_entry *entry;

                conf_sched_ctx = hypervisor.sched_ctxs[i];
                starpu_pthread_mutex_lock(&hypervisor.conf_mut[conf_sched_ctx]);

                HASH_FIND_INT(hypervisor.configurations[conf_sched_ctx], &task_tag, entry);

                if (entry != NULL)
                {
                        struct sc_hypervisor_policy_config *config = entry->configuration;

                        sc_hypervisor_set_config(conf_sched_ctx, config);
                        HASH_DEL(hypervisor.configurations[conf_sched_ctx], entry);
                        free(config);
                }
                starpu_pthread_mutex_unlock(&hypervisor.conf_mut[conf_sched_ctx]);
        }

        if(hypervisor.resize[sched_ctx])
        {
                starpu_pthread_mutex_lock(&hypervisor.resize_mut[sched_ctx]);

                if(hypervisor.policy.handle_post_exec_hook)
                {
                        /* Check whether 'task_tag' is in the 'resize_requests' set. */
                        struct resize_request_entry *entry;

                        HASH_FIND_INT(hypervisor.resize_requests[sched_ctx], &task_tag, entry);

                        if (entry != NULL)
                        {
                                hypervisor.policy.handle_post_exec_hook(sched_ctx, task_tag);
                                HASH_DEL(hypervisor.resize_requests[sched_ctx], entry);
                                free(entry);
                        }
                }
                starpu_pthread_mutex_unlock(&hypervisor.resize_mut[sched_ctx]);
        }
        return;
}

static void notify_submitted_job(struct starpu_task *task, uint32_t footprint)
{
        starpu_pthread_mutex_lock(&act_hypervisor_mutex);
        hypervisor.sched_ctx_w[task->sched_ctx].submitted_flops += task->flops;
        starpu_pthread_mutex_unlock(&act_hypervisor_mutex);

        if(hypervisor.policy.handle_submitted_job && !type_of_tasks_known)
                hypervisor.policy.handle_submitted_job(task->cl, task->sched_ctx, footprint);
}

void sc_hypervisor_set_type_of_task(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint)
{
        type_of_tasks_known = 1;
        if(hypervisor.policy.handle_submitted_job)
                hypervisor.policy.handle_submitted_job(cl, sched_ctx, footprint);
}

static void notify_delete_context(unsigned sched_ctx)
{
        _print_current_time();
        sc_hypervisor_unregister_ctx(sched_ctx);
}

void sc_hypervisor_size_ctxs(int *sched_ctxs, int nsched_ctxs, int *workers, int nworkers)
{
        starpu_pthread_mutex_lock(&act_hypervisor_mutex);
        unsigned curr_nsched_ctxs = sched_ctxs == NULL ? hypervisor.nsched_ctxs : nsched_ctxs;
        int *curr_sched_ctxs = sched_ctxs == NULL ? hypervisor.sched_ctxs : sched_ctxs;
        starpu_pthread_mutex_unlock(&act_hypervisor_mutex);

        unsigned s;
        for(s = 0; s < curr_nsched_ctxs; s++)
                hypervisor.resize[curr_sched_ctxs[s]] = 1;

        if(hypervisor.policy.size_ctxs)
                hypervisor.policy.size_ctxs(curr_sched_ctxs, curr_nsched_ctxs, workers, nworkers);
}
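
/* Illustrative sketch: after registering its contexts, an application (or a
 * policy) can ask the hypervisor to distribute a pool of workers over them in
 * one go. Passing NULL for sched_ctxs means "all registered contexts", in
 * which case the count argument is ignored; the worker ids below are
 * placeholders.
 *
 *      int workers[4] = {0, 1, 2, 3};
 *      sc_hypervisor_size_ctxs(NULL, 0, workers, 4);
 */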
struct sc_hypervisor_wrapper* sc_hypervisor_get_wrapper(unsigned sched_ctx)
{
        return &hypervisor.sched_ctx_w[sched_ctx];
}

int* sc_hypervisor_get_sched_ctxs()
{
        return hypervisor.sched_ctxs;
}

int sc_hypervisor_get_nsched_ctxs()
{
        int ns;
        ns = hypervisor.nsched_ctxs;
        return ns;
}

void sc_hypervisor_save_size_req(int *sched_ctxs, int nsched_ctxs, int *workers, int nworkers)
{
        hypervisor.sr = (struct size_request*)malloc(sizeof(struct size_request));
        hypervisor.sr->sched_ctxs = sched_ctxs;
        hypervisor.sr->nsched_ctxs = nsched_ctxs;
        hypervisor.sr->workers = workers;
        hypervisor.sr->nworkers = nworkers;
}

unsigned sc_hypervisor_get_size_req(int **sched_ctxs, int* nsched_ctxs, int **workers, int *nworkers)
{
        if(hypervisor.sr != NULL)
        {
                *sched_ctxs = hypervisor.sr->sched_ctxs;
                *nsched_ctxs = hypervisor.sr->nsched_ctxs;
                *workers = hypervisor.sr->workers;
                *nworkers = hypervisor.sr->nworkers;
                return 1;
        }
        return 0;
}

void sc_hypervisor_free_size_req(void)
{
        if(hypervisor.sr != NULL)
        {
                free(hypervisor.sr);
                hypervisor.sr = NULL;
        }
}

double sc_hypervisor_get_velocity(struct sc_hypervisor_wrapper *sc_w, enum starpu_worker_archtype arch)
{
        double velocity = sc_hypervisor_get_velocity_per_worker_type(sc_w, arch);
        if(velocity == -1.0)
                velocity = sc_hypervisor_get_ref_velocity_per_worker_type(sc_w, arch);
        if(velocity == -1.0)
                velocity = arch == STARPU_CPU_WORKER ? 5.0 : 100.0;
        return velocity;
}