/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011-2018 Inria
 * Copyright (C) 2017 Arthur Chevalier
 * Copyright (C) 2012-2019 CNRS
 * Copyright (C) 2012-2019 Université de Bordeaux
 * Copyright (C) 2016 Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <core/sched_policy.h>
#include <core/sched_ctx.h>
#include <common/utils.h>
#include <stdarg.h>
#include <core/task.h>
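/* Operations that can be recorded and replayed later when the calling worker
 * is itself in the middle of a scheduling operation (see _defer_ctx_change()
 * below). */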
enum _starpu_ctx_change_op
{
	ctx_change_invalid = 0,
	ctx_change_add = 1,
	ctx_change_remove = 2
};
static starpu_pthread_mutex_t sched_ctx_manag = STARPU_PTHREAD_MUTEX_INITIALIZER;
static starpu_pthread_mutex_t finished_submit_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
static struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
starpu_pthread_key_t sched_ctx_key;
static unsigned with_hypervisor = 0;
static double hyp_start_sample[STARPU_NMAX_SCHED_CTXS];
static double hyp_start_allow_sample[STARPU_NMAX_SCHED_CTXS];
static double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
static size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
static double hyp_actual_start_sample[STARPU_NMAX_SCHED_CTXS];
static double window_size;
static int nobind;
static int occupied_sms = 0;

static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id);
static void _starpu_sched_ctx_block_workers_in_parallel(unsigned sched_ctx_id, unsigned all);
static void _starpu_sched_ctx_unblock_workers_in_parallel(unsigned sched_ctx_id, unsigned all);
static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
static void set_priority_on_notified_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority);
static void set_priority_hierarchically_on_notified_workers(int *workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority);
static void fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx);
static void add_notified_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id);
/* reused from combined_workers.c */
static int compar_int(const void *pa, const void *pb)
{
	int a = *((int *)pa);
	int b = *((int *)pb);
	return a - b;
}

/* reused from combined_workers.c */
static void sort_workerid_array(int nworkers, int workerid_array[])
{
	qsort(workerid_array, nworkers, sizeof(int), compar_int);
}
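/* Worker id arrays are sorted before workers are notified, so that the
 * per-worker sched_mutex locks below are always taken in ascending id order;
 * a consistent lock order prevents multi-lock acquisition deadlocks. */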
/* notify workers that a ctx change operation is about to proceed.
 *
 * workerids must be sorted by ascending id
 *
 * Once this function returns, the notified workers must not start a new
 * scheduling operation until they are notified that the ctx change op is
 * done.
 */
static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, const int * const workerids)
{
	STARPU_ASSERT(!_starpu_worker_sched_op_pending());
	const int cur_workerid = _starpu_worker_get_id();
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		/* check that workerids[] is sorted to prevent multi-lock acquisition deadlocks */
		STARPU_ASSERT(i == 0 || (workerids[i] > workerids[i-1]));
		if (starpu_worker_is_combined_worker(workerids[i]))
			continue;
		if (workerids[i] == cur_workerid)
			continue;
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		_starpu_worker_enter_changing_ctx_op(worker);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
}
/* notify workers that a ctx change operation is complete.
 *
 * workerids must be sorted by ascending id
 *
 * Once this function returns, the workers may proceed with scheduling operations again.
 */
static void notify_workers_about_changing_ctx_done(const unsigned nworkers, const int * const workerids)
{
	STARPU_ASSERT(!_starpu_worker_sched_op_pending());
	const int cur_workerid = _starpu_worker_get_id();
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		/* check that workerids[] is sorted to prevent multi-lock acquisition deadlocks */
		STARPU_ASSERT(i == 0 || (workerids[i] > workerids[i-1]));
		if (starpu_worker_is_combined_worker(workerids[i]))
			continue;
		if (workerids[i] == cur_workerid)
			continue;
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		_starpu_worker_leave_changing_ctx_op(worker);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
}
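/* Typical usage of the notification pair above, as in the create/add/delete
 * paths below (sketch):
 *
 *	sort_workerid_array(nworkers, workerids);
 *	notify_workers_about_changing_ctx_pending(nworkers, workerids);
 *	_starpu_sched_ctx_lock_write(sched_ctx_id);
 *	... mutate the context's worker set ...
 *	notify_workers_about_changing_ctx_done(nworkers, workerids);
 *	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 */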
static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_elt_exists(worker->sched_ctx_list, sched_ctx_id);
	/* the worker was about to be removed from this ctx, but it is finally staying */
	if (!ret_sched_ctx)
	{
		/* add context to worker */
		_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx_id);
		worker->nsched_ctxs++;
	}
	worker->removed_from_ctx[sched_ctx_id] = 0;
	if (worker->tmp_sched_ctx == (int) sched_ctx_id)
		worker->tmp_sched_ctx = -1;
	return;
}
void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_elt_exists(worker->sched_ctx_list, sched_ctx_id);
	/* remove context from worker */
	if (ret_sched_ctx)
	{
		/* Don't remove the scheduling data here: tasks may still be running, and
		   post_exec would then no longer find the scheduling data. Do it when
		   deleting the context, at which point it is really not needed anymore. */
		/* struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id); */
		/* if(sched_ctx && sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers) */
		/* { */
		/*	_STARPU_SCHED_BEGIN; */
		/*	sched_ctx->sched_policy->remove_workers(sched_ctx_id, &worker->workerid, 1); */
		/*	_STARPU_SCHED_END; */
		/* } */
		if (!_starpu_sched_ctx_list_remove(&worker->sched_ctx_list, sched_ctx_id))
			worker->nsched_ctxs--;
	}
	return;
}
#if 0
static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id)
{
	int i;
	struct _starpu_worker *worker = NULL;
	for (i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
	return;
}
#endif
static void _starpu_update_notified_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id)
{
	int i;
	for (i = 0; i < nworkers; i++)
	{
		struct _starpu_worker *worker;
		worker = _starpu_get_worker_struct(workerids[i]);
		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
	}
	return;
}
#if 0
static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
{
	int i;
	struct _starpu_worker *worker = NULL;
	for (i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		if (now)
		{
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
		else
		{
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			worker->removed_from_ctx[sched_ctx_id] = 1;
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
	}
	return;
}
#endif
static void _starpu_update_notified_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
{
	int i;
	for (i = 0; i < nworkers; i++)
	{
		struct _starpu_worker *worker;
		worker = _starpu_get_worker_struct(workerids[i]);
		if (now)
		{
			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
		}
		else
		{
			worker->removed_from_ctx[sched_ctx_id] = 1;
		}
	}
	return;
}
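/* Submit the sentinel stop_submission_task: when fetch_tasks_from_empty_ctx_list()
 * pops it, it stops refilling the workers. The sentinel is excluded from the
 * DAG so it introduces no dependencies. */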
void starpu_sched_ctx_stop_task_submission()
{
	_starpu_exclude_task_from_dag(&stop_submission_task);
	int ret = _starpu_task_submit_internally(&stop_submission_task);
	STARPU_ASSERT(!ret);
}
/* must be called with sched_mutex locked */
void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	worker->shares_tasks_lists[sched_ctx_id] = 1;
}
/* Merge the perfmodel device description of each new worker into the context's perf_arch. */
static void _do_add_notified_workers(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers)
{
	int ndevices = 0;
	struct starpu_perfmodel_device devices[nworkers];
	int i = 0;
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		int dev1, dev2;
		unsigned found = 0;
		for (dev1 = 0; dev1 < worker->perf_arch.ndevices; dev1++)
		{
			for (dev2 = 0; dev2 < ndevices; dev2++)
			{
				if (devices[dev2].type == worker->perf_arch.devices[dev1].type &&
				    devices[dev2].devid == worker->perf_arch.devices[dev1].devid)
				{
					devices[dev2].ncores += worker->perf_arch.devices[dev1].ncores;
					found = 1;
					break;
				}
			}
			if (!found)
			{
				devices[ndevices].type = worker->perf_arch.devices[dev1].type;
				devices[ndevices].devid = worker->perf_arch.devices[dev1].devid;
				devices[ndevices].ncores = worker->perf_arch.devices[dev1].ncores;
				ndevices++;
			}
			else
				found = 0;
		}
	}
	if (ndevices > 0)
	{
		if (sched_ctx->perf_arch.devices == NULL)
		{
			_STARPU_MALLOC(sched_ctx->perf_arch.devices, ndevices*sizeof(struct starpu_perfmodel_device));
		}
		else
		{
			/* count the devices not yet known to the context, then grow the array accordingly */
			int nfinal_devices = 0;
			int dev1, dev2;
			unsigned found = 0;
			for (dev1 = 0; dev1 < ndevices; dev1++)
			{
				for (dev2 = 0; dev2 < sched_ctx->perf_arch.ndevices; dev2++)
				{
					if (sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
						found = 1;
				}
				if (!found)
				{
					nfinal_devices++;
				}
				else
					found = 0;
			}
			int nsize = (sched_ctx->perf_arch.ndevices + nfinal_devices);
			_STARPU_REALLOC(sched_ctx->perf_arch.devices, nsize*sizeof(struct starpu_perfmodel_device));
		}
		int dev1, dev2;
		unsigned found = 0;
		for (dev1 = 0; dev1 < ndevices; dev1++)
		{
			for (dev2 = 0; dev2 < sched_ctx->perf_arch.ndevices; dev2++)
			{
				if (sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
				{
					if (sched_ctx->perf_arch.devices[dev2].type == STARPU_CPU_WORKER)
						sched_ctx->perf_arch.devices[dev2].ncores += devices[dev1].ncores;
					found = 1;
				}
			}
			if (!found)
			{
				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].type = devices[dev1].type;
				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].devid = devices[dev1].devid;
				if (sched_ctx->stream_worker != -1)
					sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = sched_ctx->nsms;
				else
					sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = devices[dev1].ncores;
				sched_ctx->perf_arch.ndevices++;
			}
			else
				found = 0;
		}
	}
	_starpu_sched_ctx_update_parallel_workers_with(sched_ctx->id);
}
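/* Note on the merging above: workers sharing a device (same type and devid)
 * are folded into a single perfmodel device entry with their core counts
 * summed, so e.g. two single-core CPU workers on devid 0 end up as one
 * STARPU_CPU_WORKER entry with ncores == 2. */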
static void _starpu_add_workers_to_new_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if (nworkers == -1)
		nworkers = config->topology.nworkers;
	if (!nworkers)
		return;
	int _workerids[nworkers];
	int i;
	if (workerids == NULL)
	{
		for (i = 0; i < nworkers; i++)
			_workerids[i] = i;
		workerids = _workerids;
	}
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		{
			int _workerid = workers->add(workers, workerid);
			STARPU_ASSERT(_workerid >= 0);
			(void)_workerid;
		}
		struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		worker->tmp_sched_ctx = (int)sched_ctx->id;
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
	sort_workerid_array(nworkers, workerids);
	notify_workers_about_changing_ctx_pending(nworkers, workerids);
	_do_add_notified_workers(sched_ctx, workerids, nworkers);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
	{
		_STARPU_SCHED_BEGIN;
		sched_ctx->sched_policy->add_workers(sched_ctx->id, workerids, nworkers);
		_STARPU_SCHED_END;
	}
	notify_workers_about_changing_ctx_done(nworkers, workerids);
}
static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids,
						  int nworkers, int *removed_workers, int *n_removed_workers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_perfmodel_device devices[workers->nworkers];
	int ndevices = 0;
	int i = 0;
	for (i = 0; i < nworkers; i++)
	{
		if (workers->nworkers > 0)
		{
			if (_starpu_worker_belongs_to_a_sched_ctx(workerids[i], sched_ctx->id))
			{
				int worker = workers->remove(workers, workerids[i]);
				if (worker >= 0)
					removed_workers[(*n_removed_workers)++] = worker;
			}
		}
	}
	/* rebuild the context's perfmodel device list from the remaining workers */
	unsigned found = 0;
	int dev;
	struct starpu_sched_ctx_iterator it;
	if (workers->init_iterator)
		workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
		for (dev = 0; dev < str_worker->perf_arch.ndevices; dev++)
		{
			int dev2;
			for (dev2 = 0; dev2 < ndevices; dev2++)
			{
				if (devices[dev2].type == str_worker->perf_arch.devices[dev].type &&
				    devices[dev2].devid == str_worker->perf_arch.devices[dev].devid)
				{
					if (devices[dev2].type == STARPU_CPU_WORKER)
						devices[dev2].ncores += str_worker->perf_arch.devices[dev].ncores;
					/* only mark as found on an actual match; otherwise
					 * new devices would never be appended below */
					found = 1;
				}
			}
			if (!found)
			{
				devices[ndevices].type = str_worker->perf_arch.devices[dev].type;
				devices[ndevices].devid = str_worker->perf_arch.devices[dev].devid;
				devices[ndevices].ncores = str_worker->perf_arch.devices[dev].ncores;
				ndevices++;
			}
			else
				found = 0;
		}
		found = 0;
	}
	sched_ctx->perf_arch.ndevices = ndevices;
	for (dev = 0; dev < ndevices; dev++)
	{
		sched_ctx->perf_arch.devices[dev].type = devices[dev].type;
		sched_ctx->perf_arch.devices[dev].devid = devices[dev].devid;
		sched_ctx->perf_arch.devices[dev].ncores = devices[dev].ncores;
	}
	_starpu_sched_ctx_update_parallel_workers_without(sched_ctx->id);
	return;
}
static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sched_ctx)
{
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers)
	{
		int *workerids = NULL;
		unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
		if (nworkers_ctx > 0)
		{
			_STARPU_SCHED_BEGIN;
			sched_ctx->sched_policy->remove_workers(sched_ctx->id, workerids, nworkers_ctx);
			_STARPU_SCHED_END;
		}
		free(workerids);
	}
	return;
}
#ifdef STARPU_HAVE_HWLOC
static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_ctx)
{
	sched_ctx->hwloc_workers_set = hwloc_bitmap_alloc();
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_worker *worker;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while (workers->has_next(workers, &it))
	{
		unsigned workerid = workers->get_next(workers, &it);
		if (!starpu_worker_is_combined_worker(workerid))
		{
			worker = _starpu_get_worker_struct(workerid);
			hwloc_bitmap_or(sched_ctx->hwloc_workers_set,
					sched_ctx->hwloc_workers_set,
					worker->hwloc_cpu_set);
		}
	}
	return;
}
#endif
/* Must be called with sched_ctx_manag mutex held */
struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerids,
						   int nworkers_ctx, unsigned is_initial_sched,
						   const char *sched_ctx_name,
						   int min_prio_set, int min_prio,
						   int max_prio_set, int max_prio,
						   unsigned awake_workers,
						   void (*sched_policy_init)(unsigned),
						   void *user_data,
						   int nsub_ctxs, int *sub_ctxs, int nsms)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS);
	unsigned id = _starpu_get_first_free_sched_ctx(config);
	struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
	STARPU_ASSERT(sched_ctx->do_schedule == 0);
	sched_ctx->id = id;
	int nworkers = config->topology.nworkers;
	int i;
	STARPU_ASSERT(nworkers_ctx <= nworkers);
	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
	starpu_task_list_init(&sched_ctx->waiting_tasks);
	if (policy)
	{
		_STARPU_MALLOC(sched_ctx->sched_policy, sizeof(struct starpu_sched_policy));
	}
	else
	{
		sched_ctx->sched_policy = NULL;
	}
	sched_ctx->is_initial_sched = is_initial_sched;
	sched_ctx->name = sched_ctx_name;
	sched_ctx->inheritor = STARPU_NMAX_SCHED_CTXS;
	sched_ctx->finished_submit = 0;
	sched_ctx->min_priority_is_set = min_prio_set;
	if (sched_ctx->min_priority_is_set)
		sched_ctx->min_priority = min_prio;
	else
		sched_ctx->min_priority = 0;
	sched_ctx->max_priority_is_set = max_prio_set;
	if (sched_ctx->max_priority_is_set)
		sched_ctx->max_priority = max_prio;
	else
		sched_ctx->max_priority = 0;
	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
	sched_ctx->ready_flops = 0.0;
	for (i = 0; i < (int) (sizeof(sched_ctx->iterations)/sizeof(sched_ctx->iterations[0])); i++)
		sched_ctx->iterations[i] = -1;
	sched_ctx->iteration_level = 0;
	sched_ctx->main_master = -1;
	sched_ctx->perf_arch.devices = NULL;
	sched_ctx->perf_arch.ndevices = 0;
	sched_ctx->init_sched = sched_policy_init;
	sched_ctx->user_data = user_data;
	sched_ctx->sms_start_idx = 0;
	sched_ctx->sms_end_idx = STARPU_NMAXSMS;
	sched_ctx->nsms = nsms;
	sched_ctx->stream_worker = -1;
	memset(&sched_ctx->lock_write_owner, 0, sizeof(sched_ctx->lock_write_owner));
	STARPU_PTHREAD_RWLOCK_INIT(&sched_ctx->rwlock, NULL);
	if (nsms > 0)
	{
		STARPU_ASSERT_MSG(workerids, "workerids is needed when setting nsms");
		sched_ctx->sms_start_idx = occupied_sms;
		sched_ctx->sms_end_idx = occupied_sms + nsms;
		occupied_sms += nsms;
		_STARPU_DEBUG("ctx %u: stream worker %d nsms %d occupied sms %d\n", sched_ctx->id, workerids[0], nsms, occupied_sms);
		STARPU_ASSERT_MSG(occupied_sms <= STARPU_NMAXSMS, "StarPU: requested more SMs than available");
		_starpu_worker_set_stream_ctx(workerids[0], sched_ctx);
		sched_ctx->stream_worker = workerids[0];
	}
	sched_ctx->nsub_ctxs = 0;
	sched_ctx->parallel_view = 0;
	/* init the strategy structs and the worker_collection of the resources of the context */
	if (policy)
	{
		_starpu_init_sched_policy(config, sched_ctx, policy);
		sched_ctx->awake_workers = 1;
	}
	else
	{
		sched_ctx->awake_workers = awake_workers;
		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
	}
	/* add sub_ctxs before adding workers, so that they can be associated if necessary */
	if (nsub_ctxs != 0)
	{
		for (i = 0; i < nsub_ctxs; i++)
			sched_ctx->sub_ctxs[i] = sub_ctxs[i];
		sched_ctx->nsub_ctxs = nsub_ctxs;
	}
	/* starpu_do_schedule() starts to consider the new sched_ctx for scheduling
	 * once 'sched_ctx->do_schedule == 1' becomes visible.
	 * Make sure the sched_ctx struct and the policy struct initialization are complete at this time. */
	STARPU_WMB();
	sched_ctx->do_schedule = 1;
	_starpu_add_workers_to_new_sched_ctx(sched_ctx, workerids, nworkers_ctx);
#ifdef STARPU_HAVE_HWLOC
	/* build hwloc tree of the context */
	_starpu_sched_ctx_create_hwloc_tree(sched_ctx);
#endif //STARPU_HAVE_HWLOC
	/* if we create the initial big sched ctx we can update the workers' status here,
	   because they have not been launched yet */
	if (is_initial_sched)
	{
		for (i = 0; i < nworkers; i++)
		{
			struct _starpu_worker *worker = _starpu_get_worker_struct(i);
			if (!_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx->id))
				worker->nsched_ctxs++;
		}
	}
	(void)STARPU_ATOMIC_ADD(&config->topology.nsched_ctxs, 1);
	return sched_ctx;
}
int starpu_sched_ctx_get_nsms(unsigned sched_ctx)
{
	struct _starpu_sched_ctx *sc = _starpu_get_sched_ctx_struct(sched_ctx);
	return sc->nsms;
}

void starpu_sched_ctx_get_sms_interval(int stream_workerid, int *start, int *end)
{
	struct _starpu_sched_ctx *sc = _starpu_worker_get_ctx_stream(stream_workerid);
	*start = sc->sms_start_idx;
	*end = sc->sms_end_idx;
}

int starpu_sched_ctx_get_sub_ctxs(unsigned sched_ctx, int *ctxs)
{
	struct _starpu_sched_ctx *sc = _starpu_get_sched_ctx_struct(sched_ctx);
	int i;
	for (i = 0; i < sc->nsub_ctxs; i++)
		ctxs[i] = sc->sub_ctxs[i];
	return sc->nsub_ctxs;
}

int starpu_sched_ctx_get_stream_worker(unsigned sub_ctx)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sub_ctx);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	int worker = -1;
	workers->init_iterator(workers, &it);
	if (workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
	}
	return worker;
}
unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, ...)
{
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	va_list varg_list;
	int arg_type;
	int min_prio_set = 0;
	int max_prio_set = 0;
	int min_prio = 0;
	int max_prio = 0;
	int nsms = 0;
	int *sub_ctxs = NULL;
	int nsub_ctxs = 0;
	void *user_data = NULL;
	struct starpu_sched_policy *sched_policy = NULL;
	unsigned hierarchy_level = 0;
	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	unsigned awake_workers = 0;
	void (*init_sched)(unsigned) = NULL;
	va_start(varg_list, sched_ctx_name);
	while ((arg_type = va_arg(varg_list, int)) != 0)
	{
		if (arg_type == STARPU_SCHED_CTX_POLICY_NAME)
		{
			char *policy_name = va_arg(varg_list, char *);
			struct _starpu_machine_config *config = _starpu_get_machine_config();
			sched_policy = _starpu_select_sched_policy(config, policy_name);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_STRUCT)
		{
			sched_policy = va_arg(varg_list, struct starpu_sched_policy *);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MIN_PRIO)
		{
			min_prio = va_arg(varg_list, int);
			min_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MAX_PRIO)
		{
			max_prio = va_arg(varg_list, int);
			max_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_HIERARCHY_LEVEL)
		{
			hierarchy_level = va_arg(varg_list, unsigned);
		}
		else if (arg_type == STARPU_SCHED_CTX_NESTED)
		{
			nesting_sched_ctx = va_arg(varg_list, unsigned);
		}
		else if (arg_type == STARPU_SCHED_CTX_AWAKE_WORKERS)
		{
			awake_workers = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_INIT)
		{
			init_sched = va_arg(varg_list, void(*)(unsigned));
		}
		else if (arg_type == STARPU_SCHED_CTX_USER_DATA)
		{
			user_data = va_arg(varg_list, void *);
		}
		else if (arg_type == STARPU_SCHED_CTX_SUB_CTXS)
		{
			sub_ctxs = va_arg(varg_list, int*);
			nsub_ctxs = va_arg(varg_list, int);
		}
		else if (arg_type == STARPU_SCHED_CTX_CUDA_NSMS)
		{
			nsms = va_arg(varg_list, int);
		}
		else
		{
			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
		}
	}
	va_end(varg_list);
	/* Make sure the user doesn't use invalid worker IDs. */
	int num_workers = starpu_worker_get_count();
	int i;
	for (i = 0; i < nworkers; i++)
	{
		if (workerids[i] < 0 || workerids[i] >= num_workers)
		{
			_STARPU_ERROR("Invalid worker ID (%d) specified!\n", workerids[i]);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
			return STARPU_NMAX_SCHED_CTXS;
		}
	}
	struct _starpu_sched_ctx *sched_ctx;
	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data, nsub_ctxs, sub_ctxs, nsms);
	sched_ctx->hierarchy_level = hierarchy_level;
	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	sort_workerid_array(nw_ctx, added_workerids);
	notify_workers_about_changing_ctx_pending(nw_ctx, added_workerids);
	_starpu_sched_ctx_lock_write(sched_ctx->id);
	_starpu_update_notified_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
	notify_workers_about_changing_ctx_done(nw_ctx, added_workerids);
	_starpu_sched_ctx_unlock_write(sched_ctx->id);
	free(added_workerids);
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
	return sched_ctx->id;
}
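/* Example (sketch, hypothetical caller): create a context over three workers
 * driven by the built-in "eager" policy; as parsed above, the vararg list is
 * a sequence of tag/value pairs terminated by 0.
 *
 *	int workerids[3] = {0, 1, 2};
 *	unsigned ctx = starpu_sched_ctx_create(workerids, 3, "my_ctx",
 *					       STARPU_SCHED_CTX_POLICY_NAME, "eager",
 *					       0);
 */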
int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, void **arglist)
{
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	int arg_i = 0;
	int min_prio_set = 0;
	int max_prio_set = 0;
	int min_prio = 0;
	int max_prio = 0;
	int nsms = 0;
	int *sub_ctxs = NULL;
	int nsub_ctxs = 0;
	void *user_data = NULL;
	struct starpu_sched_policy *sched_policy = NULL;
	unsigned hierarchy_level = 0;
	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	unsigned awake_workers = 0;
	void (*init_sched)(unsigned) = NULL;
	while (arglist[arg_i] != NULL)
	{
		const int arg_type = (int)(intptr_t)arglist[arg_i];
		if (arg_type == STARPU_SCHED_CTX_POLICY_NAME)
		{
			arg_i++;
			char *policy_name = arglist[arg_i];
			struct _starpu_machine_config *config = _starpu_get_machine_config();
			sched_policy = _starpu_select_sched_policy(config, policy_name);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_STRUCT)
		{
			arg_i++;
			sched_policy = arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MIN_PRIO)
		{
			arg_i++;
			min_prio = *(int *)arglist[arg_i];
			min_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MAX_PRIO)
		{
			arg_i++;
			max_prio = *(int *)arglist[arg_i];
			max_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_HIERARCHY_LEVEL)
		{
			arg_i++;
			int val = *(int *)arglist[arg_i];
			STARPU_ASSERT(val >= 0);
			hierarchy_level = (unsigned)val;
		}
		else if (arg_type == STARPU_SCHED_CTX_NESTED)
		{
			arg_i++;
			int val = *(int *)arglist[arg_i];
			STARPU_ASSERT(val >= 0);
			nesting_sched_ctx = (unsigned)val;
		}
		else if (arg_type == STARPU_SCHED_CTX_AWAKE_WORKERS)
		{
			awake_workers = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_INIT)
		{
			arg_i++;
			init_sched = arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_USER_DATA)
		{
			arg_i++;
			user_data = arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_SUB_CTXS)
		{
			arg_i++;
			sub_ctxs = (int *)arglist[arg_i];
			arg_i++;
			nsub_ctxs = *(int *)arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_CUDA_NSMS)
		{
			arg_i++;
			nsms = *(int *)arglist[arg_i];
		}
		else
		{
			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
		}
		arg_i++;
	}
	if (workerids && nworkers != -1)
	{
		/* Make sure the user doesn't use invalid worker IDs. */
		int num_workers = starpu_worker_get_count();
		int i;
		for (i = 0; i < nworkers; i++)
		{
			if (workerids[i] < 0 || workerids[i] >= num_workers)
			{
				_STARPU_ERROR("Invalid worker ID (%d) specified!\n", workerids[i]);
				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
				return STARPU_NMAX_SCHED_CTXS;
			}
		}
	}
	struct _starpu_sched_ctx *sched_ctx;
	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data, nsub_ctxs, sub_ctxs, nsms);
	sched_ctx->hierarchy_level = hierarchy_level;
	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	sort_workerid_array(nw_ctx, added_workerids);
	notify_workers_about_changing_ctx_pending(nw_ctx, added_workerids);
	_starpu_sched_ctx_lock_write(sched_ctx->id);
	_starpu_update_notified_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
	notify_workers_about_changing_ctx_done(nw_ctx, added_workerids);
	_starpu_sched_ctx_unlock_write(sched_ctx->id);
	free(added_workerids);
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
	return (int)sched_ctx->id;
}
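/* Sketch of the arglist encoding consumed above (Fortran-facing variant): a
 * NULL-terminated array of tag/value pairs, with integer values passed by
 * pointer. A hypothetical C caller could build it as:
 *
 *	int min_prio = -5;
 *	void *arglist[] =
 *	{
 *		(void *)(intptr_t)STARPU_SCHED_CTX_POLICY_NAME, (void *)"eager",
 *		(void *)(intptr_t)STARPU_SCHED_CTX_POLICY_MIN_PRIO, (void *)&min_prio,
 *		NULL
 *	};
 *	int ctx = fstarpu_sched_ctx_create(workerids, nworkers, "f_ctx", arglist);
 */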
void starpu_sched_ctx_register_close_callback(unsigned sched_ctx_id, void (*close_callback)(unsigned sched_ctx_id, void* args), void *args)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->close_callback = close_callback;
	sched_ctx->close_args = args;
	return;
}
#ifdef STARPU_USE_SC_HYPERVISOR
void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counters)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->perf_counters = (struct starpu_sched_ctx_performance_counters *)perf_counters;
	return;
}
#endif
/* free all structures for the context
   Must be called with sched_ctx_manag mutex held */
static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
	STARPU_ASSERT(sched_ctx->do_schedule == 1);
	sched_ctx->do_schedule = 0;
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if (sched_ctx->sched_policy)
	{
		_starpu_deinit_sched_policy(sched_ctx);
		free(sched_ctx->sched_policy);
		sched_ctx->sched_policy = NULL;
	}
	else
	{
		starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
	}
	if (sched_ctx->perf_arch.devices)
	{
		free(sched_ctx->perf_arch.devices);
		sched_ctx->perf_arch.devices = NULL;
	}
	sched_ctx->min_priority_is_set = 0;
	sched_ctx->max_priority_is_set = 0;
	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
#ifdef STARPU_HAVE_HWLOC
	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
#endif //STARPU_HAVE_HWLOC
	config->topology.nsched_ctxs--;
}
void starpu_sched_ctx_delete(unsigned sched_ctx_id)
{
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT(sched_ctx);
#ifdef STARPU_USE_SC_HYPERVISOR
	if (sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS && sched_ctx->perf_counters != NULL)
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_delete_context(sched_ctx_id);
		_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR
	_starpu_sched_ctx_lock_write(sched_ctx_id);
	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);
	_starpu_sched_ctx_lock_write(inheritor_sched_ctx_id);
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
	int i;
	for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		struct _starpu_sched_ctx *psched_ctx = _starpu_get_sched_ctx_struct(i);
		if (psched_ctx->inheritor == sched_ctx_id)
		{
			_starpu_sched_ctx_lock_write(i);
			psched_ctx->inheritor = inheritor_sched_ctx_id;
			_starpu_sched_ctx_unlock_write(i);
		}
	}
	int *workerids;
	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
	int backup_workerids[nworkers_ctx];
	memcpy(backup_workerids, workerids, nworkers_ctx*sizeof(backup_workerids[0]));
	sort_workerid_array(nworkers_ctx, backup_workerids);
	notify_workers_about_changing_ctx_pending(nworkers_ctx, backup_workerids);
	/* try to transfer the resources of this ctx to the inheritor ctx;
	   this is pointless if both of them already have all the resources */
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	unsigned nworkers = config->topology.nworkers;
	if (nworkers_ctx > 0 && inheritor_sched_ctx && inheritor_sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
	    !(nworkers_ctx == nworkers && nworkers_ctx == inheritor_sched_ctx->workers->nworkers))
	{
		add_notified_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
	}
	notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
	_starpu_sched_ctx_unlock_write(sched_ctx_id);
	int wait_status = _starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id);
	_starpu_sched_ctx_lock_write(sched_ctx_id);
	notify_workers_about_changing_ctx_pending(nworkers_ctx, backup_workerids);
	if (!wait_status)
	{
		if (!sched_ctx->sched_policy)
			_starpu_sched_ctx_unblock_workers_in_parallel(sched_ctx_id, 0);
		/* if the context has changed between the mutex release and the mutex lock,
		   take care to free all scheduling data before deleting the context */
		/* announce upcoming context changes, then wait for sched_op operations to
		 * complete before altering the sched_ctx under sched_mutex protection */
		_starpu_update_notified_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
		notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
		occupied_sms -= sched_ctx->nsms;
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
		_starpu_sched_ctx_unlock_write(inheritor_sched_ctx_id);
		STARPU_PTHREAD_RWLOCK_DESTROY(&sched_ctx->rwlock);
		_starpu_delete_sched_ctx(sched_ctx);
	}
	else
	{
		notify_workers_about_changing_ctx_done(nworkers_ctx, backup_workerids);
		occupied_sms -= sched_ctx->nsms;
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
		_starpu_sched_ctx_unlock_write(inheritor_sched_ctx_id);
	}
	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list;
	   free it once it is no longer used */
	free(workerids);
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
}
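/* Example (sketch): hand leftover workers and work over to another context on
 * deletion, assuming the public starpu_sched_ctx_set_inheritor() setter:
 *
 *	starpu_sched_ctx_set_inheritor(ctx, main_ctx);
 *	starpu_sched_ctx_delete(ctx);	// resources fall back to main_ctx
 */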
/* called after the workers are terminated, so there is nothing left to do but free the memory */
void _starpu_delete_all_sched_ctxs()
{
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	unsigned i;
	for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
		if (sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			_starpu_sched_ctx_lock_write(i);
			_starpu_sched_ctx_free_scheduling_data(sched_ctx);
			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
			_starpu_barrier_counter_destroy(&sched_ctx->ready_tasks_barrier);
			_starpu_sched_ctx_unlock_write(i);
			STARPU_PTHREAD_RWLOCK_DESTROY(&sched_ctx->rwlock);
			_starpu_delete_sched_ctx(sched_ctx);
		}
	}
	STARPU_PTHREAD_KEY_DELETE(sched_ctx_key);
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
}
static void _starpu_check_workers(int *workerids, int nworkers)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int nworkers_conf = config->topology.nworkers;
	int i;
	for (i = 0; i < nworkers; i++)
	{
		/* make sure the user does not ask for a resource that does not exist;
		 * valid worker ids range from 0 to nworkers_conf - 1 */
		STARPU_ASSERT_MSG(workerids[i] >= 0 && workerids[i] < nworkers_conf, "requested to add workerid = %d, but valid ids range from 0 to %d", workerids[i], nworkers_conf - 1);
	}
}
/* ctx_mutex must be held when calling this function */
static void fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task_list list;
	starpu_task_list_move(&list, &sched_ctx->empty_ctx_tasks);
	_starpu_sched_ctx_unlock_write(sched_ctx->id);
	while (!starpu_task_list_empty(&list))
	{
		struct starpu_task *old_task = starpu_task_list_pop_back(&list);
		if (old_task == &stop_submission_task)
			break;
		/* if no workers are able to execute the task, it will be put
		 * back in the empty_ctx_tasks list forever */
		unsigned nworkers = _starpu_nworkers_able_to_execute_task(old_task, sched_ctx);
		STARPU_ASSERT(nworkers > 0);
		int ret = _starpu_push_task_to_workers(old_task);
		/* check whether we should stop popping from the empty ctx task list */
		if (ret == -EAGAIN)
			break;
	}
	_starpu_sched_ctx_lock_write(sched_ctx->id);
}
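/* Decide whether 'task' may be pushed now: when the policy provides
 * simulate_push_task() and a scheduling window is configured (window_size),
 * refuse tasks whose simulated completion time would overrun the window by
 * more than 20%. */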
unsigned _starpu_can_push_task(struct _starpu_sched_ctx *sched_ctx, struct starpu_task *task)
{
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->simulate_push_task)
	{
		if (window_size == 0.0)
			return 1;
		_starpu_sched_ctx_lock_read(sched_ctx->id);
		double expected_end = sched_ctx->sched_policy->simulate_push_task(task);
		_starpu_sched_ctx_unlock_read(sched_ctx->id);
		double expected_len = 0.0;
		if (hyp_actual_start_sample[sched_ctx->id] != 0.0)
		{
			expected_len = expected_end - hyp_actual_start_sample[sched_ctx->id];
		}
		else
		{
			_STARPU_MSG("%u: sc start is 0.0\n", sched_ctx->id);
			expected_len = expected_end - starpu_timing_now();
		}
		if (expected_len < 0.0)
			_STARPU_MSG("exp len negative %lf \n", expected_len);
		expected_len /= 1000000.0;
		// _STARPU_MSG("exp_end %lf start %lf expected_len %lf \n", expected_end, hyp_actual_start_sample[sched_ctx->id], expected_len);
		if (expected_len > (window_size + 0.2*window_size))
			return 0;
	}
	return 1;
}
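/* The waiting list is used as a FIFO: _starpu_push_task_to_waiting_list()
 * pushes at the front and _starpu_fetch_task_from_waiting_list() pops from
 * the back, so tasks are re-pushed to the workers in submission order. */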
void _starpu_fetch_task_from_waiting_list(struct _starpu_sched_ctx *sched_ctx)
{
	if (starpu_task_list_empty(&sched_ctx->waiting_tasks))
		return;
	struct starpu_task *old_task = starpu_task_list_back(&sched_ctx->waiting_tasks);
	if (_starpu_can_push_task(sched_ctx, old_task))
	{
		old_task = starpu_task_list_pop_back(&sched_ctx->waiting_tasks);
		_starpu_push_task_to_workers(old_task);
	}
	return;
}

void _starpu_push_task_to_waiting_list(struct _starpu_sched_ctx *sched_ctx, struct starpu_task *task)
{
	starpu_task_list_push_front(&sched_ctx->waiting_tasks, task);
	return;
}
static void set_priority_hierarchically_on_notified_workers(int *workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
{
	if (starpu_sched_ctx_get_hierarchy_level(sched_ctx) > 0)
	{
		unsigned father = starpu_sched_ctx_get_inheritor(sched_ctx);
		set_priority_on_notified_workers(workers_to_add, nworkers_to_add, father, priority);
		set_priority_hierarchically_on_notified_workers(workers_to_add, nworkers_to_add, father, priority);
	}
	return;
}
static void add_notified_workers(int *workerids, int nworkers, unsigned sched_ctx_id)
{
	if (!nworkers)
		return;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	/* bail out if the context has already been deleted */
	if (sched_ctx->id == STARPU_NMAX_SCHED_CTXS)
		return;
	int added_workers[nworkers];
	int n_added_workers = 0;
	{
		struct starpu_worker_collection *workers = sched_ctx->workers;
		int i = 0;
		for (i = 0; i < nworkers; i++)
		{
			int workerid = workers->add(workers, workerids[i]);
			if (workerid >= 0)
			{
				added_workers[n_added_workers] = workerid;
				n_added_workers++;
			}
			else
			{
				struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
				worker->removed_from_ctx[sched_ctx->id] = 0;
			}
		}
	}
	_do_add_notified_workers(sched_ctx, workerids, nworkers);
	if (n_added_workers > 0)
	{
		if (sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
		{
			_STARPU_SCHED_BEGIN;
			sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, n_added_workers);
			_STARPU_SCHED_END;
		}
		_starpu_update_notified_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
	}
	set_priority_on_notified_workers(workerids, nworkers, sched_ctx_id, 1);
	set_priority_hierarchically_on_notified_workers(workerids, nworkers, sched_ctx_id, 0);
	fetch_tasks_from_empty_ctx_list(sched_ctx);
}
/* Queue a new ctx change operation in the list of deferred ctx changes of the current worker.
 *
 * The set of workers to notify should contain all workers directly or
 * indirectly affected by the change. In particular, all workers of
 * sched_ctx_id should be notified even if they are not part of the change. */
static void _defer_ctx_change(int sched_ctx_id, enum _starpu_ctx_change_op op, int nworkers_to_notify, int *workerids_to_notify, int nworkers_to_change, int *workerids_to_change)
{
	STARPU_ASSERT(_starpu_worker_sched_op_pending());
	if (nworkers_to_change == 0)
		return;
	int workerid = starpu_worker_get_id_check();
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_ctx_change_list *l = &worker->ctx_change_list;
	struct _starpu_ctx_change *chg = _starpu_ctx_change_new();
	chg->sched_ctx_id = sched_ctx_id;
	STARPU_ASSERT(op == ctx_change_add || op == ctx_change_remove);
	chg->op = op;
	STARPU_ASSERT(workerids_to_change != NULL);
	chg->nworkers_to_change = nworkers_to_change;
	_STARPU_MALLOC(chg->workerids_to_change, nworkers_to_change * sizeof(chg->workerids_to_change[0]));
	memcpy(chg->workerids_to_change, workerids_to_change, nworkers_to_change * sizeof(chg->workerids_to_change[0]));
	if (nworkers_to_notify != 0)
	{
		STARPU_ASSERT(workerids_to_notify != NULL);
		chg->nworkers_to_notify = nworkers_to_notify;
		_STARPU_MALLOC(chg->workerids_to_notify, nworkers_to_notify * sizeof(chg->workerids_to_notify[0]));
		memcpy(chg->workerids_to_notify, workerids_to_notify, nworkers_to_notify * sizeof(chg->workerids_to_notify[0]));
	}
	else
	{
		STARPU_ASSERT(workerids_to_notify == NULL);
		chg->nworkers_to_notify = 0;
		chg->workerids_to_notify = NULL;
	}
	_starpu_ctx_change_list_push_back(l, chg);
}
void starpu_sched_ctx_add_workers(int *workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx_id)
{
	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
	_starpu_check_workers(workers_to_add, nworkers_to_add);
	int *ctx_workerids = NULL;
	_starpu_sched_ctx_lock_read(sched_ctx_id);
	unsigned ctx_nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &ctx_workerids);
	_starpu_sched_ctx_unlock_read(sched_ctx_id);
	int cumulated_workerids[ctx_nworkers + nworkers_to_add];
	memcpy(cumulated_workerids, ctx_workerids, ctx_nworkers*sizeof(cumulated_workerids[0]));
	unsigned cumulated_nworkers = ctx_nworkers;
	{
		unsigned i;
		for (i=0; i<nworkers_to_add; i++)
		{
			int workerid = workers_to_add[i];
			unsigned duplicate = 0;
			unsigned j;
			for (j = 0; j < cumulated_nworkers; j++)
			{
				if (cumulated_workerids[j] == workerid)
				{
					duplicate = 1;
					break;
				}
			}
			if (!duplicate)
			{
				cumulated_workerids[cumulated_nworkers] = workerid;
				cumulated_nworkers++;
			}
		}
	}
	/* all workers from the cumulated list must be notified, notifying the
	 * workers_to_add list is not sufficient because the other workers of
	 * the context might access the ctx worker list being changed. */
	if (_starpu_worker_sched_op_pending())
	{
		_defer_ctx_change(sched_ctx_id, ctx_change_add, cumulated_nworkers, cumulated_workerids, nworkers_to_add, workers_to_add);
	}
	else
	{
		sort_workerid_array(cumulated_nworkers, cumulated_workerids);
		notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		add_notified_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
		notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}
}
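
/* Illustrative sketch, kept disabled like the other #if 0 blocks in this
 * file: growing a context at runtime with starpu_sched_ctx_add_workers().
 * The helper name and the worker ids are assumptions made up for the
 * example. Duplicates with respect to the current worker set are filtered
 * out above, so passing a worker already in the context is harmless. */
#if 0
static void example_grow_ctx(unsigned sched_ctx_id)
{
	/* workers 2 and 3 are assumed to exist on this machine */
	int new_workers[2] = {2, 3};
	starpu_sched_ctx_add_workers(new_workers, 2, sched_ctx_id);
}
#endif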
void starpu_sched_ctx_add_combined_workers(int *combined_workers_to_add, unsigned n_combined_workers_to_add, unsigned sched_ctx_id)
{
	_starpu_sched_ctx_lock_write(sched_ctx_id);
	add_notified_workers(combined_workers_to_add, n_combined_workers_to_add, sched_ctx_id);
	_starpu_sched_ctx_unlock_write(sched_ctx_id);
}

static void remove_notified_workers(int *workerids, int nworkers, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int removed_workers[sched_ctx->workers->nworkers];
	int n_removed_workers = 0;
	_starpu_remove_workers_from_sched_ctx(sched_ctx, workerids, nworkers, removed_workers, &n_removed_workers);
	if(n_removed_workers > 0)
	{
		_starpu_update_notified_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
		set_priority_on_notified_workers(removed_workers, n_removed_workers, sched_ctx_id, 1);
	}
}

void starpu_sched_ctx_remove_workers(int *workers_to_remove, unsigned nworkers_to_remove, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
	int *ctx_workerids = NULL;
	_starpu_sched_ctx_lock_read(sched_ctx_id);
	unsigned ctx_nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &ctx_workerids);
	_starpu_sched_ctx_unlock_read(sched_ctx_id);
	int cumulated_workerids[ctx_nworkers + nworkers_to_remove];
	memcpy(cumulated_workerids, ctx_workerids, ctx_nworkers*sizeof(cumulated_workerids[0]));
	unsigned cumulated_nworkers = ctx_nworkers;
	{
		unsigned i;
		for (i=0; i<nworkers_to_remove; i++)
		{
			int workerid = workers_to_remove[i];
			unsigned duplicate = 0;
			unsigned j;
			for (j = 0; j < cumulated_nworkers; j++)
			{
				if (cumulated_workerids[j] == workerid)
				{
					duplicate = 1;
					break;
				}
			}
			if (!duplicate)
			{
				cumulated_workerids[cumulated_nworkers] = workerid;
				cumulated_nworkers++;
			}
		}
	}
	/* if the context has not already been deleted */
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		if (_starpu_worker_sched_op_pending())
		{
			_defer_ctx_change(sched_ctx_id, ctx_change_remove, cumulated_nworkers, cumulated_workerids, nworkers_to_remove, workers_to_remove);
		}
		else
		{
			sort_workerid_array(cumulated_nworkers, cumulated_workerids);
			notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
			_starpu_sched_ctx_lock_write(sched_ctx_id);
			remove_notified_workers(workers_to_remove, nworkers_to_remove, sched_ctx_id);
			notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
			_starpu_sched_ctx_unlock_write(sched_ctx_id);
		}
	}
}
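
/* Illustrative counterpart (disabled): shrinking the context again. The
 * helper name and worker ids are again assumptions for the sketch; as for
 * the addition path above, the removal is deferred when the calling worker
 * has a scheduling operation pending. */
#if 0
static void example_shrink_ctx(unsigned sched_ctx_id)
{
	int old_workers[2] = {2, 3};
	starpu_sched_ctx_remove_workers(old_workers, 2, sched_ctx_id);
}
#endif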
int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
{
	unsigned nworkers = 0;
	_starpu_sched_ctx_lock_read(sched_ctx->id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator_for_parallel_tasks(workers, &it, task);
	while(workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);
		STARPU_ASSERT_MSG(worker < STARPU_NMAXWORKERS, "worker id %u", worker);
		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
			nworkers++;
	}
	_starpu_sched_ctx_unlock_read(sched_ctx->id);
	return nworkers;
}

/* unused sched_ctxs have the id STARPU_NMAX_SCHED_CTXS */
void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config)
{
	STARPU_PTHREAD_KEY_CREATE(&sched_ctx_key, NULL);
	STARPU_PTHREAD_SETSPECIFIC(sched_ctx_key, (void*)(uintptr_t)STARPU_NMAX_SCHED_CTXS);
	window_size = starpu_get_env_float_default("STARPU_WINDOW_TIME_SIZE", 0.0);
	nobind = starpu_get_env_number("STARPU_WORKERS_NOBIND");
	unsigned i;
	for(i = 0; i <= STARPU_NMAX_SCHED_CTXS; i++)
	{
		config->sched_ctxs[i].do_schedule = 0;
		config->sched_ctxs[i].id = STARPU_NMAX_SCHED_CTXS;
	}
	return;
}

/* sched_ctxs are not necessarily contiguous: when a context is removed its
 * slot becomes free, and it is reused when a new context is created */
static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config)
{
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(config->sched_ctxs[i].id == STARPU_NMAX_SCHED_CTXS)
			return i;
	STARPU_ASSERT(0);
	return STARPU_NMAX_SCHED_CTXS;
}

int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_all must not be called from a task or callback");
	_starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
	return 0;
}
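
/* Minimal sketch (disabled): the per-context task barrier above is what a
 * per-context wait boils down to. The submission step is a made-up
 * placeholder for the example. */
#if 0
static void example_wait_ctx(unsigned sched_ctx_id)
{
	/* ... submit tasks targeting sched_ctx_id ... */
	_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id);
}
#endif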
int _starpu_wait_for_n_submitted_tasks_of_sched_ctx(unsigned sched_ctx_id, unsigned n)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_n_submitted_tasks must not be called from a task or callback");
	return _starpu_barrier_counter_wait_until_counter_reaches_down_to_n(&sched_ctx->tasks_barrier, n);
}

void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
#ifndef STARPU_SANITIZE_THREAD
	if (!config->watchdog_ok)
		config->watchdog_ok = 1;
#endif
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int reached = _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
	int finished = reached == 1;
	/* once the last task has been retired, if the user signaled that no
	 * more tasks will be submitted, we can move all the workers of this
	 * context to the inheritor context */
	if(finished && sched_ctx->inheritor != STARPU_NMAX_SCHED_CTXS)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
		if(sched_ctx->finished_submit)
		{
			STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
			if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if(sched_ctx->close_callback)
					sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
				int *workerids = NULL;
				unsigned nworkers = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
				if(nworkers > 0)
				{
					starpu_sched_ctx_add_workers(workerids, nworkers, sched_ctx->inheritor);
					free(workerids);
				}
			}
			_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
			return;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	}
	/* We also need to check for config->submitting = 0 (i.e. the
	 * user called starpu_drivers_request_termination()), in which
	 * case we need to set config->running to 0 and wake workers,
	 * so they can terminate, just like
	 * starpu_drivers_request_termination() does.
	 */
	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
	if(config->submitting == 0)
	{
		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			if(sched_ctx->close_callback)
				sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
		}
		ANNOTATE_HAPPENS_AFTER(&config->running);
		config->running = 0;
		ANNOTATE_HAPPENS_BEFORE(&config->running);
		int s;
		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
			}
		}
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
	return;
}
void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier, 0.0);
}

int _starpu_get_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
}

int _starpu_check_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_check(&sched_ctx->tasks_barrier);
}

unsigned _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops, struct starpu_task *task)
{
	unsigned ret = 1;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(!sched_ctx->is_initial_sched)
	{
		_starpu_sched_ctx_lock_write(sched_ctx->id);
	}
	_starpu_barrier_counter_increment(&sched_ctx->ready_tasks_barrier, ready_flops);
	if(!sched_ctx->is_initial_sched)
	{
		if(!_starpu_can_push_task(sched_ctx, task))
		{
			_starpu_push_task_to_waiting_list(sched_ctx, task);
			ret = 0;
		}
		_starpu_sched_ctx_unlock_write(sched_ctx->id);
	}
	return ret;
}

void _starpu_decrement_nready_tasks_of_sched_ctx_locked(unsigned sched_ctx_id, double ready_flops)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);
}

void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(!sched_ctx->is_initial_sched)
	{
		_starpu_sched_ctx_lock_write(sched_ctx->id);
	}
	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);
	if(!sched_ctx->is_initial_sched)
	{
		_starpu_fetch_task_from_waiting_list(sched_ctx);
		_starpu_sched_ctx_unlock_write(sched_ctx->id);
	}
}

int starpu_sched_ctx_get_nready_tasks(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->ready_tasks_barrier);
}

double starpu_sched_ctx_get_nready_flops(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_flops(&sched_ctx->ready_tasks_barrier);
}

int _starpu_wait_for_no_ready_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->ready_tasks_barrier);
	return 0;
}

/*
 * FIXME: This should rather be
 * void starpu_sched_ctx_set_context(unsigned sched_ctx)
 */
void starpu_sched_ctx_set_context(unsigned *sched_ctx)
{
	STARPU_PTHREAD_SETSPECIFIC(sched_ctx_key, (void*)(uintptr_t)*sched_ctx);
}

unsigned starpu_sched_ctx_get_context()
{
	return (unsigned)(uintptr_t)STARPU_PTHREAD_GETSPECIFIC(sched_ctx_key);
}

unsigned _starpu_sched_ctx_get_current_context()
{
	unsigned sched_ctx = starpu_sched_ctx_get_context();
	if (sched_ctx == STARPU_NMAX_SCHED_CTXS)
		return _starpu_get_initial_sched_ctx()->id;
	else
		return sched_ctx;
}
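
/* Sketch (disabled): the "current context" is plain thread-local state, so
 * a thread can temporarily redirect its submissions to another context and
 * restore the previous value afterwards. The helper name and new_ctx are
 * assumptions for the example. */
#if 0
static void example_with_ctx(unsigned new_ctx)
{
	unsigned saved = starpu_sched_ctx_get_context();
	starpu_sched_ctx_set_context(&new_ctx);
	/* ... submissions here implicitly target new_ctx ... */
	starpu_sched_ctx_set_context(&saved);
}
#endif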
void starpu_sched_ctx_notify_hypervisor_exists()
{
	with_hypervisor = 1;
	int i, j;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		hyp_start_sample[i] = starpu_timing_now();
		hyp_start_allow_sample[i] = 0.0;
		for(j = 0; j < STARPU_NMAXWORKERS; j++)
		{
			flops[i][j] = 0.0;
			data_size[i][j] = 0;
		}
		hyp_actual_start_sample[i] = 0.0;
	}
}

unsigned starpu_sched_ctx_check_if_hypervisor_exists()
{
	return with_hypervisor;
}

void starpu_sched_ctx_update_start_resizing_sample(unsigned sched_ctx_id, double start_sample)
{
	hyp_actual_start_sample[sched_ctx_id] = start_sample;
}

unsigned _starpu_sched_ctx_allow_hypervisor(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	return 1;
#if 0
	double now = starpu_timing_now();
	if(hyp_start_allow_sample[sched_ctx_id] > 0.0)
	{
		double allow_sample = (now - hyp_start_allow_sample[sched_ctx_id]) / 1000000.0;
		if(allow_sample < 0.001)
			return 1;
		else
		{
			hyp_start_allow_sample[sched_ctx_id] = 0.0;
			hyp_start_sample[sched_ctx_id] = starpu_timing_now();
			return 0;
		}
	}
	double forbid_sample = (now - hyp_start_sample[sched_ctx_id]) / 1000000.0;
	if(forbid_sample > 0.01)
	{
//		hyp_start_sample[sched_ctx_id] = starpu_timing_now();
		hyp_start_allow_sample[sched_ctx_id] = starpu_timing_now();
		return 1;
	}
	return 0;
#endif
}

void starpu_sched_ctx_set_policy_data(unsigned sched_ctx_id, void* policy_data)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->policy_data = policy_data;
}

void* starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->policy_data;
}

struct starpu_sched_policy *starpu_sched_ctx_get_sched_policy(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->sched_policy;
}

struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsigned sched_ctx_id, enum starpu_worker_collection_type worker_collection_type)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_STARPU_MALLOC(sched_ctx->workers, sizeof(struct starpu_worker_collection));
	switch(worker_collection_type)
	{
#ifdef STARPU_HAVE_HWLOC
	case STARPU_WORKER_TREE:
		sched_ctx->workers->has_next = worker_tree.has_next;
		sched_ctx->workers->get_next = worker_tree.get_next;
		sched_ctx->workers->add = worker_tree.add;
		sched_ctx->workers->remove = worker_tree.remove;
		sched_ctx->workers->init = worker_tree.init;
		sched_ctx->workers->deinit = worker_tree.deinit;
		sched_ctx->workers->init_iterator = worker_tree.init_iterator;
		sched_ctx->workers->init_iterator_for_parallel_tasks = worker_tree.init_iterator_for_parallel_tasks;
		sched_ctx->workers->type = STARPU_WORKER_TREE;
		break;
#endif
//	case STARPU_WORKER_LIST:
	default:
		sched_ctx->workers->has_next = worker_list.has_next;
		sched_ctx->workers->get_next = worker_list.get_next;
		sched_ctx->workers->add = worker_list.add;
		sched_ctx->workers->remove = worker_list.remove;
		sched_ctx->workers->init = worker_list.init;
		sched_ctx->workers->deinit = worker_list.deinit;
		sched_ctx->workers->init_iterator = worker_list.init_iterator;
		sched_ctx->workers->init_iterator_for_parallel_tasks = worker_list.init_iterator_for_parallel_tasks;
		sched_ctx->workers->type = STARPU_WORKER_LIST;
		break;
	}
	/* construct the collection of workers (list/tree/etc.) */
	sched_ctx->workers->init(sched_ctx->workers);
	return sched_ctx->workers;
}
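
/* Sketch (disabled): the canonical iteration pattern over a worker
 * collection, i.e. the same init_iterator/has_next/get_next sequence used
 * throughout this file. The helper name is made up for the example. */
#if 0
static void example_iterate(struct starpu_worker_collection *workers)
{
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		(void) workerid; /* inspect the worker here */
	}
}
#endif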
void starpu_sched_ctx_display_workers(unsigned sched_ctx_id, FILE *f)
{
	int *workerids = NULL;
	unsigned nworkers;
	unsigned i;
	nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
	fprintf(f, "[sched_ctx %u]: %u worker%s\n", sched_ctx_id, nworkers, nworkers == 1 ? "" : "s");
	for (i = 0; i < nworkers; i++)
	{
		char name[256];
		starpu_worker_get_name(workerids[i], name, 256);
		fprintf(f, "\t\t%s\n", name);
	}
	free(workerids);
}

unsigned starpu_sched_ctx_get_workers_list_raw(unsigned sched_ctx_id, int **workerids)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	*workerids = sched_ctx->workers->workerids;
	return sched_ctx->workers->nworkers;
}

unsigned starpu_sched_ctx_get_workers_list(unsigned sched_ctx_id, int **workerids)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	unsigned nworkers = 0;
	struct starpu_sched_ctx_iterator it;
	if(!workers)
		return 0;
	_STARPU_MALLOC(*workerids, workers->nworkers*sizeof(int));
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		(*workerids)[nworkers++] = worker;
	}
	return nworkers;
}
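
/* Sketch (disabled): unlike the _raw variant, which returns a pointer into
 * the collection, this function allocates the returned array; the caller
 * owns it and must free() it, as starpu_sched_ctx_display_workers() does
 * above. The helper name is made up for the example. */
#if 0
static void example_list_workers(unsigned sched_ctx_id)
{
	int *workerids = NULL;
	unsigned n = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
	unsigned i;
	for (i = 0; i < n; i++)
		printf("worker %d\n", workerids[i]);
	free(workerids); /* free(NULL) is safe when n == 0 */
}
#endif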
void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->workers->deinit(sched_ctx->workers);
	free(sched_ctx->workers);
	sched_ctx->workers = NULL;
}

struct starpu_worker_collection* starpu_sched_ctx_get_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->workers;
}

int _starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu_worker_archtype arch)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	int npus = 0;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch || arch == STARPU_ANY_WORKER)
			pus[npus++] = worker;
	}
	return npus;
}

unsigned starpu_sched_ctx_get_nworkers(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL)
		return sched_ctx->workers->nworkers;
	else
		return 0;
}

unsigned starpu_sched_ctx_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct _starpu_sched_ctx *sched_ctx2 = _starpu_get_sched_ctx_struct(sched_ctx_id2);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_worker_collection *workers2 = sched_ctx2->workers;
	int shared_workers = 0;
	struct starpu_sched_ctx_iterator it1, it2;
	workers->init_iterator(workers, &it1);
	while(workers->has_next(workers, &it1))
	{
		int worker = workers->get_next(workers, &it1);
		/* the inner iterator must be re-initialized for each worker of
		 * the first context, otherwise only the first outer pass
		 * would actually scan the second collection */
		workers2->init_iterator(workers2, &it2);
		while(workers2->has_next(workers2, &it2))
		{
			int worker2 = workers2->get_next(workers2, &it2);
			if(worker == worker2)
				shared_workers++;
		}
	}
	return shared_workers;
}

unsigned starpu_sched_ctx_contains_worker(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	if(workers)
	{
		unsigned i;
		for (i = 0; i < workers->nworkers; i++)
			if (workerid == workers->workerids[i])
				return 1;
	}
	return 0;
}

unsigned starpu_sched_ctx_contains_type_of_worker(enum starpu_worker_archtype arch, unsigned sched_ctx_id)
{
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	unsigned i;
	for (i = 0; i < workers->nworkers; i++)
	{
		int worker = workers->workerids[i];
		enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch)
			return 1;
	}
	return 0;
}
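
/* Sketch (disabled): a typical guard built on the predicate above, e.g. to
 * check whether a context owns any CUDA worker before choosing a CUDA
 * implementation. The helper name is made up for the example. */
#if 0
static int example_ctx_has_cuda(unsigned sched_ctx_id)
{
	return starpu_sched_ctx_contains_type_of_worker(STARPU_CUDA_WORKER, sched_ctx_id);
}
#endif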
unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[i];
		if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
			if(starpu_sched_ctx_contains_worker(workerid, sched_ctx->id))
				return 1;
	}
	return 0;
}

unsigned starpu_sched_ctx_worker_get_id(unsigned sched_ctx_id)
{
	int workerid = starpu_worker_get_id();
	if(workerid != -1)
		if(starpu_sched_ctx_contains_worker(workerid, sched_ctx_id))
			return workerid;
	return -1;
}

unsigned starpu_sched_ctx_get_ctx_for_task(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned ret_sched_ctx = task->sched_ctx;
	if (task->possibly_parallel && !sched_ctx->sched_policy
	    && sched_ctx->nesting_sched_ctx != STARPU_NMAX_SCHED_CTXS)
		ret_sched_ctx = sched_ctx->nesting_sched_ctx;
	return ret_sched_ctx;
}

unsigned starpu_sched_ctx_overlapping_ctxs_on_worker(int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return worker->nsched_ctxs > 1;
}

void starpu_sched_ctx_set_inheritor(unsigned sched_ctx_id, unsigned inheritor)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	STARPU_ASSERT(inheritor < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->inheritor = inheritor;
	return;
}

unsigned starpu_sched_ctx_get_inheritor(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->inheritor;
}

unsigned starpu_sched_ctx_get_hierarchy_level(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->hierarchy_level;
}

void starpu_sched_ctx_finished_submit(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
	sched_ctx->finished_submit = 1;
	STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	return;
}

#ifdef STARPU_USE_SC_HYPERVISOR
void _starpu_sched_ctx_post_exec_task_cb(int workerid, struct starpu_task *task, size_t data_size2, uint32_t footprint)
{
	if (workerid < 0)
		return;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if(sched_ctx != NULL && task->sched_ctx != _starpu_get_initial_sched_ctx()->id &&
	   task->sched_ctx != STARPU_NMAX_SCHED_CTXS && sched_ctx->perf_counters != NULL)
	{
		flops[task->sched_ctx][workerid] += task->flops;
		data_size[task->sched_ctx][workerid] += data_size2;
		if(_starpu_sched_ctx_allow_hypervisor(sched_ctx->id) || task->hypervisor_tag > 0)
		{
			_STARPU_TRACE_HYPERVISOR_BEGIN();
			sched_ctx->perf_counters->notify_post_exec_task(task, data_size[task->sched_ctx][workerid], footprint,
									task->hypervisor_tag, flops[task->sched_ctx][workerid]);
			_STARPU_TRACE_HYPERVISOR_END();
			flops[task->sched_ctx][workerid] = 0.0;
			data_size[task->sched_ctx][workerid] = 0;
		}
	}
}

void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL && sched_ctx_id != _starpu_get_initial_sched_ctx()->id && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
	   && sched_ctx->perf_counters != NULL && _starpu_sched_ctx_allow_hypervisor(sched_ctx_id))
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_pushed_task(sched_ctx_id, workerid);
		_STARPU_TRACE_HYPERVISOR_END();
	}
}
#endif //STARPU_USE_SC_HYPERVISOR

int starpu_sched_get_min_priority(void)
{
	return starpu_sched_ctx_get_min_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_get_max_priority(void)
{
	return starpu_sched_ctx_get_max_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_set_min_priority(int min_prio)
{
	return starpu_sched_ctx_set_min_priority(_starpu_sched_ctx_get_current_context(), min_prio);
}

int starpu_sched_set_max_priority(int max_prio)
{
	return starpu_sched_ctx_set_max_priority(_starpu_sched_ctx_get_current_context(), max_prio);
}

int starpu_sched_ctx_get_min_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority;
}

int starpu_sched_ctx_get_max_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority;
}

int starpu_sched_ctx_set_min_priority(unsigned sched_ctx_id, int min_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->min_priority = min_prio;
	return 0;
}

int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->max_priority = max_prio;
	return 0;
}
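
/* Sketch (disabled): a scheduling policy typically declares its priority
 * range once at initialization; tasks may then use any priority within
 * [min, max]. The helper name and the values -5/5 are assumptions for the
 * example. */
#if 0
static void example_init_priorities(unsigned sched_ctx_id)
{
	starpu_sched_ctx_set_min_priority(sched_ctx_id, -5);
	starpu_sched_ctx_set_max_priority(sched_ctx_id, 5);
	STARPU_ASSERT(starpu_sched_ctx_get_min_priority(sched_ctx_id) <= starpu_sched_ctx_get_max_priority(sched_ctx_id));
}
#endif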
int starpu_sched_ctx_min_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority_is_set;
}

int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority_is_set;
}

static void set_priority_on_notified_workers(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
{
	if(nworkers != -1)
	{
		int w;
		struct _starpu_worker *worker = NULL;
		for(w = 0; w < nworkers; w++)
		{
			worker = _starpu_get_worker_struct(workers[w]);
			_starpu_sched_ctx_list_move(&worker->sched_ctx_list, sched_ctx_id, priority);
		}
	}
}

void starpu_sched_ctx_set_priority(int *workerids, int nworkers, unsigned sched_ctx_id, unsigned priority)
{
	if(nworkers != -1)
	{
		notify_workers_about_changing_ctx_pending(nworkers, workerids);
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		int w;
		for(w = 0; w < nworkers; w++)
		{
			struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[w]);
			_starpu_sched_ctx_list_move(&worker->sched_ctx_list, sched_ctx_id, priority);
		}
		notify_workers_about_changing_ctx_done(nworkers, workerids);
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}
}

unsigned starpu_sched_ctx_get_priority(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return _starpu_sched_ctx_elt_get_priority(worker->sched_ctx_list, sched_ctx_id);
}

unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
{
	/* The worker being checked must have its status set to sleeping during
	 * the check, so that another worker being checked concurrently can
	 * make the safe, pessimistic assumption that it is the last worker
	 * awake. In the worst case, both workers will follow this pessimistic
	 * path and perform one more scheduling loop */
	STARPU_HG_DISABLE_CHECKING(_starpu_config.workers[worker->workerid].status);
	STARPU_ASSERT(_starpu_config.workers[worker->workerid].status == STATUS_SLEEPING);
	STARPU_HG_ENABLE_CHECKING(_starpu_config.workers[worker->workerid].status);
	struct _starpu_sched_ctx_list_iterator list_it;
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		unsigned last_worker_awake = 1;
		struct starpu_worker_collection *workers = sched_ctx->workers;
		/* workers can be NULL in some corner cases, since we do not lock sched_ctx here */
		if (workers != NULL)
		{
			struct starpu_sched_ctx_iterator it;
			workers->init_iterator(workers, &it);
			while(workers->has_next(workers, &it))
			{
				int workerid = workers->get_next(workers, &it);
				if(workerid != worker->workerid)
				{
					if(starpu_worker_is_combined_worker(workerid))
					{
						continue;
					}
					/* The worker status is intentionally checked
					 * without taking locks. If multiple workers
					 * are concurrently assessing whether they are
					 * the last worker awake, they will follow the
					 * pessimistic path and assume that they are
					 * the last worker awake */
					STARPU_HG_DISABLE_CHECKING(_starpu_config.workers[workerid].status);
					const int cond = _starpu_config.workers[workerid].status != STATUS_SLEEPING;
					STARPU_HG_ENABLE_CHECKING(_starpu_config.workers[workerid].status);
					if (cond)
					{
						last_worker_awake = 0;
						break;
					}
				}
			}
		}
		if(last_worker_awake)
			return 1;
	}
	return 0;
}
void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid)
{
	_starpu_bind_thread_on_cpu(cpuid, STARPU_NOWORKERID, NULL);
}

unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() <= 1)
		return STARPU_NMAX_SCHED_CTXS;
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx_list_iterator list_it;
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if(sched_ctx->main_master == workerid && sched_ctx->nesting_sched_ctx == sched_ctx_id)
			return sched_ctx->id;
	}
	return STARPU_NMAX_SCHED_CTXS;
}

unsigned starpu_sched_ctx_master_get_context(int masterid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(masterid);
	struct _starpu_sched_ctx_list_iterator list_it;
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if(sched_ctx->main_master == masterid)
			return sched_ctx->id;
	}
	return STARPU_NMAX_SCHED_CTXS;
}

struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(struct _starpu_worker *worker, struct _starpu_job *j)
{
	struct _starpu_sched_ctx_list_iterator list_it;
	struct _starpu_sched_ctx *ret = NULL;
	starpu_worker_lock(worker->workerid);
	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if (j->task->sched_ctx == sched_ctx->id)
		{
			ret = sched_ctx;
			break;
		}
	}
	starpu_worker_unlock(worker->workerid);
	return ret;
}

void starpu_sched_ctx_revert_task_counters_ctx_locked(unsigned sched_ctx_id, double ready_flops)
{
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
	_starpu_decrement_nready_tasks_of_sched_ctx_locked(sched_ctx_id, ready_flops);
}

void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double ready_flops)
{
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx_id, ready_flops);
}

void starpu_sched_ctx_move_task_to_ctx_locked(struct starpu_task *task, unsigned sched_ctx, unsigned with_repush)
{
	/* Restore state just like out of dependency layers */
	STARPU_ASSERT(task->status == STARPU_TASK_READY);
	task->status = STARPU_TASK_BLOCKED;
	/* TODO: make something cleaner which differentiates between calls
	   from push or pop (have mutex or not) and from another worker or not */
	task->sched_ctx = sched_ctx;
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
	if(with_repush)
		_starpu_repush_task(j);
	else
		_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
}
#if 0
void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex,
				       unsigned with_repush)
{
	/* TODO: make something cleaner which differentiates between calls
	   from push or pop (have mutex or not) and from another worker or not */
	int workerid = starpu_worker_get_id();
	struct _starpu_worker *worker = NULL;
	if(workerid != -1 && manage_mutex)
	{
		worker = _starpu_get_worker_struct(workerid);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
	task->sched_ctx = sched_ctx;
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
	if(with_repush)
		_starpu_repush_task(j);
	else
		_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
	if(workerid != -1 && manage_mutex)
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
}
#endif
void starpu_sched_ctx_list_task_counters_increment(unsigned sched_ctx_id, int workerid)
{
	/* Note: we often get called without any sched_mutex held, but we
	   should hold one, so take it here */
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	/* FIXME: why do we push events only when the worker belongs to more than one ctx? */
	if (worker->nsched_ctxs > 1)
	{
		starpu_worker_lock(workerid);
		_starpu_sched_ctx_list_push_event(worker->sched_ctx_list, sched_ctx_id);
		starpu_worker_unlock(workerid);
	}
}

void starpu_sched_ctx_list_task_counters_decrement(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
		_starpu_sched_ctx_list_pop_event(worker->sched_ctx_list, sched_ctx_id);
}

void starpu_sched_ctx_list_task_counters_reset(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
		_starpu_sched_ctx_list_pop_all_event(worker->sched_ctx_list, sched_ctx_id);
}

void starpu_sched_ctx_list_task_counters_increment_all_ctx_locked(struct starpu_task *task, unsigned sched_ctx_id)
{
	/* TODO: add proper, but light-enough locking to sched_ctx counters */
	/* Note that with a single ctx we default to the global context,
	   hence these counters are useless in that case */
	if (_starpu_get_nsched_ctxs() > 1)
	{
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		while(workers->has_next(workers, &it))
		{
			int worker = workers->get_next(workers, &it);
			starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, worker);
		}
	}
}

void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	/* TODO: add proper, but light-enough locking to sched_ctx counters */
	/* Note that with a single ctx we default to the global context,
	   hence these counters are useless in that case */
	if (_starpu_get_nsched_ctxs() > 1)
	{
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		while(workers->has_next(workers, &it))
		{
			int worker = workers->get_next(workers, &it);
			starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, worker);
		}
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}
}

void starpu_sched_ctx_list_task_counters_decrement_all_ctx_locked(struct starpu_task *task, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() > 1)
	{
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		while(workers->has_next(workers, &it))
		{
			int workerid = workers->get_next(workers, &it);
			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
			if (worker->nsched_ctxs > 1)
			{
				starpu_worker_lock(workerid);
				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
				starpu_worker_unlock(workerid);
			}
		}
	}
}

void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() > 1)
	{
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		while(workers->has_next(workers, &it))
		{
			int workerid = workers->get_next(workers, &it);
			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
			if (worker->nsched_ctxs > 1)
			{
				starpu_worker_lock(workerid);
				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
				starpu_worker_unlock(workerid);
			}
		}
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}
}

void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() > 1)
	{
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		while(workers->has_next(workers, &it))
		{
			int workerid = workers->get_next(workers, &it);
			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
			if (worker->nsched_ctxs > 1)
			{
				starpu_worker_lock(workerid);
				starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
				starpu_worker_unlock(workerid);
			}
		}
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}
}
static void _starpu_sched_ctx_block_workers_in_parallel(unsigned sched_ctx_id, unsigned all)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();
	int master, temp_master = 0;
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	/* temporarily put a master if needed */
	if (sched_ctx->main_master == -1)
	{
		_starpu_sched_ctx_put_new_master(sched_ctx_id);
		temp_master = 1;
	}
	master = sched_ctx->main_master;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
		   && (workerid != master || all)
		   && (current_worker_id == -1 || workerid != current_worker_id))
		{
			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			_starpu_worker_request_blocking_in_parallel(worker);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
	}
	if (temp_master)
		sched_ctx->main_master = -1;
}

static void _starpu_sched_ctx_unblock_workers_in_parallel(unsigned sched_ctx_id, unsigned all)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();
	int master, temp_master = 0;
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	/* temporarily put a master if needed */
	if (sched_ctx->main_master == -1)
	{
		_starpu_sched_ctx_put_new_master(sched_ctx_id);
		temp_master = 1;
	}
	master = sched_ctx->main_master;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
		   && (workerid != master || all))
		{
			if (current_worker_id == -1 || workerid != current_worker_id)
			{
				struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
				_starpu_worker_request_unblocking_in_parallel(worker);
				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
			}
		}
	}
	if (temp_master)
		sched_ctx->main_master = -1;
	return;
}

void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
{
	_starpu_sched_ctx_block_workers_in_parallel(sched_ctx_id, 1);
	/* execute parallel code */
	void* ret = func(param);
	/* wake up starpu workers */
	_starpu_sched_ctx_unblock_workers_in_parallel(sched_ctx_id, 1);
	return ret;
}
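
/* Sketch (disabled): handing all workers of a context over to an external
 * parallel runtime for the duration of one call. The kernel and helper
 * names are assumptions for the example; the workers are blocked before
 * func runs and woken up afterwards, as implemented above. */
#if 0
static void *my_parallel_kernel(void *arg)
{
	/* e.g. an OpenMP or BLAS call using the CPUs of the context */
	return arg;
}

static void example_exec_parallel(unsigned sched_ctx_id)
{
	void *ret = starpu_sched_ctx_exec_parallel_code(my_parallel_kernel, NULL, sched_ctx_id);
	(void) ret;
}
#endif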
static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx * sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx->sched_policy)
		return;
	_starpu_sched_ctx_put_new_master(sched_ctx_id);
	if(!sched_ctx->awake_workers)
	{
		_starpu_sched_ctx_block_workers_in_parallel(sched_ctx_id, 0);
	}
}

static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx * sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx->sched_policy)
		return;
	_starpu_sched_ctx_put_new_master(sched_ctx_id);
	if(!sched_ctx->awake_workers)
	{
		_starpu_sched_ctx_unblock_workers_in_parallel(sched_ctx_id, 0);
	}
}

void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids)
{
	int current_worker_id = starpu_worker_get_id();
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	_STARPU_MALLOC((*cpuids), workers->nworkers*sizeof(int));
	int w = 0;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		int master = sched_ctx->main_master;
		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
		{
			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
		}
	}
	*ncpuids = w;
	return;
}
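
/* Sketch (disabled): combining the cpuid list above with
 * starpu_sched_ctx_bind_current_thread_to_cpuid() to pin the threads of an
 * external runtime onto the CPUs owned by the context. The helper name is
 * made up for the example; the returned array is allocated and must be
 * freed by the caller. */
#if 0
static void example_bind_to_ctx_cpus(unsigned sched_ctx_id)
{
	int *cpuids = NULL;
	int ncpuids = 0;
	starpu_sched_ctx_get_available_cpuids(sched_ctx_id, &cpuids, &ncpuids);
	if (ncpuids > 0)
		starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[0]);
	free(cpuids);
}
#endif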
static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id)
{
	int *workerids;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	unsigned nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);
	unsigned i;
	for (i=0; i<nworkers; i++)
	{
		if (starpu_worker_get_type(workerids[i]) == STARPU_CPU_WORKER)
		{
			sched_ctx->main_master = workerids[i];
			break;
		}
	}
	STARPU_ASSERT_MSG(i<nworkers, "StarPU did not find a CPU worker to be set as the master");
}
struct starpu_perfmodel_arch * _starpu_sched_ctx_get_perf_archtype(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return &sched_ctx->perf_arch;
}

int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id)
{
	int idx = 0;
	int curr_workerid = starpu_worker_get_id();
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx->sched_policy || !sched_ctx->awake_workers)
		return -1;
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		if(worker == curr_workerid)
			return idx;
		idx++;
	}
	return -1;
}

void (*starpu_sched_ctx_get_sched_policy_init(unsigned sched_ctx_id))(unsigned)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->init_sched;
}

unsigned starpu_sched_ctx_has_starpu_scheduler(unsigned sched_ctx_id, unsigned *awake_workers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	*awake_workers = sched_ctx->awake_workers;
	return sched_ctx->sched_policy != NULL;
}

void *starpu_sched_ctx_get_user_data(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT(sched_ctx != NULL);
	return sched_ctx->user_data;
}

void starpu_sched_ctx_set_user_data(unsigned sched_ctx_id, void* user_data)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT(sched_ctx != NULL);
	sched_ctx->user_data = user_data;
}

void _starpu_worker_apply_deferred_ctx_changes(void)
{
	int workerid = starpu_worker_get_id_check();
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_ctx_change_list *l = &worker->ctx_change_list;
	STARPU_ASSERT(!_starpu_worker_sched_op_pending());
	while (!_starpu_ctx_change_list_empty(l))
	{
		struct _starpu_ctx_change *chg = _starpu_ctx_change_list_pop_front(l);
		STARPU_ASSERT(chg->workerids_to_change != NULL);
		if (chg->nworkers_to_notify)
		{
			STARPU_ASSERT(chg->workerids_to_notify != NULL);
			notify_workers_about_changing_ctx_pending(chg->nworkers_to_notify, chg->workerids_to_notify);
		}
		else
		{
			STARPU_ASSERT(chg->workerids_to_notify == NULL);
			notify_workers_about_changing_ctx_pending(chg->nworkers_to_change, chg->workerids_to_change);
		}
		_starpu_sched_ctx_lock_write(chg->sched_ctx_id);
		switch (chg->op)
		{
		case ctx_change_add:
		{
			add_notified_workers(chg->workerids_to_change, chg->nworkers_to_change, chg->sched_ctx_id);
		}
		break;
		case ctx_change_remove:
		{
			remove_notified_workers(chg->workerids_to_change, chg->nworkers_to_change, chg->sched_ctx_id);
			{
				int i;
				for (i = 0; i < chg->nworkers_to_change; i++)
				{
					struct _starpu_worker *w =
						_starpu_get_worker_struct(chg->workerids_to_change[i]);
					if(w->removed_from_ctx[chg->sched_ctx_id] == 1
					   && w->shares_tasks_lists[chg->sched_ctx_id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(chg->sched_ctx_id, w);
						w->removed_from_ctx[chg->sched_ctx_id] = 0;
					}
				}
			}
		}
		break;
		default:
			STARPU_ASSERT_MSG(0, "invalid ctx change opcode\n");
		}
		if (chg->nworkers_to_notify)
		{
			notify_workers_about_changing_ctx_done(chg->nworkers_to_notify, chg->workerids_to_notify);
		}
		else
		{
			notify_workers_about_changing_ctx_done(chg->nworkers_to_change, chg->workerids_to_change);
		}
		_starpu_sched_ctx_unlock_write(chg->sched_ctx_id);
		free(chg->workerids_to_notify);
		free(chg->workerids_to_change);
		_starpu_ctx_change_delete(chg);
	}
}
  2419. /* TODO: verify starpu_sched_ctx_create_inside_interval correctness before re-enabling the functions below
  2420. */
  2421. #if 0
static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_worker_archtype arch, unsigned allow_overlap)
{
	int pus[max];
	int npus = 0;
	int i;
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if(config->topology.nsched_ctxs == 1)
	{
		/* all available resources are ours */
		npus = starpu_worker_get_nids_by_type(arch, pus, max);
		/* TODO: hierarchical ctxs: get max good workers: close to one another */
		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];
	}
	else
	{
		unsigned enough_ressources = 0;
		npus = starpu_worker_get_nids_ctx_free_by_type(arch, pus, max);
		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];
		if(npus == max)
			/* we have all the resources we asked for */
			enough_ressources = 1;
		if(!enough_ressources && npus >= min)
			/* we have at least the required minimum of resources */
			enough_ressources = 1;
		if(!enough_ressources)
		{
			/* try to take resources from contexts that hold more workers than their required minimum */
			int s;
			/* skip the main context (id 0) */
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					if(_npus > ctx_min)
					{
						int n = 0;
						if(npus < min)
						{
							n = (_npus - ctx_min) > (min - npus) ? min - npus : (_npus - ctx_min);
							npus += n;
						}
						/* TODO: hierarchical ctxs: get n good workers: close to the ones already assigned to the ctx */
						for(i = 0; i < n; i++)
							workers[(*nw)++] = _pus[i];
						starpu_sched_ctx_remove_workers(_pus, n, config->sched_ctxs[s].id);
					}
				}
			}
			if(npus >= min)
				enough_ressources = 1;
		}
		if(!enough_ressources)
		{
			/* not enough available workers to satisfy the minimum required:
			 * give each context a share of workers proportional to its requirements */
			int global_npus = starpu_worker_get_count_by_type(arch);
			int req_npus = 0;
			int s;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
					req_npus += arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
			req_npus += min;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					double needed_npus = ((double)ctx_min * (double)global_npus) / (double)req_npus;
					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					if(needed_npus < (double)_npus)
					{
						/* round the excess to the nearest integer */
						double npus_to_rem = (double)_npus - needed_npus;
						int x = floor(npus_to_rem);
						double x_double = (double)x;
						double diff = npus_to_rem - x_double;
						int npus_to_remove = diff >= 0.5 ? x + 1 : x;
						int pus_to_remove[npus_to_remove];
						int c = 0;
						/* TODO: hierarchical ctxs: get npus_to_remove good workers: close to the ones already assigned to the ctx */
						for(i = _npus - 1; i >= (_npus - npus_to_remove); i--)
						{
							workers[(*nw)++] = _pus[i];
							pus_to_remove[c++] = _pus[i];
						}
						if(!allow_overlap)
							starpu_sched_ctx_remove_workers(pus_to_remove, npus_to_remove, config->sched_ctxs[s].id);
					}
				}
			}
		}
	}
}
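/* Create a scheduling context constrained to run on min_ncpus..max_ncpus CPU
 * workers and min_ngpus..max_ngpus CUDA workers, recruited through
 * _get_workers() above. When allow_overlap is set, workers taken during the
 * proportional redistribution step are also left in their original
 * contexts. */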
unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_ctx_name,
						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
						 unsigned allow_overlap)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(config, policy_name);
	struct _starpu_sched_ctx *sched_ctx = NULL;
	int workers[max_ncpus + max_ngpus];
	int nw = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	_get_workers(min_ncpus, max_ncpus, workers, &nw, STARPU_CPU_WORKER, allow_overlap);
	_get_workers(min_ngpus, max_ngpus, workers, &nw, STARPU_CUDA_WORKER, allow_overlap);
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
	int i;
	_STARPU_DEBUG("%d: ", nw);
	for(i = 0; i < nw; i++)
		_STARPU_DEBUG_NO_HEADER("%d ", workers[i]);
	_STARPU_DEBUG_NO_HEADER("\n");
	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1, NULL, NULL, 0, NULL, 0);
	sched_ctx->min_ncpus = min_ncpus;
	sched_ctx->max_ncpus = max_ncpus;
	sched_ctx->min_ngpus = min_ngpus;
	sched_ctx->max_ngpus = max_ngpus;
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
#warning TODO: verify call below, should it not be _starpu_update_workers_with_ctx?
	_starpu_update_workers_without_ctx(added_workerids, nw_ctx, sched_ctx->id, 0);
	free(added_workerids);
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return sched_ctx->id;
}
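/* Illustrative call, were this function re-enabled ("dmda" and the interval
 * bounds below are placeholder values):
 *
 *	unsigned ctx = starpu_sched_ctx_create_inside_interval("dmda", "my_ctx",
 *							       2, 4, 0, 1, 0);
 */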
#endif