  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011, 2012 INRIA
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu_config.h>
  17. #include "sc_hypervisor_lp.h"
  18. #include "sc_hypervisor_policy.h"
  19. #include <math.h>
  20. #include <sys/time.h>
/* Pool of submitted tasks grouped by kind (codelet + footprint): appended to
   by teft_lp_handle_submitted_job and drained by teft_lp_handle_poped_task. */
static struct sc_hypervisor_policy_task_pool *task_pools = NULL;
/* Protects task_pools on the sizing path; the resizing path deliberately
   reads it unlocked and works on a clone instead (see _try_resizing). */
static starpu_pthread_mutex_t mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
/* Data handed to _compute_workers_distrib() through the dichotomy driver. */
struct teft_lp_data
{
	int nt;                  /* number of different kinds of tasks */
	double **tasks;          /* out: nw x nt tasks-per-worker LP solution */
	unsigned *in_sched_ctxs; /* contexts to consider (NULL = all) */
	int *workers;            /* workers to consider (NULL = all) */
	struct sc_hypervisor_policy_task_pool *tmp_task_pools; /* snapshot of the task pools the LP iterates */
	unsigned size_ctxs;      /* 1 when sizing contexts, 0 when resizing */
};
  32. static double _compute_workers_distrib(int ns, int nw, double final_w_in_s[ns][nw],
  33. unsigned is_integer, double tmax, void *specific_data)
  34. {
  35. struct teft_lp_data *sd = (struct teft_lp_data *)specific_data;
  36. int nt = sd->nt;
  37. double **final_tasks = sd->tasks;
  38. unsigned *in_sched_ctxs = sd->in_sched_ctxs;
  39. int *workers = sd->workers;
  40. struct sc_hypervisor_policy_task_pool *tmp_task_pools = sd->tmp_task_pools;
  41. unsigned size_ctxs = sd->size_ctxs;
  42. if(tmp_task_pools == NULL)
  43. return 0.0;
  44. double w_in_s[ns][nw];
  45. double tasks[nw][nt];
  46. double times[nw][nt];
  47. /* times in ms */
  48. sc_hypervisor_get_tasks_times(nw, nt, times, workers, size_ctxs, task_pools);
  49. double res = 0.0;
  50. #ifdef STARPU_HAVE_GLPK_H
  51. res = sc_hypervisor_lp_simulate_distrib_tasks(ns, nw, nt, w_in_s, tasks, times, is_integer, tmax, in_sched_ctxs, tmp_task_pools);
  52. #endif //STARPU_HAVE_GLPK_H
  53. if(res != 0.0)
  54. {
  55. int s, w, t;
  56. for(s = 0; s < ns; s++)
  57. for(w = 0; w < nw; w++)
  58. final_w_in_s[s][w] = w_in_s[s][w];
  59. for(w = 0; w < nw; w++)
  60. for(t = 0; t < nt; t++)
  61. final_tasks[w][t] = tasks[w][t];
  62. }
  63. return res;
  64. }
  65. static void _size_ctxs(unsigned *sched_ctxs, int nsched_ctxs , int *workers, int nworkers)
  66. {
  67. int ns = sched_ctxs == NULL ? sc_hypervisor_get_nsched_ctxs() : nsched_ctxs;
  68. int nw = workers == NULL ? (int)starpu_worker_get_count() : nworkers; /* Number of different workers */
  69. int nt = 0; /* Number of different kinds of tasks */
  70. starpu_pthread_mutex_lock(&mutex);
  71. struct sc_hypervisor_policy_task_pool * tp;
  72. for (tp = task_pools; tp; tp = tp->next)
  73. nt++;
  74. double w_in_s[ns][nw];
  75. // double tasks[nw][nt];
  76. double **tasks=(double**)malloc(nw*sizeof(double*));
  77. int i;
  78. for(i = 0; i < nw; i++)
  79. tasks[i] = (double*)malloc(nt*sizeof(double));
  80. struct teft_lp_data specific_data;
  81. specific_data.nt = nt;
  82. specific_data.tasks = tasks;
  83. specific_data.in_sched_ctxs = sched_ctxs;
  84. specific_data.workers = workers;
  85. specific_data.tmp_task_pools = task_pools;
  86. specific_data.size_ctxs = 1;
  87. /* smallest possible tmax, difficult to obtain as we
  88. compute the nr of flops and not the tasks */
  89. /*lp computes it in s but it's converted to ms just before return */
  90. double possible_tmax = sc_hypervisor_lp_get_tmax(nw, workers);
  91. double smallest_tmax = possible_tmax / 3;
  92. double tmax = possible_tmax * ns;
  93. double tmin = smallest_tmax;
  94. unsigned found_sol = sc_hypervisor_lp_execute_dichotomy(ns, nw, w_in_s, 1, (void*)&specific_data,
  95. tmin, tmax, smallest_tmax, _compute_workers_distrib);
  96. starpu_pthread_mutex_unlock(&mutex);
  97. /* if we did find at least one solution redistribute the resources */
  98. if(found_sol)
  99. {
  100. struct types_of_workers *tw = sc_hypervisor_get_types_of_workers(workers, nw);
  101. sc_hypervisor_lp_place_resources_in_ctx(ns, nw, w_in_s, sched_ctxs, workers, 1, tw);
  102. }
  103. for(i = 0; i < nw; i++)
  104. free(tasks[i]);
  105. free(tasks);
  106. }
  107. static void size_if_required()
  108. {
  109. int nsched_ctxs, nworkers;
  110. unsigned *sched_ctxs;
  111. int *workers;
  112. unsigned has_req = sc_hypervisor_get_size_req(&sched_ctxs, &nsched_ctxs, &workers, &nworkers);
  113. if(has_req)
  114. {
  115. struct sc_hypervisor_wrapper* sc_w = NULL;
  116. unsigned ready_to_size = 1;
  117. int s;
  118. starpu_pthread_mutex_lock(&act_hypervisor_mutex);
  119. for(s = 0; s < nsched_ctxs; s++)
  120. {
  121. sc_w = sc_hypervisor_get_wrapper(sched_ctxs[s]);
  122. // if(sc_w->submitted_flops < sc_w->total_flops)
  123. if((sc_w->submitted_flops + (0.1*sc_w->total_flops)) < sc_w->total_flops)
  124. ready_to_size = 0;
  125. }
  126. if(ready_to_size)
  127. {
  128. _size_ctxs(sched_ctxs, nsched_ctxs, workers, nworkers);
  129. sc_hypervisor_free_size_req();
  130. }
  131. starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
  132. }
  133. }
  134. static void teft_lp_handle_submitted_job(struct starpu_codelet *cl, unsigned sched_ctx, uint32_t footprint, size_t data_size)
  135. {
  136. /* count the tasks of the same type */
  137. starpu_pthread_mutex_lock(&mutex);
  138. sc_hypervisor_policy_add_task_to_pool(cl, sched_ctx, footprint, &task_pools, data_size);
  139. starpu_pthread_mutex_unlock(&mutex);
  140. size_if_required();
  141. }
  142. static void _try_resizing(unsigned *sched_ctxs, int nsched_ctxs , int *workers, int nworkers)
  143. {
  144. starpu_trace_user_event(2);
  145. int ns = sched_ctxs == NULL ? sc_hypervisor_get_nsched_ctxs() : nsched_ctxs;
  146. int nw = workers == NULL ? (int)starpu_worker_get_count() : nworkers; /* Number of different workers */
  147. sched_ctxs = sched_ctxs == NULL ? sc_hypervisor_get_sched_ctxs() : sched_ctxs;
  148. int nt = 0; /* Number of different kinds of tasks */
  149. // starpu_pthread_mutex_lock(&mutex);
  150. /* we don't take the mutex bc a correct value of the number of tasks is
  151. not required but we do a copy in order to be sure
  152. that the linear progr won't segfault if the list of
  153. submitted task will change during the exec */
  154. struct sc_hypervisor_policy_task_pool *tp = NULL;
  155. struct sc_hypervisor_policy_task_pool *tmp_task_pools = sc_hypervisor_policy_clone_task_pool(task_pools);
  156. for (tp = task_pools; tp; tp = tp->next)
  157. nt++;
  158. double w_in_s[ns][nw];
  159. // double tasks_per_worker[nw][nt];
  160. double **tasks_per_worker=(double**)malloc(nw*sizeof(double*));
  161. int i;
  162. for(i = 0; i < nw; i++)
  163. tasks_per_worker[i] = (double*)malloc(nt*sizeof(double));
  164. struct teft_lp_data specific_data;
  165. specific_data.nt = nt;
  166. specific_data.tasks = tasks_per_worker;
  167. specific_data.in_sched_ctxs = NULL;
  168. specific_data.workers = NULL;
  169. specific_data.tmp_task_pools = tmp_task_pools;
  170. specific_data.size_ctxs = 0;
  171. /* smallest possible tmax, difficult to obtain as we
  172. compute the nr of flops and not the tasks */
  173. /*lp computes it in s but it's converted to ms just before return */
  174. double possible_tmax = sc_hypervisor_lp_get_tmax(nw, NULL);
  175. double smallest_tmax = 0.0;//possible_tmax / 3;
  176. double tmax = possible_tmax * ns;
  177. double tmin = smallest_tmax;
  178. unsigned found_sol = sc_hypervisor_lp_execute_dichotomy(ns, nw, w_in_s, 1, (void*)&specific_data,
  179. tmin, tmax, smallest_tmax, _compute_workers_distrib);
  180. // starpu_pthread_mutex_unlock(&mutex);
  181. /* if we did find at least one solution redistribute the resources */
  182. if(found_sol)
  183. {
  184. struct types_of_workers *tw = sc_hypervisor_get_types_of_workers(workers, nw);
  185. sc_hypervisor_lp_place_resources_in_ctx(ns, nw, w_in_s, sched_ctxs, workers, 0, tw);
  186. }
  187. struct sc_hypervisor_policy_task_pool *next = NULL;
  188. struct sc_hypervisor_policy_task_pool *tmp_tp = tmp_task_pools;
  189. while(tmp_task_pools)
  190. {
  191. next = tmp_tp->next;
  192. free(tmp_tp);
  193. tmp_tp = next;
  194. tmp_task_pools = next;
  195. }
  196. for(i = 0; i < nw; i++)
  197. free(tasks_per_worker[i]);
  198. free(tasks_per_worker);
  199. }
  200. static void teft_lp_handle_poped_task(unsigned sched_ctx, __attribute__((unused))int worker, struct starpu_task *task, uint32_t footprint)
  201. {
  202. struct sc_hypervisor_wrapper* sc_w = sc_hypervisor_get_wrapper(sched_ctx);
  203. int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
  204. if(ret != EBUSY)
  205. {
  206. if((sc_w->submitted_flops + (0.1*sc_w->total_flops)) < sc_w->total_flops)
  207. {
  208. starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
  209. return;
  210. }
  211. unsigned criteria = sc_hypervisor_get_resize_criteria();
  212. if(criteria != SC_NOTHING && criteria == SC_SPEED)
  213. {
  214. if(sc_hypervisor_check_speed_gap_btw_ctxs())
  215. {
  216. _try_resizing(NULL, -1, NULL, -1);
  217. }
  218. }
  219. starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
  220. }
  221. /* too expensive to take this mutex and correct value of the number of tasks is not compulsory */
  222. // starpu_pthread_mutex_lock(&mutex);
  223. sc_hypervisor_policy_remove_task_from_pool(task, footprint, &task_pools);
  224. // starpu_pthread_mutex_unlock(&mutex);
  225. }
  226. static void teft_lp_handle_idle_cycle(unsigned sched_ctx, int worker)
  227. {
  228. struct sc_hypervisor_wrapper* sc_w = sc_hypervisor_get_wrapper(sched_ctx);
  229. int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
  230. if(ret != EBUSY)
  231. {
  232. if((sc_w->submitted_flops + (0.1*sc_w->total_flops)) < sc_w->total_flops)
  233. {
  234. starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
  235. return;
  236. }
  237. unsigned criteria = sc_hypervisor_get_resize_criteria();
  238. if(criteria != SC_NOTHING && criteria == SC_IDLE)
  239. {
  240. if(sc_hypervisor_check_idle(sched_ctx, worker))
  241. {
  242. _try_resizing(NULL, -1, NULL, -1);
  243. // sc_hypervisor_move_workers(sched_ctx, 3 - sched_ctx, &worker, 1, 1);
  244. }
  245. }
  246. starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
  247. }
  248. return;
  249. }
