/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011, 2013 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <core/sched_policy.h>
#include <core/sched_ctx.h>
#include <common/utils.h>
#include <stdarg.h>
starpu_pthread_rwlock_t changing_ctx_mutex[STARPU_NMAX_SCHED_CTXS];
static starpu_pthread_mutex_t sched_ctx_manag = STARPU_PTHREAD_MUTEX_INITIALIZER;
static starpu_pthread_mutex_t finished_submit_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
static struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
starpu_pthread_key_t sched_ctx_key;
static unsigned with_hypervisor = 0;
static double hyp_start_sample[STARPU_NMAX_SCHED_CTXS];
static double hyp_start_allow_sample[STARPU_NMAX_SCHED_CTXS];
static double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
static size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
static double hyp_actual_start_sample[STARPU_NMAX_SCHED_CTXS];
static double window_size;
static int nobind;
static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);
static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers);
static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master);
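/* Register the context <sched_ctx_id> in the worker's list of contexts,
 * unless it is already there (the worker may have been scheduled for removal
 * from the context but is staying after all). */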
static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_elt_exists(worker->sched_ctx_list, sched_ctx_id);
	/* the worker was planning to leave this ctx but changed its mind and is
	   staying, so only add the context if it is not already there */
	if (!ret_sched_ctx)
	{
		/* add context to worker */
		_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx_id);
		worker->nsched_ctxs++;
	}
	worker->removed_from_ctx[sched_ctx_id] = 0;
	if(worker->tmp_sched_ctx == (int) sched_ctx_id)
		worker->tmp_sched_ctx = -1;
	return;
}
void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_elt_exists(worker->sched_ctx_list, sched_ctx_id);
	/* remove context from worker */
	if(ret_sched_ctx)
	{
		/* don't remove the scheduling data here: there might still be tasks
		   running, and post_exec would then find the scheduling data gone.
		   Do it when deleting the context, when it is really not needed anymore */
		/* struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id); */
		/* if(sched_ctx && sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers) */
		/* { */
		/*	_STARPU_TRACE_WORKER_SCHEDULING_PUSH; */
		/*	sched_ctx->sched_policy->remove_workers(sched_ctx_id, &worker->workerid, 1); */
		/*	_STARPU_TRACE_WORKER_SCHEDULING_POP; */
		/* } */
		if (!_starpu_sched_ctx_list_remove(&worker->sched_ctx_list, sched_ctx_id))
			worker->nsched_ctxs--;
	}
	return;
}
static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id)
{
	int i;
	struct _starpu_worker *worker = NULL;
	for(i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
	return;
}
static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
{
	int i;
	struct _starpu_worker *worker = NULL;
	for(i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		if(now)
		{
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
		else
		{
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			worker->removed_from_ctx[sched_ctx_id] = 1;
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
	}
	return;
}
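/* stop_submission_task is a sentinel: it is excluded from the DAG, and when
 * _starpu_fetch_tasks_from_empty_ctx_list() pops it below, fetching stops so
 * that no further queued tasks are pushed to the workers. */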
void starpu_sched_ctx_stop_task_submission()
{
	_starpu_exclude_task_from_dag(&stop_submission_task);
	_starpu_task_submit_internally(&stop_submission_task);
}
void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int curr_workerid = starpu_worker_get_id();
	/* if this is the initial sched_ctx there is no point in taking the mutex
	   (the workers are not launched yet); likewise if the current worker is
	   the one calling this */
	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
	worker->shares_tasks_lists[sched_ctx_id] = 1;
	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
}
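/* Add the given workers (or all workers, when workerids == NULL and
 * nworkers == -1) to the context's worker collection, merge their perfmodel
 * devices into sched_ctx->perf_arch, and notify the scheduling policy (or,
 * for policy-less contexts, elect or extend the master worker). */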
static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers,
					     int *added_workers, int *n_added_workers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	int nworkers_to_add = nworkers == -1 ? (int)config->topology.nworkers : nworkers;
	if (!nworkers_to_add)
		return;
	int workers_to_add[nworkers_to_add];
	int cpu_workers[nworkers_to_add];
	int ncpu_workers = 0;
	struct starpu_perfmodel_device devices[nworkers_to_add];
	int ndevices = 0;
	struct _starpu_worker *str_worker = NULL;
	int worker;
	int i = 0;
	for(i = 0; i < nworkers_to_add; i++)
	{
		/* added_workers is NULL when this function is called at the creation
		   of the context; in that case there is no need to do this check */
		if(added_workers)
		{
			worker = workers->add(workers, (workerids == NULL ? i : workerids[i]));
			if(worker >= 0)
				added_workers[(*n_added_workers)++] = worker;
			else
			{
				int curr_workerid = starpu_worker_get_id();
				struct _starpu_worker *worker_str = _starpu_get_worker_struct(workerids[i]);
				if(curr_workerid != workerids[i])
					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
				worker_str->removed_from_ctx[sched_ctx->id] = 0;
				if(curr_workerid != workerids[i])
					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
			}
		}
		else
		{
			worker = (workerids == NULL ? i : workerids[i]);
			workers->add(workers, worker);
			workers_to_add[i] = worker;
			str_worker = _starpu_get_worker_struct(worker);
			str_worker->tmp_sched_ctx = (int)sched_ctx->id;
		}
	}
	int *wa;
	int na;
	if(added_workers)
	{
		na = *n_added_workers;
		wa = added_workers;
	}
	else
	{
		na = nworkers_to_add;
		wa = workers_to_add;
	}
	for(i = 0; i < na; i++)
	{
		worker = wa[i];
		str_worker = _starpu_get_worker_struct(worker);
		int dev1, dev2;
		unsigned found = 0;
		for(dev1 = 0; dev1 < str_worker->perf_arch.ndevices; dev1++)
		{
			for(dev2 = 0; dev2 < ndevices; dev2++)
			{
				if(devices[dev2].type == str_worker->perf_arch.devices[dev1].type &&
				   devices[dev2].devid == str_worker->perf_arch.devices[dev1].devid)
				{
					devices[dev2].ncores += str_worker->perf_arch.devices[dev1].ncores;
					found = 1;
					break;
				}
			}
			if(!found)
			{
				devices[ndevices].type = str_worker->perf_arch.devices[dev1].type;
				devices[ndevices].devid = str_worker->perf_arch.devices[dev1].devid;
				devices[ndevices].ncores = str_worker->perf_arch.devices[dev1].ncores;
				ndevices++;
			}
			else
				found = 0;
		}
		if (!sched_ctx->sched_policy)
		{
			struct _starpu_worker *worker_str = _starpu_get_worker_struct(wa[i]);
			if (worker_str->arch == STARPU_CPU_WORKER)
				cpu_workers[ncpu_workers++] = wa[i];
		}
	}
	if(ndevices > 0)
	{
		if(sched_ctx->perf_arch.devices == NULL)
			sched_ctx->perf_arch.devices = (struct starpu_perfmodel_device*)malloc(ndevices*sizeof(struct starpu_perfmodel_device));
		else
		{
			int nfinal_devices = 0;
			int dev1, dev2;
			unsigned found = 0;
			for(dev1 = 0; dev1 < ndevices; dev1++)
			{
				for(dev2 = 0; dev2 < sched_ctx->perf_arch.ndevices; dev2++)
				{
					if(sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
						found = 1;
				}
				if(!found)
				{
					nfinal_devices++;
				}
				else
					found = 0;
			}
			int nsize = (sched_ctx->perf_arch.ndevices+nfinal_devices);
			sched_ctx->perf_arch.devices = (struct starpu_perfmodel_device*)realloc(sched_ctx->perf_arch.devices, nsize*sizeof(struct starpu_perfmodel_device));
		}
		int dev1, dev2;
		unsigned found = 0;
		for(dev1 = 0; dev1 < ndevices; dev1++)
		{
			for(dev2 = 0; dev2 < sched_ctx->perf_arch.ndevices; dev2++)
			{
				if(sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
				{
					if(sched_ctx->perf_arch.devices[dev2].type == STARPU_CPU_WORKER)
						sched_ctx->perf_arch.devices[dev2].ncores += devices[dev1].ncores;
					found = 1;
				}
			}
			if(!found)
			{
				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].type = devices[dev1].type;
				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].devid = devices[dev1].devid;
				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = devices[dev1].ncores;
				sched_ctx->perf_arch.ndevices++;
			}
			else
				found = 0;
		}
	}
	if(!sched_ctx->sched_policy)
	{
		if(!sched_ctx->awake_workers)
		{
			if(sched_ctx->main_master == -1)
				sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, cpu_workers, ncpu_workers);
			else
			{
				_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, cpu_workers, ncpu_workers, sched_ctx->main_master);
			}
		}
		else
		{
			sched_ctx->main_master = _starpu_sched_ctx_find_master(sched_ctx->id, cpu_workers, ncpu_workers);
			_starpu_sched_ctx_set_master(sched_ctx, cpu_workers, ncpu_workers, sched_ctx->main_master);
		}
	}
	else if(sched_ctx->sched_policy->add_workers)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		if(added_workers)
		{
			if(*n_added_workers > 0)
				sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, *n_added_workers);
		}
		else
		{
			sched_ctx->sched_policy->add_workers(sched_ctx->id, workers_to_add, nworkers_to_add);
		}
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
	return;
}
static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids,
						  int nworkers, int *removed_workers, int *n_removed_workers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_perfmodel_device devices[workers->nworkers];
	int ndevices = 0;
	int i = 0;
	for(i = 0; i < nworkers; i++)
	{
		if(workers->nworkers > 0)
		{
			if(_starpu_worker_belongs_to_a_sched_ctx(workerids[i], sched_ctx->id))
			{
				int worker = workers->remove(workers, workerids[i]);
				if(worker >= 0)
					removed_workers[(*n_removed_workers)++] = worker;
			}
		}
	}
	int worker;
	unsigned found = 0;
	int dev;
	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
		for(dev = 0; dev < str_worker->perf_arch.ndevices; dev++)
		{
			int dev2;
			for(dev2 = 0; dev2 < ndevices; dev2++)
			{
				if(devices[dev2].type == str_worker->perf_arch.devices[dev].type &&
				   devices[dev2].devid == str_worker->perf_arch.devices[dev].devid)
				{
					if(devices[dev2].type == STARPU_CPU_WORKER)
						devices[dev2].ncores += str_worker->perf_arch.devices[dev].ncores;
					found = 1;
				}
			}
			if(!found)
			{
				devices[ndevices].type = str_worker->perf_arch.devices[dev].type;
				devices[ndevices].devid = str_worker->perf_arch.devices[dev].devid;
				devices[ndevices].ncores = str_worker->perf_arch.devices[dev].ncores;
				ndevices++;
			}
			else
				found = 0;
		}
		found = 0;
	}
	sched_ctx->perf_arch.ndevices = ndevices;
	for(dev = 0; dev < ndevices; dev++)
	{
		sched_ctx->perf_arch.devices[dev].type = devices[dev].type;
		sched_ctx->perf_arch.devices[dev].devid = devices[dev].devid;
		sched_ctx->perf_arch.devices[dev].ncores = devices[dev].ncores;
	}
	if(!sched_ctx->sched_policy)
	{
		if(!sched_ctx->awake_workers)
		{
			_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
		}
	}
	return;
}
static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sched_ctx)
{
	if(sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers)
	{
		int *workerids = NULL;
		unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
		if(nworkers_ctx > 0)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->remove_workers(sched_ctx->id, workerids, nworkers_ctx);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
		free(workerids);
	}
	return;
}
#ifdef STARPU_HAVE_HWLOC
static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_ctx)
{
	sched_ctx->hwloc_workers_set = hwloc_bitmap_alloc();
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_worker *worker;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
		if(!starpu_worker_is_combined_worker(worker->workerid))
		{
			hwloc_bitmap_or(sched_ctx->hwloc_workers_set,
					sched_ctx->hwloc_workers_set,
					worker->hwloc_cpu_set);
		}
	}
	return;
}
#endif
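/* Allocate a free slot in config->sched_ctxs, initialize the context's
 * mutexes, task lists, barrier counters and per-worker state, then attach
 * the requested workers to it. Passing nworkers_ctx == -1 makes
 * _starpu_add_workers_to_sched_ctx() take all the topology's workers. */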
struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerids,
						   int nworkers_ctx, unsigned is_initial_sched,
						   const char *sched_ctx_name,
						   int min_prio_set, int min_prio,
						   int max_prio_set, int max_prio,
						   unsigned awake_workers,
						   void (*sched_policy_init)(void))
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS);
	unsigned id = _starpu_get_first_free_sched_ctx(config);
	struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
	sched_ctx->id = id;
	config->topology.nsched_ctxs++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
	int nworkers = config->topology.nworkers;
	STARPU_ASSERT(nworkers_ctx <= nworkers);
	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->empty_ctx_mutex, NULL);
	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->waiting_tasks_mutex, NULL);
	starpu_task_list_init(&sched_ctx->waiting_tasks);
	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->sched_ctx_list_mutex, NULL);
	sched_ctx->sched_policy = policy ? (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy)) : NULL;
	sched_ctx->is_initial_sched = is_initial_sched;
	sched_ctx->name = sched_ctx_name;
	sched_ctx->inheritor = STARPU_NMAX_SCHED_CTXS;
	sched_ctx->finished_submit = 0;
	sched_ctx->min_priority_is_set = min_prio_set;
	if (sched_ctx->min_priority_is_set) sched_ctx->min_priority = min_prio;
	sched_ctx->max_priority_is_set = max_prio_set;
	if (sched_ctx->max_priority_is_set) sched_ctx->max_priority = max_prio;
	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
	sched_ctx->ready_flops = 0.0;
	sched_ctx->main_master = -1;
	sched_ctx->perf_arch.devices = NULL;
	sched_ctx->perf_arch.ndevices = 0;
	sched_ctx->init_sched = sched_policy_init;
	int w;
	for(w = 0; w < nworkers; w++)
	{
		sem_init(&sched_ctx->fall_asleep_sem[w], 0, 0);
		sem_init(&sched_ctx->wake_up_sem[w], 0, 0);
		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond[w], NULL);
		STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->parallel_sect_mutex[w], NULL);
		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond_busy[w], NULL);
		sched_ctx->busy[w] = 0;
		sched_ctx->master[w] = -1;
		sched_ctx->parallel_sect[w] = 0;
		sched_ctx->sleeping[w] = 0;
	}
	/* initialize the strategy structs and the worker_collection of the
	   resources of the context */
	if(policy)
	{
		_starpu_init_sched_policy(config, sched_ctx, policy);
		sched_ctx->awake_workers = 1;
	}
	else
	{
		sched_ctx->awake_workers = awake_workers;
		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
	}
	if(is_initial_sched)
	{
		int i;
		/* initialize the mutexes for all contexts */
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		{
			STARPU_PTHREAD_RWLOCK_INIT(&changing_ctx_mutex[i], NULL);
		}
	}
	/* now that the context has a worker_collection, add the resources to it */
	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
#ifdef STARPU_HAVE_HWLOC
	/* build the hwloc tree of the context */
	_starpu_sched_ctx_create_hwloc_tree(sched_ctx);
#endif //STARPU_HAVE_HWLOC
	/* if we create the initial big sched ctx we can update the workers' status
	   here because they haven't been launched yet */
	if(is_initial_sched)
	{
		int i;
		for(i = 0; i < nworkers; i++)
		{
			struct _starpu_worker *worker = _starpu_get_worker_struct(i);
			if(!_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx->id))
				worker->nsched_ctxs++;
		}
	}
	return sched_ctx;
}
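/* Pick between min and max workers of the given arch for a new context.
 * The selection proceeds in three steps: first take workers that belong to
 * no context, then take workers from contexts that have more than their
 * declared minimum, and finally, if the minimum still cannot be met, shrink
 * every context towards its proportional share
 * needed_npus = ctx_min * global_npus / req_npus, where req_npus is the sum
 * of all the contexts' minima (including ours). */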
static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_worker_archtype arch, unsigned allow_overlap)
{
	int pus[max];
	int npus = 0;
	int i;
	int n = 0;
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	if(config->topology.nsched_ctxs == 1)
	{
		/* we have all available resources */
		npus = starpu_worker_get_nids_by_type(arch, pus, max);
		/* TODO: hierarchical ctxs: get max good workers: close to one another */
		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];
	}
	else
	{
		unsigned enough_ressources = 0;
		npus = starpu_worker_get_nids_ctx_free_by_type(arch, pus, max);
		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];
		if(npus == max)
			/* we have enough available resources */
			enough_ressources = 1;
		if(!enough_ressources && npus >= min)
			/* we have enough available resources */
			enough_ressources = 1;
		if(!enough_ressources)
		{
			/* try to get resources from the ctxs that have more than the minimum number of workers they need */
			int s;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					if(_npus > ctx_min)
					{
						if(npus < min)
						{
							n = (_npus - ctx_min) > (min - npus) ? min - npus : (_npus - ctx_min);
							npus += n;
						}
						/* TODO: hierarchical ctxs: get n good workers: close to the ones already assigned to the ctx */
						for(i = 0; i < n; i++)
							workers[(*nw)++] = _pus[i];
						starpu_sched_ctx_remove_workers(_pus, n, config->sched_ctxs[s].id);
					}
				}
			}
			if(npus >= min)
				enough_ressources = 1;
		}
		if(!enough_ressources)
		{
			/* if there are not enough available workers to satisfy the minimum
			   required, give the contexts workers proportional to their requirements */
			int global_npus = starpu_worker_get_count_by_type(arch);
			int req_npus = 0;
			int s;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
					req_npus += arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
			req_npus += min;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					double needed_npus = ((double)ctx_min * (double)global_npus) / (double)req_npus;
					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					if(needed_npus < (double)_npus)
					{
						double npus_to_rem = (double)_npus - needed_npus;
						int x = floor(npus_to_rem);
						double x_double = (double)x;
						double diff = npus_to_rem - x_double;
						int npus_to_remove = diff >= 0.5 ? x+1 : x;
						int pus_to_remove[npus_to_remove];
						int c = 0;
						/* TODO: hierarchical ctxs: get npus_to_remove good workers: close to the ones already assigned to the ctx */
						for(i = _npus-1; i >= (_npus - npus_to_remove); i--)
						{
							workers[(*nw)++] = _pus[i];
							pus_to_remove[c++] = _pus[i];
						}
						if(!allow_overlap)
							starpu_sched_ctx_remove_workers(pus_to_remove, npus_to_remove, config->sched_ctxs[s].id);
					}
				}
			}
		}
	}
}
unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_ctx_name,
						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
						 unsigned allow_overlap)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(config, policy_name);
	struct _starpu_sched_ctx *sched_ctx = NULL;
	int workers[max_ncpus + max_ngpus];
	int nw = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	_get_workers(min_ncpus, max_ncpus, workers, &nw, STARPU_CPU_WORKER, allow_overlap);
	_get_workers(min_ngpus, max_ngpus, workers, &nw, STARPU_CUDA_WORKER, allow_overlap);
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
	int i;
	printf("%d: ", nw);
	for(i = 0; i < nw; i++)
		printf("%d ", workers[i]);
	printf("\n");
	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1, NULL);
	sched_ctx->min_ncpus = min_ncpus;
	sched_ctx->max_ncpus = max_ncpus;
	sched_ctx->min_ngpus = min_ngpus;
	sched_ctx->max_ngpus = max_ngpus;
	_starpu_unlock_mutex_if_prev_locked();
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	_starpu_update_workers_without_ctx(added_workerids, nw_ctx, sched_ctx->id, 0);
	free(added_workerids);
	_starpu_relock_mutex_if_prev_locked();
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return sched_ctx->id;
}
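/* Public entry point: parse the 0-terminated list of (key, value) varargs
 * below, then create the context. A minimal usage sketch (the worker IDs
 * are made up for illustration; "dmda" is one of the standard policies):
 *
 *	int workerids[2] = {0, 1};
 *	unsigned ctx = starpu_sched_ctx_create(workerids, 2, "my_ctx",
 *					       STARPU_SCHED_CTX_POLICY_NAME, "dmda",
 *					       0);
 */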
unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, ...)
{
	va_list varg_list;
	int arg_type;
	int min_prio_set = 0;
	int max_prio_set = 0;
	int min_prio = 0;
	int max_prio = 0;
	struct starpu_sched_policy *sched_policy = NULL;
	unsigned hierarchy_level = 0;
	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	unsigned awake_workers = 0;
	void (*init_sched)(void) = NULL;
	va_start(varg_list, sched_ctx_name);
	while ((arg_type = va_arg(varg_list, int)) != 0)
	{
		if (arg_type == STARPU_SCHED_CTX_POLICY_NAME)
		{
			char *policy_name = va_arg(varg_list, char *);
			struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
			sched_policy = _starpu_select_sched_policy(config, policy_name);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_STRUCT)
		{
			sched_policy = va_arg(varg_list, struct starpu_sched_policy *);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MIN_PRIO)
		{
			min_prio = va_arg(varg_list, int);
			min_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MAX_PRIO)
		{
			max_prio = va_arg(varg_list, int);
			max_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_HIERARCHY_LEVEL)
		{
			hierarchy_level = va_arg(varg_list, unsigned);
		}
		else if (arg_type == STARPU_SCHED_CTX_NESTED)
		{
			nesting_sched_ctx = va_arg(varg_list, unsigned);
		}
		else if (arg_type == STARPU_SCHED_CTX_AWAKE_WORKERS)
		{
			awake_workers = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_INIT)
		{
			init_sched = va_arg(varg_list, void(*)(void));
		}
		else
		{
			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
		}
	}
	va_end(varg_list);
	if (workerids && nworkers != -1)
	{
		/* Make sure the user doesn't use invalid worker IDs. */
		unsigned num_workers = starpu_worker_get_count();
		int i;
		for (i = 0; i < nworkers; i++)
		{
			if (workerids[i] < 0 || workerids[i] >= (int) num_workers)
			{
				_STARPU_ERROR("Invalid worker ID (%d) specified!\n", workerids[i]);
				return STARPU_NMAX_SCHED_CTXS;
			}
		}
	}
	struct _starpu_sched_ctx *sched_ctx = NULL;
	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched);
	sched_ctx->hierarchy_level = hierarchy_level;
	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
	_starpu_unlock_mutex_if_prev_locked();
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	_starpu_update_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
	free(added_workerids);
	_starpu_relock_mutex_if_prev_locked();
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return sched_ctx->id;
}
void starpu_sched_ctx_register_close_callback(unsigned sched_ctx_id, void (*close_callback)(unsigned sched_ctx_id, void* args), void *args)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->close_callback = close_callback;
	sched_ctx->close_args = args;
	return;
}
#ifdef STARPU_USE_SC_HYPERVISOR
void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counters)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->perf_counters = (struct starpu_sched_ctx_performance_counters *)perf_counters;
	return;
}
#endif
/* free all structures of the context */
static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int nworkers = config->topology.nworkers;
	int w;
	for(w = 0; w < nworkers; w++)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[w]);
		while (sched_ctx->busy[w]) {
			STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond_busy[w], &sched_ctx->parallel_sect_mutex[w]);
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[w]);
	}
	if(sched_ctx->sched_policy)
	{
		_starpu_deinit_sched_policy(sched_ctx);
		free(sched_ctx->sched_policy);
		sched_ctx->sched_policy = NULL;
	}
	else
	{
		starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
	}
	if (sched_ctx->perf_arch.devices)
	{
		free(sched_ctx->perf_arch.devices);
		sched_ctx->perf_arch.devices = NULL;
	}
	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->waiting_tasks_mutex);
	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->sched_ctx_list_mutex);
	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
#ifdef STARPU_HAVE_HWLOC
	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
#endif //STARPU_HAVE_HWLOC
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	config->topology.nsched_ctxs--;
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
}
void starpu_sched_ctx_delete(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
#ifdef STARPU_USE_SC_HYPERVISOR
	if(sched_ctx != NULL && sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
	   && sched_ctx->perf_counters != NULL)
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_delete_context(sched_ctx_id);
		_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR
	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);
	_starpu_unlock_mutex_if_prev_locked();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
	int *workerids;
	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
	/* try to transfer the resources of this ctx to the inheritor ctx, unless
	   both of them already have all the resources, in which case the transfer
	   is pointless */
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	unsigned nworkers = config->topology.nworkers;
	if(nworkers_ctx > 0 && inheritor_sched_ctx && inheritor_sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
	   !(nworkers_ctx == nworkers && nworkers_ctx == inheritor_sched_ctx->workers->nworkers))
	{
		starpu_sched_ctx_add_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
		starpu_sched_ctx_set_priority(workerids, nworkers_ctx, inheritor_sched_ctx_id, 1);
		starpu_sched_ctx_set_priority_on_level(workerids, nworkers_ctx, inheritor_sched_ctx_id, 1);
	}
	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
	{
		if(!sched_ctx->sched_policy)
			starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
		/* if the context changed between the mutex release and the mutex lock,
		   take care to free all the scheduling data before deleting the context */
		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
		_starpu_delete_sched_ctx(sched_ctx);
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list; don't forget
	   to free it once it is no longer needed */
	free(workerids);
	_starpu_relock_mutex_if_prev_locked();
	return;
}
/* called after the workers are terminated, so there is nothing else to do but free the memory */
void _starpu_delete_all_sched_ctxs()
{
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
		STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[i]);
		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			_starpu_sched_ctx_free_scheduling_data(sched_ctx);
			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
			_starpu_barrier_counter_destroy(&sched_ctx->ready_tasks_barrier);
			_starpu_delete_sched_ctx(sched_ctx);
		}
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[i]);
		STARPU_PTHREAD_RWLOCK_DESTROY(&changing_ctx_mutex[i]);
	}
	STARPU_PTHREAD_KEY_DELETE(sched_ctx_key);
	return;
}
static void _starpu_check_workers(int *workerids, int nworkers)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	int nworkers_conf = config->topology.nworkers;
	int i;
	for(i = 0; i < nworkers; i++)
	{
		/* make sure the user does not ask for a resource that does not exist */
		STARPU_ASSERT_MSG(workerids[i] >= 0 && workerids[i] <= nworkers_conf, "requested to add workerid = %d, but that is beyond the range 0 to %d", workerids[i], nworkers_conf);
	}
}
void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx)
{
	unsigned unlocked = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
	if(starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
		return;
	}
	else
		/* you're not supposed to get here if you deleted the context,
		   so there is no point in keeping the mutex locked */
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);
	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
	{
		if(unlocked)
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
		unlocked = 1;
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
		if(old_task == &stop_submission_task)
			break;
		int ret = _starpu_push_task_to_workers(old_task);
		/* stop popping from the empty ctx tasks if the workers cannot take them yet */
		if(ret == -EAGAIN) break;
	}
	if(!unlocked)
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
	/* leave the mutex as it was, to avoid problems in the caller function */
	STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx->id]);
	return;
}
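/* Throttle task submission when the STARPU_WINDOW_TIME_SIZE window is set
 * (see _starpu_init_all_sched_ctxs): ask the policy to simulate the push and
 * refuse the task when the expected completion time, converted to seconds,
 * overshoots the window by more than 20%. */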
unsigned _starpu_can_push_task(struct _starpu_sched_ctx *sched_ctx, struct starpu_task *task)
{
	if(sched_ctx->sched_policy && sched_ctx->sched_policy->simulate_push_task)
	{
		if (window_size == 0.0) return 1;
		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx->id]);
		double expected_end = sched_ctx->sched_policy->simulate_push_task(task);
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);
		double expected_len = 0.0;
		if(hyp_actual_start_sample[sched_ctx->id] != 0.0)
			expected_len = expected_end - hyp_actual_start_sample[sched_ctx->id];
		else
		{
			printf("%u: sc start is 0.0\n", sched_ctx->id);
			expected_len = expected_end - starpu_timing_now();
		}
		if(expected_len < 0.0)
			printf("exp len negative %lf \n", expected_len);
		expected_len /= 1000000.0;
		// printf("exp_end %lf start %lf expected_len %lf \n", expected_end, hyp_actual_start_sample[sched_ctx->id], expected_len);
		if(expected_len > (window_size + 0.2*window_size))
			return 0;
	}
	return 1;
}
void _starpu_fetch_task_from_waiting_list(struct _starpu_sched_ctx *sched_ctx)
{
	if(starpu_task_list_empty(&sched_ctx->waiting_tasks))
		return;
	struct starpu_task *old_task = starpu_task_list_back(&sched_ctx->waiting_tasks);
	if(_starpu_can_push_task(sched_ctx, old_task))
	{
		old_task = starpu_task_list_pop_back(&sched_ctx->waiting_tasks);
		_starpu_push_task_to_workers(old_task);
	}
	return;
}
void _starpu_push_task_to_waiting_list(struct _starpu_sched_ctx *sched_ctx, struct starpu_task *task)
{
	starpu_task_list_push_front(&sched_ctx->waiting_tasks, task);
	return;
}
void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
{
	/* int w; */
	/* struct _starpu_worker *worker = NULL; */
	/* for(w = 0; w < nworkers_to_add; w++) */
	/* { */
	/*	worker = _starpu_get_worker_struct(workers_to_add[w]); */
	/*	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex); */
	/*	struct _starpu_sched_ctx_list *l = NULL; */
	/*	for (l = worker->sched_ctx_list; l; l = l->next) */
	/*	{ */
	/*		if(l->sched_ctx != STARPU_NMAX_SCHED_CTXS && l->sched_ctx != sched_ctx && */
	/*		   starpu_sched_ctx_get_hierarchy_level(l->sched_ctx) == starpu_sched_ctx_get_hierarchy_level(sched_ctx)) */
	/*		{ */
	/*			/\* the lock is taken inside the func *\/ */
	/*			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex); */
	/*			starpu_sched_ctx_set_priority(&workers_to_add[w], 1, l->sched_ctx, priority); */
	/*			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex); */
	/*		} */
	/*	} */
	/*	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex); */
	/* } */
	/* return; */
}
static void _set_priority_hierarchically(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
{
	if(starpu_sched_ctx_get_hierarchy_level(sched_ctx) > 0)
	{
		unsigned father = starpu_sched_ctx_get_inheritor(sched_ctx);
		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, father, priority);
		starpu_sched_ctx_set_priority_on_level(workers_to_add, nworkers_to_add, father, priority);
		_set_priority_hierarchically(workers_to_add, nworkers_to_add, father, priority);
	}
	return;
}
void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int added_workers[nworkers_to_add];
	int n_added_workers = 0;
	_starpu_unlock_mutex_if_prev_locked();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
	_starpu_check_workers(workers_to_add, nworkers_to_add);
	/* if the context has not already been deleted */
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
		if(n_added_workers > 0)
		{
			_starpu_update_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
		}
		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, sched_ctx_id, 1);
		_set_priority_hierarchically(workers_to_add, nworkers_to_add, sched_ctx_id, 0);
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	_starpu_relock_mutex_if_prev_locked();
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
		_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	}
	return;
}
void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_remove, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int removed_workers[sched_ctx->workers->nworkers];
	int n_removed_workers = 0;
	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
	_starpu_unlock_mutex_if_prev_locked();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	/* if the context has not already been deleted */
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
		if(n_removed_workers > 0)
		{
			_starpu_update_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
			starpu_sched_ctx_set_priority(removed_workers, n_removed_workers, sched_ctx_id, 1);
		}
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	_starpu_relock_mutex_if_prev_locked();
	return;
}
int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
{
	unsigned worker = 0, nworkers = 0;
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx->id]);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator_for_parallel_tasks(workers, &it, task);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		STARPU_ASSERT_MSG(worker < STARPU_NMAXWORKERS, "worker id %u", worker);
		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
			nworkers++;
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);
	return nworkers;
}
/* unused sched_ctxs have the id STARPU_NMAX_SCHED_CTXS */
void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config)
{
	STARPU_PTHREAD_KEY_CREATE(&sched_ctx_key, NULL);
	window_size = starpu_get_env_float_default("STARPU_WINDOW_TIME_SIZE", 0.0);
	nobind = starpu_get_env_number("STARPU_WORKERS_NOBIND");
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		config->sched_ctxs[i].id = STARPU_NMAX_SCHED_CTXS;
	return;
}
/* sched_ctxs are not necessarily contiguous: when one is removed its slot is
   freed, and the slot is reused when a new one is added */
static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config)
{
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(config->sched_ctxs[i].id == STARPU_NMAX_SCHED_CTXS)
			return i;
	STARPU_ASSERT(0);
	return STARPU_NMAX_SCHED_CTXS;
}
int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_all must not be called from a task or callback");
	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
}
int _starpu_wait_for_n_submitted_tasks_of_sched_ctx(unsigned sched_ctx_id, unsigned n)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_n_submitted_tasks must not be called from a task or callback");
	return _starpu_barrier_counter_wait_until_counter_reaches_down_to_n(&sched_ctx->tasks_barrier, n);
}
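/* Called each time a task of the context terminates. When the context is on
 * its last task (the barrier counter has reached 1) and the user declared
 * submission finished, the context hands its workers over to its inheritor.
 * The function also honours starpu_drivers_request_termination(): once
 * nothing is submitting anymore, it stops the runtime and wakes the other
 * contexts so the workers can terminate. */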
void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
#ifndef STARPU_SANITIZE_THREAD
	if (!config->watchdog_ok)
		config->watchdog_ok = 1;
#endif
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int reached = _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
	int finished = reached == 1;
	/* when we have finished decrementing the tasks, if the user signaled that no
	   more tasks will be submitted, we can move all the workers of this context
	   to the inheritor context */
	if(finished && sched_ctx->inheritor != STARPU_NMAX_SCHED_CTXS)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
		if(sched_ctx->finished_submit)
		{
			STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
			if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if(sched_ctx->close_callback)
					sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
				int *workerids = NULL;
				unsigned nworkers = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
				if(nworkers > 0)
				{
					starpu_sched_ctx_add_workers(workerids, nworkers, sched_ctx->inheritor);
					free(workerids);
				}
			}
			_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
			return;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	}
	/* We also need to check for config->submitting = 0 (i.e. the
	 * user called starpu_drivers_request_termination()), in which
	 * case we need to set config->running to 0 and wake workers,
	 * so they can terminate, just like
	 * starpu_drivers_request_termination() does.
	 */
	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
	if(config->submitting == 0)
	{
		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			if(sched_ctx->close_callback)
				sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
		}
		ANNOTATE_HAPPENS_AFTER(&config->running);
		config->running = 0;
		ANNOTATE_HAPPENS_BEFORE(&config->running);
		int s;
		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
			}
		}
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
	return;
}
void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier, 0.0);
}

int _starpu_get_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
}

int _starpu_check_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_check(&sched_ctx->tasks_barrier);
}
unsigned _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops, struct starpu_task *task)
{
	unsigned ret = 1;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(!sched_ctx->is_initial_sched)
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->waiting_tasks_mutex);
	_starpu_barrier_counter_increment(&sched_ctx->ready_tasks_barrier, ready_flops);
	if(!sched_ctx->is_initial_sched)
	{
		if(!_starpu_can_push_task(sched_ctx, task))
		{
			_starpu_push_task_to_waiting_list(sched_ctx, task);
			ret = 0;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->waiting_tasks_mutex);
	}
	return ret;
}

void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(!sched_ctx->is_initial_sched)
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->waiting_tasks_mutex);
	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);
	if(!sched_ctx->is_initial_sched)
	{
		_starpu_fetch_task_from_waiting_list(sched_ctx);
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->waiting_tasks_mutex);
	}
}
int starpu_sched_ctx_get_nready_tasks(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->ready_tasks_barrier);
}

double starpu_sched_ctx_get_nready_flops(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_flops(&sched_ctx->ready_tasks_barrier);
}

int _starpu_wait_for_no_ready_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->ready_tasks_barrier);
}
void starpu_sched_ctx_set_context(unsigned *sched_ctx)
{
	starpu_pthread_setspecific(sched_ctx_key, (void*)sched_ctx);
}

unsigned starpu_sched_ctx_get_context(void)
{
	unsigned *sched_ctx = (unsigned*)starpu_pthread_getspecific(sched_ctx_key);
	if(sched_ctx == NULL)
		return STARPU_NMAX_SCHED_CTXS;
	STARPU_ASSERT(*sched_ctx < STARPU_NMAX_SCHED_CTXS);
	return *sched_ctx;
}

unsigned _starpu_sched_ctx_get_current_context(void)
{
	unsigned sched_ctx = starpu_sched_ctx_get_context();
	if (sched_ctx == STARPU_NMAX_SCHED_CTXS)
		return _starpu_get_initial_sched_ctx()->id;
	else
		return sched_ctx;
}
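
/* Illustrative usage sketch (not part of this file): an application
 * thread can pin the context its submissions default to, instead of
 * tagging every task; "my_ctx" is an assumption for the example:
 *
 *	static unsigned my_ctx;			// created elsewhere
 *	starpu_sched_ctx_set_context(&my_ctx);	// stored in the TLS key
 *	starpu_task_submit(task);		// now targets my_ctx
 *
 * Only the pointer is stored, so the variable must outlive the
 * thread's use of the context.
 */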
void starpu_sched_ctx_notify_hypervisor_exists(void)
{
	with_hypervisor = 1;
	int i, j;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		hyp_start_sample[i] = starpu_timing_now();
		hyp_start_allow_sample[i] = 0.0;
		for(j = 0; j < STARPU_NMAXWORKERS; j++)
		{
			flops[i][j] = 0.0;
			data_size[i][j] = 0;
		}
		hyp_actual_start_sample[i] = 0.0;
	}
}

unsigned starpu_sched_ctx_check_if_hypervisor_exists(void)
{
	return with_hypervisor;
}
void starpu_sched_ctx_update_start_resizing_sample(unsigned sched_ctx_id, double start_sample)
{
	hyp_actual_start_sample[sched_ctx_id] = start_sample;
}
unsigned _starpu_sched_ctx_allow_hypervisor(unsigned sched_ctx_id)
{
	/* the sampling logic below is currently disabled by this early
	   return: the hypervisor is always allowed to resize */
	return 1;
	double now = starpu_timing_now();
	if(hyp_start_allow_sample[sched_ctx_id] > 0.0)
	{
		double allow_sample = (now - hyp_start_allow_sample[sched_ctx_id]) / 1000000.0;
		if(allow_sample < 0.001)
			return 1;
		else
		{
			hyp_start_allow_sample[sched_ctx_id] = 0.0;
			hyp_start_sample[sched_ctx_id] = starpu_timing_now();
			return 0;
		}
	}
	double forbid_sample = (now - hyp_start_sample[sched_ctx_id]) / 1000000.0;
	if(forbid_sample > 0.01)
	{
//		hyp_start_sample[sched_ctx_id] = starpu_timing_now();
		hyp_start_allow_sample[sched_ctx_id] = starpu_timing_now();
		return 1;
	}
	return 0;
}
void starpu_sched_ctx_set_policy_data(unsigned sched_ctx_id, void* policy_data)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->policy_data = policy_data;
}

void* starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->policy_data;
}
struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsigned sched_ctx_id, enum starpu_worker_collection_type worker_collection_type)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->workers = (struct starpu_worker_collection*)malloc(sizeof(struct starpu_worker_collection));
	switch(worker_collection_type)
	{
#ifdef STARPU_HAVE_HWLOC
	case STARPU_WORKER_TREE:
		sched_ctx->workers->has_next = worker_tree.has_next;
		sched_ctx->workers->get_next = worker_tree.get_next;
		sched_ctx->workers->add = worker_tree.add;
		sched_ctx->workers->remove = worker_tree.remove;
		sched_ctx->workers->init = worker_tree.init;
		sched_ctx->workers->deinit = worker_tree.deinit;
		sched_ctx->workers->init_iterator = worker_tree.init_iterator;
		sched_ctx->workers->init_iterator_for_parallel_tasks = worker_tree.init_iterator_for_parallel_tasks;
		sched_ctx->workers->type = STARPU_WORKER_TREE;
		break;
#endif
//	case STARPU_WORKER_LIST:
	default:
		sched_ctx->workers->has_next = worker_list.has_next;
		sched_ctx->workers->get_next = worker_list.get_next;
		sched_ctx->workers->add = worker_list.add;
		sched_ctx->workers->remove = worker_list.remove;
		sched_ctx->workers->init = worker_list.init;
		sched_ctx->workers->deinit = worker_list.deinit;
		sched_ctx->workers->init_iterator = worker_list.init_iterator;
		sched_ctx->workers->init_iterator_for_parallel_tasks = worker_list.init_iterator_for_parallel_tasks;
		sched_ctx->workers->type = STARPU_WORKER_LIST;
		break;
	}
	/* construct the collection of workers (list/tree/etc.) */
	sched_ctx->workers->init(sched_ctx->workers);
	return sched_ctx->workers;
}
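
/* Illustrative sketch (not part of this file): a scheduling policy's
 * init hook would typically create its collection here; the hook name
 * below is a made-up example:
 *
 *	static void example_policy_init(unsigned sched_ctx_id)
 *	{
 *		// STARPU_WORKER_TREE is only available with hwloc,
 *		// otherwise the list implementation is used anyway
 *		starpu_sched_ctx_create_worker_collection(sched_ctx_id,
 *							  STARPU_WORKER_LIST);
 *	}
 */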
void starpu_sched_ctx_display_workers(unsigned sched_ctx_id, FILE *f)
{
	int *workerids = NULL;
	unsigned nworkers;
	unsigned i;
	nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
	fprintf(f, "[sched_ctx %u]: %u worker%s\n", sched_ctx_id, nworkers, nworkers == 1 ? "" : "s");
	for (i = 0; i < nworkers; i++)
	{
		char name[256];
		starpu_worker_get_name(workerids[i], name, 256);
		fprintf(f, "\t\t%s\n", name);
	}
	free(workerids);
}
unsigned starpu_sched_ctx_get_workers_list(unsigned sched_ctx_id, int **workerids)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	if(!workers)
		return 0;
	*workerids = (int*)malloc(workers->nworkers*sizeof(int));
	int worker;
	unsigned nworkers = 0;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		(*workerids)[nworkers++] = worker;
	}
	return nworkers;
}
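
/* Caller-side sketch (illustrative only): the id array is malloc'ed
 * here, so the caller owns it and must free it:
 *
 *	int *ids = NULL;
 *	unsigned n = starpu_sched_ctx_get_workers_list(ctx, &ids);
 *	unsigned i;
 *	for (i = 0; i < n; i++)
 *		printf("worker %d\n", ids[i]);
 *	free(ids);	// free(NULL) is safe if the context had no collection
 */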
void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->workers->deinit(sched_ctx->workers);
	free(sched_ctx->workers);
	sched_ctx->workers = NULL;
}

struct starpu_worker_collection* starpu_sched_ctx_get_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->workers;
}
int _starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu_worker_archtype arch)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	int worker;
	int npus = 0;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch || arch == STARPU_ANY_WORKER)
			pus[npus++] = worker;
	}
	return npus;
}
starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched_ctx_id)
{
	return &changing_ctx_mutex[sched_ctx_id];
}

unsigned starpu_sched_ctx_get_nworkers(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL)
		return sched_ctx->workers->nworkers;
	else
		return 0;
}
unsigned starpu_sched_ctx_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct _starpu_sched_ctx *sched_ctx2 = _starpu_get_sched_ctx_struct(sched_ctx_id2);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_worker_collection *workers2 = sched_ctx2->workers;
	int worker, worker2;
	int shared_workers = 0;
	struct starpu_sched_ctx_iterator it1, it2;
	workers->init_iterator(workers, &it1);
	while(workers->has_next(workers, &it1))
	{
		worker = workers->get_next(workers, &it1);
		/* re-initialize the inner iterator for each worker of the
		   first collection, otherwise only the first worker would
		   ever be compared against the second context */
		workers2->init_iterator(workers2, &it2);
		while(workers2->has_next(workers2, &it2))
		{
			worker2 = workers2->get_next(workers2, &it2);
			if(worker == worker2)
				shared_workers++;
		}
	}
	return shared_workers;
}
unsigned starpu_sched_ctx_contains_worker(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	if(workers)
	{
		int worker;
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator(workers, &it);
		while(workers->has_next(workers, &it))
		{
			worker = workers->get_next(workers, &it);
			if(worker == workerid)
				return 1;
		}
	}
	return 0;
}

unsigned starpu_sched_ctx_contains_type_of_worker(enum starpu_worker_archtype arch, unsigned sched_ctx_id)
{
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	int worker;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch)
			return 1;
	}
	return 0;
}
unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	int i;
	struct _starpu_sched_ctx *sched_ctx = NULL;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		sched_ctx = &config->sched_ctxs[i];
		if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
			if(starpu_sched_ctx_contains_worker(workerid, sched_ctx->id))
				return 1;
	}
	return 0;
}
unsigned starpu_sched_ctx_worker_get_id(unsigned sched_ctx_id)
{
	int workerid = starpu_worker_get_id();
	if(workerid != -1)
		if(starpu_sched_ctx_contains_worker(workerid, sched_ctx_id))
			return workerid;
	return -1;
}

unsigned starpu_sched_ctx_get_ctx_for_task(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned ret_sched_ctx = task->sched_ctx;
	if (task->possibly_parallel && !sched_ctx->sched_policy
	    && sched_ctx->nesting_sched_ctx != STARPU_NMAX_SCHED_CTXS)
		ret_sched_ctx = sched_ctx->nesting_sched_ctx;
	return ret_sched_ctx;
}
unsigned starpu_sched_ctx_overlapping_ctxs_on_worker(int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return worker->nsched_ctxs > 1;
}

void starpu_sched_ctx_set_inheritor(unsigned sched_ctx_id, unsigned inheritor)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	STARPU_ASSERT(inheritor < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->inheritor = inheritor;
	return;
}

unsigned starpu_sched_ctx_get_inheritor(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->inheritor;
}

unsigned starpu_sched_ctx_get_hierarchy_level(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->hierarchy_level;
}
void starpu_sched_ctx_finished_submit(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
	sched_ctx->finished_submit = 1;
	STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	return;
}
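
/* Illustrative usage sketch (not part of this file): combined with an
 * inheritor context, this lets a drained context hand its workers over;
 * "ctx" and "main_ctx" are assumptions for the example:
 *
 *	starpu_sched_ctx_set_inheritor(ctx, main_ctx);
 *	// ... submit the last tasks to ctx ...
 *	starpu_sched_ctx_finished_submit(ctx);
 *	// once the task counter reaches zero,
 *	// _starpu_decrement_nsubmitted_tasks_of_sched_ctx() above moves
 *	// the workers of ctx into main_ctx
 */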
#ifdef STARPU_USE_SC_HYPERVISOR
void _starpu_sched_ctx_post_exec_task_cb(int workerid, struct starpu_task *task, size_t data_size2, uint32_t footprint)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if(sched_ctx != NULL && task->sched_ctx != _starpu_get_initial_sched_ctx()->id &&
	   task->sched_ctx != STARPU_NMAX_SCHED_CTXS && sched_ctx->perf_counters != NULL)
	{
		flops[task->sched_ctx][workerid] += task->flops;
		data_size[task->sched_ctx][workerid] += data_size2;
		if(_starpu_sched_ctx_allow_hypervisor(sched_ctx->id) || task->hypervisor_tag > 0)
		{
			_STARPU_TRACE_HYPERVISOR_BEGIN();
			sched_ctx->perf_counters->notify_post_exec_task(task, data_size[task->sched_ctx][workerid], footprint,
									task->hypervisor_tag, flops[task->sched_ctx][workerid]);
			_STARPU_TRACE_HYPERVISOR_END();
			flops[task->sched_ctx][workerid] = 0.0;
			data_size[task->sched_ctx][workerid] = 0;
		}
	}
}

void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL && sched_ctx_id != _starpu_get_initial_sched_ctx()->id && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
	   && sched_ctx->perf_counters != NULL && _starpu_sched_ctx_allow_hypervisor(sched_ctx_id))
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_pushed_task(sched_ctx_id, workerid);
		_STARPU_TRACE_HYPERVISOR_END();
	}
}
#endif //STARPU_USE_SC_HYPERVISOR
int starpu_sched_get_min_priority(void)
{
	return starpu_sched_ctx_get_min_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_get_max_priority(void)
{
	return starpu_sched_ctx_get_max_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_set_min_priority(int min_prio)
{
	return starpu_sched_ctx_set_min_priority(_starpu_sched_ctx_get_current_context(), min_prio);
}

int starpu_sched_set_max_priority(int max_prio)
{
	return starpu_sched_ctx_set_max_priority(_starpu_sched_ctx_get_current_context(), max_prio);
}
int starpu_sched_ctx_get_min_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority;
}

int starpu_sched_ctx_get_max_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority;
}

int starpu_sched_ctx_set_min_priority(unsigned sched_ctx_id, int min_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->min_priority = min_prio;
	return 0;
}

int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->max_priority = max_prio;
	return 0;
}

int starpu_sched_ctx_min_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority_is_set;
}

int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority_is_set;
}
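
/* Illustrative usage sketch (not part of this file): a policy declares
 * the priority range it honours, then tasks may use any value in it;
 * the range below is an arbitrary example:
 *
 *	starpu_sched_ctx_set_min_priority(ctx, -5);
 *	starpu_sched_ctx_set_max_priority(ctx, 5);
 *	task->priority = 3;	// must lie within [-5, 5]
 */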
void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
{
	if(nworkers != -1)
	{
		int w;
		struct _starpu_worker *worker = NULL;
		for(w = 0; w < nworkers; w++)
		{
			worker = _starpu_get_worker_struct(workers[w]);
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			_starpu_sched_ctx_list_move(&worker->sched_ctx_list, sched_ctx_id, priority);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
	}
	return;
}

unsigned starpu_sched_ctx_get_priority(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return _starpu_sched_ctx_elt_get_priority(worker->sched_ctx_list, sched_ctx_id);
}
unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		unsigned last_worker_awake = 1;
		struct starpu_worker_collection *workers = sched_ctx->workers;
		struct starpu_sched_ctx_iterator it;
		int workerid;
		workers->init_iterator(workers, &it);
		while(workers->has_next(workers, &it))
		{
			workerid = workers->get_next(workers, &it);
			if(workerid != worker->workerid && _starpu_worker_get_status(workerid) != STATUS_SLEEPING)
			{
				last_worker_awake = 0;
				break;
			}
		}
		if(last_worker_awake)
			return 1;
	}
	return 0;
}
void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid)
{
	_starpu_bind_thread_on_cpu(_starpu_get_machine_config(), cpuid, STARPU_NOWORKERID);
}
unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() <= 1)
		return STARPU_NMAX_SCHED_CTXS;
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	struct _starpu_sched_ctx *sched_ctx = NULL;
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if(sched_ctx->main_master == workerid && sched_ctx->nesting_sched_ctx == sched_ctx_id)
			return sched_ctx->id;
	}
	return STARPU_NMAX_SCHED_CTXS;
}
unsigned starpu_sched_ctx_master_get_context(int masterid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(masterid);
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	struct _starpu_sched_ctx *sched_ctx = NULL;
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if(sched_ctx->main_master == masterid)
			return sched_ctx->id;
	}
	return STARPU_NMAX_SCHED_CTXS;
}

struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(struct _starpu_worker *worker, struct _starpu_job *j)
{
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	struct _starpu_sched_ctx *sched_ctx = NULL;
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if (j->task->sched_ctx == sched_ctx->id)
			return sched_ctx;
	}
	return NULL;
}
void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double ready_flops)
{
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx_id, ready_flops);
}
void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex)
{
	/* TODO: make this cleaner by differentiating between calls coming
	   from push or pop (mutex already held or not) and between calls
	   made from another worker or not */
	int workerid = starpu_worker_get_id();
	struct _starpu_worker *worker = NULL;
	if(workerid != -1 && manage_mutex)
	{
		worker = _starpu_get_worker_struct(workerid);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
	task->sched_ctx = sched_ctx;
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
	_starpu_repush_task(j);
	if(workerid != -1 && manage_mutex)
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
}
void starpu_sched_ctx_list_task_counters_increment(unsigned sched_ctx_id, int workerid)
{
	/* Note: the sched_mutex is often not held when we get here although
	   it should be, so take it */
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
	{
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		_starpu_sched_ctx_list_push_event(worker->sched_ctx_list, sched_ctx_id);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
}
void starpu_sched_ctx_list_task_counters_decrement(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
		_starpu_sched_ctx_list_pop_event(worker->sched_ctx_list, sched_ctx_id);
}

void starpu_sched_ctx_list_task_counters_reset(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
		_starpu_sched_ctx_list_pop_all_event(worker->sched_ctx_list, sched_ctx_id);
}
void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	/* With a single context everything defaults to the global context,
	   so these counters are useless there */
	if (_starpu_get_nsched_ctxs() > 1)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		unsigned worker = 0;
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
		while(workers->has_next(workers, &it))
		{
			worker = workers->get_next(workers, &it);
			starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, worker);
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);
	}
}
void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() > 1)
	{
		int curr_workerid = starpu_worker_get_id();
		struct _starpu_worker *curr_worker_str = NULL, *worker_str;
		if(curr_workerid != -1)
		{
			curr_worker_str = _starpu_get_worker_struct(curr_workerid);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
		}
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		unsigned worker = 0;
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
		while(workers->has_next(workers, &it))
		{
			worker = workers->get_next(workers, &it);
			worker_str = _starpu_get_worker_struct(worker);
			if (worker_str->nsched_ctxs > 1)
			{
				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, worker);
				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
			}
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);
		if(curr_workerid != -1)
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);
	}
}
void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() > 1)
	{
		int curr_workerid = starpu_worker_get_id();
		struct _starpu_worker *curr_worker_str = NULL, *worker_str;
		if(curr_workerid != -1)
		{
			curr_worker_str = _starpu_get_worker_struct(curr_workerid);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
		}
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		unsigned worker = 0;
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
		while(workers->has_next(workers, &it))
		{
			worker = workers->get_next(workers, &it);
			worker_str = _starpu_get_worker_struct(worker);
			if (worker_str->nsched_ctxs > 1)
			{
				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
				starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, worker);
				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
			}
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);
		if(curr_workerid != -1)
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);
	}
}
static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
{
	int s;
	for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(s);
		if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
		{
			if(sched_ctx->parallel_sect[workerid])
				return 1;
		}
	}
	return 0;
}
static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();
	unsigned sleeping[nworkers];
	int w;
	for(w = 0; w < nworkers; w++)
	{
		if(current_worker_id == -1 || workerids[w] != current_worker_id)
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
		sleeping[w] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerids[w]);
		sched_ctx->master[workerids[w]] = master;
		sched_ctx->parallel_sect[workerids[w]] = 1;
		if(current_worker_id == -1 || workerids[w] != current_worker_id)
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
#ifndef STARPU_NON_BLOCKING_DRIVERS
		starpu_wake_worker(workerids[w]);
#endif
	}
	int workerid;
	for(w = 0; w < nworkers; w++)
	{
		workerid = workerids[w];
		if((current_worker_id == -1 || workerid != current_worker_id) && !sleeping[w])
		{
			sem_wait(&sched_ctx->fall_asleep_sem[master]);
		}
	}
	return;
}
void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	worker->blocked = 1;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->sleeping[workerid] = 1;
	int master = sched_ctx->master[workerid];
	sem_post(&sched_ctx->fall_asleep_sem[master]);
	return;
}

void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int master = sched_ctx->master[workerid];
	sem_post(&sched_ctx->wake_up_sem[master]);
	sched_ctx->sleeping[workerid] = 0;
	sched_ctx->master[workerid] = -1;
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	worker->blocked = 0;
	return;
}
static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		int curr_master = sched_ctx->master[workerid];
		if(curr_master == master && sched_ctx->parallel_sect[workerid])
		{
			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
			{
				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
				sem_wait(&sched_ctx->wake_up_sem[master]);
			}
			else
				sched_ctx->parallel_sect[workerid] = 0;
		}
	}
	return;
}
void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
{
	int *workerids;
	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);
	/* execute parallel code */
	void* ret = func(param);
	/* wake up starpu workers */
	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, workerids[nworkers-1]);
	/* the list was malloc'ed by starpu_sched_ctx_get_workers_list() */
	free(workerids);
	return ret;
}
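
/* Illustrative usage sketch (not part of this file): running an
 * external parallel kernel (e.g. an OpenMP region) on the workers of a
 * context while the StarPU workers are parked; "omp_kernel" and "args"
 * are assumptions for the example:
 *
 *	void *ret = starpu_sched_ctx_exec_parallel_code(omp_kernel,
 *							args, ctx);
 */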
void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids)
{
	int current_worker_id = starpu_worker_get_id();
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	(*cpuids) = (int*)malloc(workers->nworkers*sizeof(int));
	int w = 0;
	struct starpu_sched_ctx_iterator it;
	int workerid;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		workerid = workers->get_next(workers, &it);
		int master = sched_ctx->master[workerid];
		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
		{
			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
		}
	}
	*ncpuids = w;
	return;
}
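
/* Caller-side sketch (illustrative only): the cpuid array is malloc'ed
 * here, so the caller must free it after binding its threads:
 *
 *	int *cpuids = NULL, ncpuids = 0;
 *	starpu_sched_ctx_get_available_cpuids(ctx, &cpuids, &ncpuids);
 *	// ... hand cpuids[0..ncpuids-1] to the external runtime ...
 *	free(cpuids);
 */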
static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();
	int masters[nworkers];
	int w;
	for(w = 0; w < nworkers; w++)
	{
		int workerid = workerids[w];
		masters[w] = sched_ctx->master[workerid];
		if(current_worker_id == -1 || workerid != current_worker_id)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
		}
		else
			sched_ctx->parallel_sect[workerid] = 0;
		sched_ctx->master[workerid] = -1;
	}
	int workerid;
	for(w = 0; w < nworkers; w++)
	{
		workerid = workerids[w];
		if(masters[w] != -1)
		{
			/* use the master saved above: sched_ctx->master[workerid]
			   was already reset to -1 in the first loop */
			int master = masters[w];
			if(current_worker_id == -1 || workerid != current_worker_id)
				sem_wait(&sched_ctx->wake_up_sem[master]);
		}
	}
	return;
}
static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int new_master = workerids[nworkers-1];
	int current_worker_id = starpu_worker_get_id();
	int current_is_in_section = 0;
	int npotential_masters = 0;
	int nawake_workers = 0;
	int ntrue_masters = 0;
	int potential_masters[nworkers];
	int awake_workers[nworkers];
	int true_masters[nworkers];
	int i,w;
	for(w = 0 ; w < nworkers ; w++)
	{
		if (current_worker_id == workerids[w])
			current_is_in_section = 1;
		int master = sched_ctx->master[workerids[w]];
		if (master > -1)
		{
			int already_seen = 0;
			// could be a helper function: search an element in an array
			for (i = 0 ; i < npotential_masters; i++)
			{
				if (potential_masters[i] == master)
				{
					already_seen = 1;
					break;
				}
			}
			if (!already_seen)
				potential_masters[npotential_masters++] = master;
		}
		else if (master == -1)
			awake_workers[nawake_workers++] = workerids[w];
	}
	for (i = 0 ; i < npotential_masters ; i++)
	{
		int master_is_in_section = 0;
		// could be a helper function: search an element in an array
		for (w = 0 ; w < nworkers ; w++)
		{
			if (workerids[w] == potential_masters[i])
			{
				master_is_in_section = 1;
				break;
			}
		}
		if (master_is_in_section)
			true_masters[ntrue_masters++] = potential_masters[i];
	}
	if (current_is_in_section)
		new_master = current_worker_id;
	else
	{
		if (ntrue_masters > 1)
		{
			if (nawake_workers > 0)
				new_master = awake_workers[nawake_workers - 1];
			else
				new_master = true_masters[ntrue_masters - 1];
		}
	}
	return new_master;
}
static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int w;
	int nput_to_sleep = 0;
	int nwake_up = 0;
	int put_to_sleep[nworkers];
	int wake_up[nworkers];
	for(w = 0 ; w < nworkers ; w++)
	{
		int master = sched_ctx->master[workerids[w]];
		if (master == -1 && workerids[w] != new_master)
			put_to_sleep[nput_to_sleep++] = workerids[w];
		else if(master != -1 && workerids[w] == new_master)
			wake_up[nwake_up++] = workerids[w];
	}
	if(nwake_up > 0)
		_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
	if(nput_to_sleep > 0)
		_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
}

static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master)
{
	int i;
	for(i = 0; i < nworkers; i++)
	{
		if(workerids[i] != master)
			sched_ctx->master[workerids[i]] = master;
	}
}
int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);
	_starpu_sched_ctx_add_workers_to_master(sched_ctx_id, workerids, nworkers, new_master);
	return new_master;
}

void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master)
{
	/* wake up starpu workers */
	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, master);
}
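
/* Illustrative usage sketch (not part of this file): booking the
 * workers around a parallel section and releasing them afterwards;
 * "workerids"/"n" are assumptions for the example:
 *
 *	int master = starpu_sched_ctx_book_workers_for_task(ctx,
 *							    workerids, n);
 *	// ... run the parallel section on the booked workers ...
 *	starpu_sched_ctx_unbook_workers_for_task(ctx, master);
 */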
struct starpu_perfmodel_arch * _starpu_sched_ctx_get_perf_archtype(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return &sched_ctx->perf_arch;
}

int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id)
{
	int idx = 0;
	int curr_workerid = starpu_worker_get_id();
	int worker;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx->sched_policy || !sched_ctx->awake_workers)
		return -1;
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		if(worker == curr_workerid)
			return idx;
		idx++;
	}
	return -1;
}

void (*starpu_sched_ctx_get_sched_policy_init(unsigned sched_ctx_id))(void)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->init_sched;
}

unsigned starpu_sched_ctx_has_starpu_scheduler(unsigned sched_ctx_id, unsigned *awake_workers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	*awake_workers = sched_ctx->awake_workers;
	return sched_ctx->sched_policy != NULL;
}