/* Hypervisor entry point for sizing.  Don't size synchronously: just record
   the request; size_if_required() performs it once enough flops were
   submitted to every context. */
static void teft_lp_size_ctxs(unsigned *sched_ctxs, int nsched_ctxs , int *workers, int nworkers)
{
	sc_hypervisor_save_size_req(sched_ctxs, nsched_ctxs, workers, nworkers);
}
  254. static void teft_lp_resize_ctxs(unsigned *sched_ctxs, int nsched_ctxs , int *workers, int nworkers)
  255. {
  256. int ret = starpu_pthread_mutex_trylock(&act_hypervisor_mutex);
  257. if(ret != EBUSY)
  258. {
  259. struct sc_hypervisor_wrapper* sc_w = NULL;
  260. int s = 0;
  261. for(s = 0; s < nsched_ctxs; s++)
  262. {
  263. sc_w = sc_hypervisor_get_wrapper(sched_ctxs[s]);
  264. if((sc_w->submitted_flops + (0.1*sc_w->total_flops)) < sc_w->total_flops)
  265. {
  266. starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
  267. return;
  268. }
  269. }
  270. _try_resizing(sched_ctxs, nsched_ctxs, workers, nworkers);
  271. starpu_pthread_mutex_unlock(&act_hypervisor_mutex);
  272. }
  273. }
/* The tEFT LP policy: resizes contexts by minimizing, via a linear program,
   the expected finish time of the whole set of submitted tasks. */
struct sc_hypervisor_policy teft_lp_policy = {
	.size_ctxs = teft_lp_size_ctxs,               /* record sizing request */
	.resize_ctxs = teft_lp_resize_ctxs,           /* explicit resize */
	.handle_poped_task = teft_lp_handle_poped_task,   /* speed-based resize */
	.handle_pushed_task = NULL,
	.handle_idle_cycle = teft_lp_handle_idle_cycle,   /* idle-based resize */
	.handle_idle_end = NULL,
	.handle_post_exec_hook = NULL,
	.handle_submitted_job = teft_lp_handle_submitted_job, /* feed task pools */
	.end_ctx = NULL,
	.custom = 0,
	.name = "teft_lp"
};