
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011, 2013 INRIA
 * Copyright (C) 2016 Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <core/sched_policy.h>
#include <core/sched_ctx.h>
#include <common/utils.h>
#include <stdarg.h>
starpu_pthread_rwlock_t changing_ctx_mutex[STARPU_NMAX_SCHED_CTXS];

static starpu_pthread_mutex_t sched_ctx_manag = STARPU_PTHREAD_MUTEX_INITIALIZER;
static starpu_pthread_mutex_t finished_submit_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
static struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
starpu_pthread_key_t sched_ctx_key;
static unsigned with_hypervisor = 0;
static double hyp_start_sample[STARPU_NMAX_SCHED_CTXS];
static double hyp_start_allow_sample[STARPU_NMAX_SCHED_CTXS];
static double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
static size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
static double hyp_actual_start_sample[STARPU_NMAX_SCHED_CTXS];
static double window_size;
static int nobind;
static int occupied_sms = 0;

static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);
static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers);
static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master);
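
/* Register the context in the worker's context list (if it is not already
 * there) and cancel any pending removal of the worker from this context. */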
static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_elt_exists(worker->sched_ctx_list, sched_ctx_id);
	/* the worker was about to leave this ctx but it changed its mind and is staying */
	if (!ret_sched_ctx)
	{
		/* add the context to the worker */
		_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx_id);
		worker->nsched_ctxs++;
	}
	worker->removed_from_ctx[sched_ctx_id] = 0;
	if(worker->tmp_sched_ctx == (int) sched_ctx_id)
		worker->tmp_sched_ctx = -1;
	return;
}
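
/* Remove the context from the worker's context list; the scheduling data of
 * the context itself is freed later, when the context is deleted. */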
void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_elt_exists(worker->sched_ctx_list, sched_ctx_id);
	/* remove the context from the worker */
	if(ret_sched_ctx)
	{
		/* Don't remove the scheduling data here: tasks may still be running, and
		   post_exec would then find the scheduling data already gone. Do it when
		   the context is deleted, at which point it is really not needed anymore. */
		/* struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id); */
		/* if(sched_ctx && sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers) */
		/* { */
		/* 	_STARPU_TRACE_WORKER_SCHEDULING_PUSH; */
		/* 	sched_ctx->sched_policy->remove_workers(sched_ctx_id, &worker->workerid, 1); */
		/* 	_STARPU_TRACE_WORKER_SCHEDULING_POP; */
		/* } */
		if (!_starpu_sched_ctx_list_remove(&worker->sched_ctx_list, sched_ctx_id))
			worker->nsched_ctxs--;
	}
	return;
}
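
/* Add each of the given workers to the context, taking each worker's
 * scheduling mutex while its context list is updated. */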
static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id)
{
	int i;
	struct _starpu_worker *worker = NULL;

	for(i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}

	return;
}
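
/* Remove the given workers from the context. If 'now' is set the removal is
 * immediate; otherwise the workers are only flagged with removed_from_ctx and
 * leave the context later, when it is safe to do so. */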
static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
{
	int i;
	struct _starpu_worker *worker = NULL;

	for(i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		if(now)
		{
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
		else
		{
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			worker->removed_from_ctx[sched_ctx_id] = 1;
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
	}

	return;
}
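
/* Submit the special stop_submission_task; when it is later popped from the
 * empty-context task list, it tells the runtime to stop fetching tasks. */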
void starpu_sched_ctx_stop_task_submission()
{
	_starpu_exclude_task_from_dag(&stop_submission_task);
	int ret = _starpu_task_submit_internally(&stop_submission_task);
	STARPU_ASSERT(!ret);
}

void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int curr_workerid = starpu_worker_get_id();
	/* No need to take the mutex for the initial sched_ctx (the workers are not
	   launched yet) or when the current worker itself is making this call. */
	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);

	worker->shares_tasks_lists[sched_ctx_id] = 1;

	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
}
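
/* Add workers to the context's worker collection, merge their perfmodel
 * devices into the context's perf_arch, and notify either the master-worker
 * machinery (policy-less contexts) or the scheduling policy's add_workers
 * hook. added_workers is NULL when called at context creation time. */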
static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers,
					     int *added_workers, int *n_added_workers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int nworkers_to_add = nworkers == -1 ? (int)config->topology.nworkers : nworkers;
	if (!nworkers_to_add)
		return;
	int workers_to_add[nworkers_to_add];
	int cpu_workers[nworkers_to_add];
	int ncpu_workers = 0;

	struct starpu_perfmodel_device devices[nworkers_to_add];
	int ndevices = 0;
	struct _starpu_worker *str_worker = NULL;
	int worker;

	int i = 0;
	for(i = 0; i < nworkers_to_add; i++)
	{
		/* added_workers is NULL when this function is called at context
		   creation time; in that case this check is not needed */
		if(added_workers)
		{
			worker = workers->add(workers, (workerids == NULL ? i : workerids[i]));
			if(worker >= 0)
				added_workers[(*n_added_workers)++] = worker;
			else
			{
				int curr_workerid = starpu_worker_get_id();
				struct _starpu_worker *worker_str = _starpu_get_worker_struct(workerids[i]);
				if(curr_workerid != workerids[i])
					STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);

				worker_str->removed_from_ctx[sched_ctx->id] = 0;

				if(curr_workerid != workerids[i])
					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
			}
		}
		else
		{
			worker = (workerids == NULL ? i : workerids[i]);
			workers->add(workers, worker);
			workers_to_add[i] = worker;
			str_worker = _starpu_get_worker_struct(worker);
			str_worker->tmp_sched_ctx = (int)sched_ctx->id;
		}
	}

	int *wa;
	int na;
	if(added_workers)
	{
		na = *n_added_workers;
		wa = added_workers;
	}
	else
	{
		na = nworkers_to_add;
		wa = workers_to_add;
	}

	for(i = 0; i < na; i++)
	{
		worker = wa[i];
		str_worker = _starpu_get_worker_struct(worker);
		int dev1, dev2;
		unsigned found = 0;
		for(dev1 = 0; dev1 < str_worker->perf_arch.ndevices; dev1++)
		{
			for(dev2 = 0; dev2 < ndevices; dev2++)
			{
				if(devices[dev2].type == str_worker->perf_arch.devices[dev1].type &&
				   devices[dev2].devid == str_worker->perf_arch.devices[dev1].devid)
				{
					devices[dev2].ncores += str_worker->perf_arch.devices[dev1].ncores;
					found = 1;
					break;
				}
			}
			if(!found)
			{
				devices[ndevices].type = str_worker->perf_arch.devices[dev1].type;
				devices[ndevices].devid = str_worker->perf_arch.devices[dev1].devid;
				devices[ndevices].ncores = str_worker->perf_arch.devices[dev1].ncores;
				ndevices++;
			}
			else
				found = 0;
		}
		if (!sched_ctx->sched_policy)
		{
			struct _starpu_worker *worker_str = _starpu_get_worker_struct(wa[i]);
			if (worker_str->arch == STARPU_CPU_WORKER)
				cpu_workers[ncpu_workers++] = wa[i];
		}
	}

	if(ndevices > 0)
	{
		if(sched_ctx->perf_arch.devices == NULL)
		{
			_STARPU_MALLOC(sched_ctx->perf_arch.devices, ndevices*sizeof(struct starpu_perfmodel_device));
		}
		else
		{
			int nfinal_devices = 0;
			int dev1, dev2;
			unsigned found = 0;
			for(dev1 = 0; dev1 < ndevices; dev1++)
			{
				for(dev2 = 0; dev2 < sched_ctx->perf_arch.ndevices; dev2++)
				{
					if(sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
						found = 1;
				}
				if(!found)
				{
					nfinal_devices++;
				}
				else
					found = 0;
			}

			int nsize = (sched_ctx->perf_arch.ndevices+nfinal_devices);
			_STARPU_REALLOC(sched_ctx->perf_arch.devices, nsize*sizeof(struct starpu_perfmodel_device));
		}

		int dev1, dev2;
		unsigned found = 0;
		for(dev1 = 0; dev1 < ndevices; dev1++)
		{
			for(dev2 = 0; dev2 < sched_ctx->perf_arch.ndevices; dev2++)
			{
				if(sched_ctx->perf_arch.devices[dev2].type == devices[dev1].type && sched_ctx->perf_arch.devices[dev2].devid == devices[dev1].devid)
				{
					if(sched_ctx->perf_arch.devices[dev2].type == STARPU_CPU_WORKER)
						sched_ctx->perf_arch.devices[dev2].ncores += devices[dev1].ncores;
					found = 1;
				}
			}
			if(!found)
			{
				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].type = devices[dev1].type;
				sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].devid = devices[dev1].devid;
				if (sched_ctx->stream_worker != -1)
					sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = sched_ctx->nsms;
				else
					sched_ctx->perf_arch.devices[sched_ctx->perf_arch.ndevices].ncores = devices[dev1].ncores;
				sched_ctx->perf_arch.ndevices++;
			}
			else
				found = 0;
		}
	}

	if(!sched_ctx->sched_policy)
	{
		if(!sched_ctx->awake_workers)
		{
			if(sched_ctx->main_master == -1)
				sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, cpu_workers, ncpu_workers);
			else
			{
				_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, cpu_workers, ncpu_workers, sched_ctx->main_master);
			}
		}
		else
		{
			sched_ctx->main_master = _starpu_sched_ctx_find_master(sched_ctx->id, cpu_workers, ncpu_workers);
			_starpu_sched_ctx_set_master(sched_ctx, cpu_workers, ncpu_workers, sched_ctx->main_master);
		}
	}
	else if(sched_ctx->sched_policy->add_workers)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		if(added_workers)
		{
			if(*n_added_workers > 0)
				sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, *n_added_workers);
		}
		else
		{
			sched_ctx->sched_policy->add_workers(sched_ctx->id, workers_to_add, nworkers_to_add);
		}
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}

	return;
}
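
/* Remove workers from the context's worker collection and rebuild the
 * context's perf_arch device list from the workers that remain. Policy-less
 * contexts also wake up the removed (sleeping) workers. */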
static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids,
						  int nworkers, int *removed_workers, int *n_removed_workers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_perfmodel_device devices[workers->nworkers];
	int ndevices = 0;

	int i = 0;
	for(i = 0; i < nworkers; i++)
	{
		if(workers->nworkers > 0)
		{
			if(_starpu_worker_belongs_to_a_sched_ctx(workerids[i], sched_ctx->id))
			{
				int worker = workers->remove(workers, workerids[i]);
				if(worker >= 0)
					removed_workers[(*n_removed_workers)++] = worker;
			}
		}
	}

	unsigned found = 0;
	int dev;
	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
		for(dev = 0; dev < str_worker->perf_arch.ndevices; dev++)
		{
			int dev2;
			for(dev2 = 0; dev2 < ndevices; dev2++)
			{
				if(devices[dev2].type == str_worker->perf_arch.devices[dev].type &&
				   devices[dev2].devid == str_worker->perf_arch.devices[dev].devid)
				{
					if(devices[dev2].type == STARPU_CPU_WORKER)
						devices[dev2].ncores += str_worker->perf_arch.devices[dev].ncores;
					found = 1;
				}
			}
			if(!found)
			{
				devices[ndevices].type = str_worker->perf_arch.devices[dev].type;
				devices[ndevices].devid = str_worker->perf_arch.devices[dev].devid;
				devices[ndevices].ncores = str_worker->perf_arch.devices[dev].ncores;
				ndevices++;
			}
			else
				found = 0;
		}
		found = 0;
	}
	sched_ctx->perf_arch.ndevices = ndevices;
	for(dev = 0; dev < ndevices; dev++)
	{
		sched_ctx->perf_arch.devices[dev].type = devices[dev].type;
		sched_ctx->perf_arch.devices[dev].devid = devices[dev].devid;
		sched_ctx->perf_arch.devices[dev].ncores = devices[dev].ncores;
	}

	if(!sched_ctx->sched_policy)
	{
		if(!sched_ctx->awake_workers)
		{
			_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
		}
	}

	return;
}
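
/* Let the scheduling policy drop its per-worker data for all the workers of
 * the context, typically right before the context itself is deleted. */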
static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sched_ctx)
{
	if(sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers)
	{
		int *workerids = NULL;
		unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);

		if(nworkers_ctx > 0)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->remove_workers(sched_ctx->id, workerids, nworkers_ctx);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}

		free(workerids);
	}
	return;
}
#ifdef STARPU_HAVE_HWLOC
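/* Build the hwloc CPU set of the context as the union of the CPU sets of its
 * (non-combined) workers. */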
static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_ctx)
{
	sched_ctx->hwloc_workers_set = hwloc_bitmap_alloc();

	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_worker *worker;
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		unsigned workerid = workers->get_next(workers, &it);
		if(!starpu_worker_is_combined_worker(workerid))
		{
			worker = _starpu_get_worker_struct(workerid);
			hwloc_bitmap_or(sched_ctx->hwloc_workers_set,
					sched_ctx->hwloc_workers_set,
					worker->hwloc_cpu_set);
		}
	}
	return;
}
#endif
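
/* Allocate and initialize a scheduling context: pick a free slot, set up its
 * mutexes, task lists, priorities and per-worker synchronization state, then
 * attach the requested workers (or all of them when nworkers_ctx is -1). */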
struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerids,
						   int nworkers_ctx, unsigned is_initial_sched,
						   const char *sched_ctx_name,
						   int min_prio_set, int min_prio,
						   int max_prio_set, int max_prio,
						   unsigned awake_workers,
						   void (*sched_policy_init)(unsigned),
						   void *user_data,
						   int nsub_ctxs, int *sub_ctxs, int nsms)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();

	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS);

	unsigned id = _starpu_get_first_free_sched_ctx(config);

	struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
	sched_ctx->id = id;

	config->topology.nsched_ctxs++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);

	int nworkers = config->topology.nworkers;
	STARPU_ASSERT(nworkers_ctx <= nworkers);

	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->empty_ctx_mutex, NULL);
	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);

	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->waiting_tasks_mutex, NULL);
	starpu_task_list_init(&sched_ctx->waiting_tasks);

	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->sched_ctx_list_mutex, NULL);

	if (policy)
	{
		_STARPU_MALLOC(sched_ctx->sched_policy, sizeof(struct starpu_sched_policy));
	}
	else
	{
		sched_ctx->sched_policy = NULL;
	}
	sched_ctx->is_initial_sched = is_initial_sched;
	sched_ctx->name = sched_ctx_name;
	sched_ctx->inheritor = STARPU_NMAX_SCHED_CTXS;
	sched_ctx->finished_submit = 0;
	sched_ctx->min_priority_is_set = min_prio_set;
	if (sched_ctx->min_priority_is_set) sched_ctx->min_priority = min_prio;
	sched_ctx->max_priority_is_set = max_prio_set;
	if (sched_ctx->max_priority_is_set) sched_ctx->max_priority = max_prio;

	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);

	sched_ctx->ready_flops = 0.0;
	sched_ctx->main_master = -1;
	sched_ctx->perf_arch.devices = NULL;
	sched_ctx->perf_arch.ndevices = 0;
	sched_ctx->init_sched = sched_policy_init;
	sched_ctx->user_data = user_data;
	sched_ctx->sms_start_idx = 0;
	sched_ctx->sms_end_idx = STARPU_NMAXSMS;
	sched_ctx->nsms = nsms;
	sched_ctx->stream_worker = -1;
	if(nsms > 0)
	{
		STARPU_ASSERT_MSG(workerids, "workerids is needed when setting nsms");
		sched_ctx->sms_start_idx = occupied_sms;
		sched_ctx->sms_end_idx = occupied_sms+nsms;
		occupied_sms += nsms;
		_STARPU_DEBUG("ctx %d: stream worker %d nsms %d occupied sms %d\n", sched_ctx->id, workerids[0], nsms, occupied_sms);
		STARPU_ASSERT_MSG(occupied_sms <= STARPU_NMAXSMS, "StarPU: requested more SMs than available");
		_starpu_worker_set_stream_ctx(workerids[0], sched_ctx);
		sched_ctx->stream_worker = workerids[0];
	}

	sched_ctx->nsub_ctxs = 0;

	int w;
	for(w = 0; w < nworkers; w++)
	{
		sem_init(&sched_ctx->fall_asleep_sem[w], 0, 0);
		sem_init(&sched_ctx->wake_up_sem[w], 0, 0);

		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond[w], NULL);
		STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->parallel_sect_mutex[w], NULL);
		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond_busy[w], NULL);
		sched_ctx->busy[w] = 0;

		sched_ctx->master[w] = -1;
		sched_ctx->parallel_sect[w] = 0;
		sched_ctx->sleeping[w] = 0;
	}

	/* initialize the strategy structs and the worker_collection of the resources of the context */
	if(policy)
	{
		_starpu_init_sched_policy(config, sched_ctx, policy);
		sched_ctx->awake_workers = 1;
	}
	else
	{
		sched_ctx->awake_workers = awake_workers;
		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
	}

	if(is_initial_sched)
	{
		int i;
		/* initialize the mutexes for all contexts */
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		{
			STARPU_PTHREAD_RWLOCK_INIT(&changing_ctx_mutex[i], NULL);
		}
	}

	/* add the sub_ctxs before adding the workers, so that the workers can be
	   associated with them if necessary */
	if(nsub_ctxs != 0)
	{
		int i;
		for(i = 0; i < nsub_ctxs; i++)
			sched_ctx->sub_ctxs[i] = sub_ctxs[i];
		sched_ctx->nsub_ctxs = nsub_ctxs;
	}

	/* now that the worker_collection of the context exists, add the resources to it */
	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);

#ifdef STARPU_HAVE_HWLOC
	/* build the hwloc tree of the context */
	_starpu_sched_ctx_create_hwloc_tree(sched_ctx);
#endif //STARPU_HAVE_HWLOC

	/* if we create the initial big sched ctx we can update the workers' status here
	   because they have not been launched yet */
	if(is_initial_sched)
	{
		int i;
		for(i = 0; i < nworkers; i++)
		{
			struct _starpu_worker *worker = _starpu_get_worker_struct(i);
			if(!_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx->id))
				worker->nsched_ctxs++;
		}
	}

	return sched_ctx;
}
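
/* Collect between min and max workers of the given architecture for a new
 * context: use free workers first, then take workers from contexts that have
 * more than their declared minimum, and as a last resort distribute workers
 * proportionally to each context's requirements. */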
static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_worker_archtype arch, unsigned allow_overlap)
{
	int pus[max];
	int npus = 0;
	int i;

	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if(config->topology.nsched_ctxs == 1)
	{
		/* we have all the available resources */
		npus = starpu_worker_get_nids_by_type(arch, pus, max);
		/* TODO: hierarchical ctxs: get max good workers: close to one another */
		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];
	}
	else
	{
		unsigned enough_resources = 0;
		npus = starpu_worker_get_nids_ctx_free_by_type(arch, pus, max);

		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];

		if(npus == max)
			/* we have enough available resources */
			enough_resources = 1;

		if(!enough_resources && npus >= min)
			/* we have enough available resources */
			enough_resources = 1;

		if(!enough_resources)
		{
			/* try to take resources from contexts that have more workers than their declared minimum */
			int s;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					if(_npus > ctx_min)
					{
						int n = 0;
						if(npus < min)
						{
							n = (_npus - ctx_min) > (min - npus) ? min - npus : (_npus - ctx_min);
							npus += n;
						}
						/* TODO: hierarchical ctxs: get n good workers: close to the ones already assigned to the ctx */
						for(i = 0; i < n; i++)
							workers[(*nw)++] = _pus[i];
						starpu_sched_ctx_remove_workers(_pus, n, config->sched_ctxs[s].id);
					}
				}
			}

			if(npus >= min)
				enough_resources = 1;
		}

		if(!enough_resources)
		{
			/* there are not enough available workers to satisfy the minimum requirement,
			   so give each context a number of workers proportional to its requirements */
			int global_npus = starpu_worker_get_count_by_type(arch);
			int req_npus = 0;

			int s;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
					req_npus += arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;

			req_npus += min;

			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					double needed_npus = ((double)ctx_min * (double)global_npus) / (double)req_npus;

					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					if(needed_npus < (double)_npus)
					{
						/* round the surplus to the nearest integer number of workers */
						double npus_to_rem = (double)_npus - needed_npus;
						int x = floor(npus_to_rem);
						double x_double = (double)x;
						double diff = npus_to_rem - x_double;
						int npus_to_remove = diff >= 0.5 ? x + 1 : x;

						int pus_to_remove[npus_to_remove];
						int c = 0;

						/* TODO: hierarchical ctxs: get npus_to_remove good workers: close to the ones already assigned to the ctx */
						for(i = _npus-1; i >= (_npus - npus_to_remove); i--)
						{
							workers[(*nw)++] = _pus[i];
							pus_to_remove[c++] = _pus[i];
						}
						if(!allow_overlap)
							starpu_sched_ctx_remove_workers(pus_to_remove, npus_to_remove, config->sched_ctxs[s].id);
					}
				}
			}
		}
	}
}
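
/* Create a context whose number of CPU and GPU workers stays within the given
 * bounds, grabbing workers from other contexts if needed. */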
unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_ctx_name,
						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
						 unsigned allow_overlap)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(config, policy_name);

	struct _starpu_sched_ctx *sched_ctx = NULL;
	int workers[max_ncpus + max_ngpus];
	int nw = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	_get_workers(min_ncpus, max_ncpus, workers, &nw, STARPU_CPU_WORKER, allow_overlap);
	_get_workers(min_ngpus, max_ngpus, workers, &nw, STARPU_CUDA_WORKER, allow_overlap);
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);

	int i;
	printf("%d: ", nw);
	for(i = 0; i < nw; i++)
		printf("%d ", workers[i]);
	printf("\n");

	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0, 1, NULL, NULL, 0, NULL, 0);
	sched_ctx->min_ncpus = min_ncpus;
	sched_ctx->max_ncpus = max_ncpus;
	sched_ctx->min_ngpus = min_ngpus;
	sched_ctx->max_ngpus = max_ngpus;

	_starpu_unlock_mutex_if_prev_locked();
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	_starpu_update_workers_without_ctx(added_workerids, nw_ctx, sched_ctx->id, 0);
	free(added_workerids);
	_starpu_relock_mutex_if_prev_locked();
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return sched_ctx->id;
}
int starpu_sched_ctx_get_nsms(unsigned sched_ctx)
{
	struct _starpu_sched_ctx *sc = _starpu_get_sched_ctx_struct(sched_ctx);
	return sc->nsms;
}

void starpu_sched_ctx_get_sms_interval(int stream_workerid, int *start, int *end)
{
	struct _starpu_sched_ctx *sc = _starpu_worker_get_ctx_stream(stream_workerid);
	*start = sc->sms_start_idx;
	*end = sc->sms_end_idx;
}

int starpu_sched_ctx_get_sub_ctxs(unsigned sched_ctx, int *ctxs)
{
	struct _starpu_sched_ctx *sc = _starpu_get_sched_ctx_struct(sched_ctx);
	int i;
	for(i = 0; i < sc->nsub_ctxs; i++)
		ctxs[i] = sc->sub_ctxs[i];
	return sc->nsub_ctxs;
}

int starpu_sched_ctx_get_stream_worker(unsigned sub_ctx)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sub_ctx);
	struct starpu_worker_collection *workers = sched_ctx->workers;

	struct starpu_sched_ctx_iterator it;
	int worker = -1;

	workers->init_iterator(workers, &it);
	if(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
	}

	return worker;
}
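
/* Public entry point: create a scheduling context over the given workers. The
 * variadic argument list is a 0-terminated sequence of STARPU_SCHED_CTX_*
 * keys, each followed by its value. */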
unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, ...)
{
	va_list varg_list;
	int arg_type;
	int min_prio_set = 0;
	int max_prio_set = 0;
	int min_prio = 0;
	int max_prio = 0;
	int nsms = 0;
	int *sub_ctxs = NULL;
	int nsub_ctxs = 0;
	void *user_data = NULL;
	struct starpu_sched_policy *sched_policy = NULL;
	unsigned hierarchy_level = 0;
	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	unsigned awake_workers = 0;
	void (*init_sched)(unsigned) = NULL;

	va_start(varg_list, sched_ctx_name);
	while ((arg_type = va_arg(varg_list, int)) != 0)
	{
		if (arg_type == STARPU_SCHED_CTX_POLICY_NAME)
		{
			char *policy_name = va_arg(varg_list, char *);
			struct _starpu_machine_config *config = _starpu_get_machine_config();
			sched_policy = _starpu_select_sched_policy(config, policy_name);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_STRUCT)
		{
			sched_policy = va_arg(varg_list, struct starpu_sched_policy *);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MIN_PRIO)
		{
			min_prio = va_arg(varg_list, int);
			min_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MAX_PRIO)
		{
			max_prio = va_arg(varg_list, int);
			max_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_HIERARCHY_LEVEL)
		{
			hierarchy_level = va_arg(varg_list, unsigned);
		}
		else if (arg_type == STARPU_SCHED_CTX_NESTED)
		{
			nesting_sched_ctx = va_arg(varg_list, unsigned);
		}
		else if (arg_type == STARPU_SCHED_CTX_AWAKE_WORKERS)
		{
			awake_workers = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_INIT)
		{
			init_sched = va_arg(varg_list, void(*)(unsigned));
		}
		else if (arg_type == STARPU_SCHED_CTX_USER_DATA)
		{
			user_data = va_arg(varg_list, void *);
		}
		else if (arg_type == STARPU_SCHED_CTX_SUB_CTXS)
		{
			sub_ctxs = va_arg(varg_list, int*);
			nsub_ctxs = va_arg(varg_list, int);
		}
		else if (arg_type == STARPU_SCHED_CTX_CUDA_NSMS)
		{
			nsms = va_arg(varg_list, int);
		}
		else
		{
			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
		}
	}
	va_end(varg_list);

	if (workerids && nworkers != -1)
	{
		/* Make sure the user doesn't use invalid worker IDs. */
		int num_workers = starpu_worker_get_count();
		int i;
		for (i = 0; i < nworkers; i++)
		{
			if (workerids[i] < 0 || workerids[i] >= num_workers)
			{
				_STARPU_ERROR("Invalid worker ID (%d) specified!\n", workerids[i]);
				return STARPU_NMAX_SCHED_CTXS;
			}
		}
	}

	struct _starpu_sched_ctx *sched_ctx = NULL;
	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data, nsub_ctxs, sub_ctxs, nsms);
	sched_ctx->hierarchy_level = hierarchy_level;
	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;

	_starpu_unlock_mutex_if_prev_locked();
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	_starpu_update_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
	free(added_workerids);
	_starpu_relock_mutex_if_prev_locked();
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return sched_ctx->id;
}
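
/* A minimal usage sketch of the variadic interface above (illustrative only:
 * the "dmda" policy name and the worker array are placeholders chosen by the
 * caller):
 *
 *	int workerids[2] = {0, 1};
 *	unsigned ctx = starpu_sched_ctx_create(workerids, 2, "my_ctx",
 *					       STARPU_SCHED_CTX_POLICY_NAME, "dmda",
 *					       0);
 *
 * Note the terminating 0 that ends the key/value argument list. */

/* Fortran-friendly variant of starpu_sched_ctx_create(): the variadic list is
 * replaced by a NULL-terminated array of argument pointers. */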
int fstarpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, void ***_arglist)
{
	void **arglist = *_arglist;
	int arg_i = 0;
	int min_prio_set = 0;
	int max_prio_set = 0;
	int min_prio = 0;
	int max_prio = 0;
	int nsms = 0;
	int *sub_ctxs = NULL;
	int nsub_ctxs = 0;
	void *user_data = NULL;
	struct starpu_sched_policy *sched_policy = NULL;
	unsigned hierarchy_level = 0;
	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	unsigned awake_workers = 0;
	void (*init_sched)(unsigned) = NULL;

	while (arglist[arg_i] != NULL)
	{
		const int arg_type = (int)(intptr_t)arglist[arg_i];
		if (arg_type == STARPU_SCHED_CTX_POLICY_NAME)
		{
			arg_i++;
			char *policy_name = arglist[arg_i];
			struct _starpu_machine_config *config = _starpu_get_machine_config();
			sched_policy = _starpu_select_sched_policy(config, policy_name);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_STRUCT)
		{
			arg_i++;
			sched_policy = arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MIN_PRIO)
		{
			arg_i++;
			min_prio = *(int *)arglist[arg_i];
			min_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MAX_PRIO)
		{
			arg_i++;
			max_prio = *(int *)arglist[arg_i];
			max_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_HIERARCHY_LEVEL)
		{
			arg_i++;
			int val = *(int *)arglist[arg_i];
			STARPU_ASSERT(val >= 0);
			hierarchy_level = (unsigned)val;
		}
		else if (arg_type == STARPU_SCHED_CTX_NESTED)
		{
			arg_i++;
			int val = *(int *)arglist[arg_i];
			STARPU_ASSERT(val >= 0);
			nesting_sched_ctx = (unsigned)val;
		}
		else if (arg_type == STARPU_SCHED_CTX_AWAKE_WORKERS)
		{
			awake_workers = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_INIT)
		{
			arg_i++;
			init_sched = arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_USER_DATA)
		{
			arg_i++;
			user_data = arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_SUB_CTXS)
		{
			arg_i++;
			sub_ctxs = (int *)arglist[arg_i];
			arg_i++;
			nsub_ctxs = *(int *)arglist[arg_i];
		}
		else if (arg_type == STARPU_SCHED_CTX_CUDA_NSMS)
		{
			arg_i++;
			nsms = *(int *)arglist[arg_i];
		}
		else
		{
			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
		}
		arg_i++;
	}

	if (workerids && nworkers != -1)
	{
		/* Make sure the user doesn't use invalid worker IDs. */
		int num_workers = starpu_worker_get_count();
		int i;
		for (i = 0; i < nworkers; i++)
		{
			if (workerids[i] < 0 || workerids[i] >= num_workers)
			{
				_STARPU_ERROR("Invalid worker ID (%d) specified!\n", workerids[i]);
				return STARPU_NMAX_SCHED_CTXS;
			}
		}
	}

	struct _starpu_sched_ctx *sched_ctx = NULL;
	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio, awake_workers, init_sched, user_data, nsub_ctxs, sub_ctxs, nsms);
	sched_ctx->hierarchy_level = hierarchy_level;
	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;

	_starpu_unlock_mutex_if_prev_locked();
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	_starpu_update_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
	free(added_workerids);
	_starpu_relock_mutex_if_prev_locked();
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return (int)sched_ctx->id;
}
void starpu_sched_ctx_register_close_callback(unsigned sched_ctx_id, void (*close_callback)(unsigned sched_ctx_id, void* args), void *args)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->close_callback = close_callback;
	sched_ctx->close_args = args;
	return;
}

#ifdef STARPU_USE_SC_HYPERVISOR
void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counters)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->perf_counters = (struct starpu_sched_ctx_performance_counters *)perf_counters;
	return;
}
#endif
/* free all structures for the context */
static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int nworkers = config->topology.nworkers;
	int w;
	for(w = 0; w < nworkers; w++)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[w]);
		while (sched_ctx->busy[w])
		{
			STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond_busy[w], &sched_ctx->parallel_sect_mutex[w]);
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[w]);
	}

	if(sched_ctx->sched_policy)
	{
		_starpu_deinit_sched_policy(sched_ctx);
		free(sched_ctx->sched_policy);
		sched_ctx->sched_policy = NULL;
	}
	else
	{
		starpu_sched_ctx_delete_worker_collection(sched_ctx->id);
	}

	if (sched_ctx->perf_arch.devices)
	{
		free(sched_ctx->perf_arch.devices);
		sched_ctx->perf_arch.devices = NULL;
	}

	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->waiting_tasks_mutex);
	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->sched_ctx_list_mutex);
	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
#ifdef STARPU_HAVE_HWLOC
	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
#endif //STARPU_HAVE_HWLOC

	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	config->topology.nsched_ctxs--;
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
}
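
/* Delete a context: hand its workers over to the inheritor context when that
 * makes sense, wait for the remaining tasks, then free everything. */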
void starpu_sched_ctx_delete(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT(sched_ctx);

#ifdef STARPU_USE_SC_HYPERVISOR
	if (sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS && sched_ctx->perf_counters != NULL)
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_delete_context(sched_ctx_id);
		_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);

	_starpu_unlock_mutex_if_prev_locked();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);

	int *workerids;
	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);

	/* transferring the resources to the inheritor is pointless
	   when both contexts already hold all of them */
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	unsigned nworkers = config->topology.nworkers;
	if(nworkers_ctx > 0 && inheritor_sched_ctx && inheritor_sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
	   !(nworkers_ctx == nworkers && nworkers_ctx == inheritor_sched_ctx->workers->nworkers))
	{
		starpu_sched_ctx_add_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
		starpu_sched_ctx_set_priority(workerids, nworkers_ctx, inheritor_sched_ctx_id, 1);
		starpu_sched_ctx_set_priority_on_level(workerids, nworkers_ctx, inheritor_sched_ctx_id, 1);
	}

	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
	{
		if(!sched_ctx->sched_policy)
			starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
		/* the context may have changed between the mutex release and the mutex lock,
		   so make sure to free all the scheduling data before deleting the context */
		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
		_starpu_delete_sched_ctx(sched_ctx);
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);

	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list;
	   free it once it is no longer needed */
	free(workerids);
	_starpu_relock_mutex_if_prev_locked();
	occupied_sms -= sched_ctx->nsms;
	return;
}
/* called after the workers are terminated, so there is nothing left to do but free the memory */
void _starpu_delete_all_sched_ctxs()
{
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
		STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[i]);
		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			_starpu_sched_ctx_free_scheduling_data(sched_ctx);
			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
			_starpu_barrier_counter_destroy(&sched_ctx->ready_tasks_barrier);
			_starpu_delete_sched_ctx(sched_ctx);
		}
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[i]);
		STARPU_PTHREAD_RWLOCK_DESTROY(&changing_ctx_mutex[i]);
	}
	STARPU_PTHREAD_KEY_DELETE(sched_ctx_key);
	return;
}
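
/* Sanity-check that every requested workerid is within the range of workers
 * actually present in the machine configuration. */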
static void _starpu_check_workers(int *workerids, int nworkers)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int nworkers_conf = config->topology.nworkers;
	int i;
	for(i = 0; i < nworkers; i++)
	{
		/* make sure the user does not ask for a resource that does not exist */
		STARPU_ASSERT_MSG(workerids[i] >= 0 && workerids[i] < nworkers_conf,
				  "requested to add workerid = %d, but that is beyond the range 0 to %d",
				  workerids[i], nworkers_conf - 1);
	}
}
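
/* Flush the tasks that were queued while the context had no workers: pop them
 * from empty_ctx_tasks and push them to the workers, stopping early on the
 * stop_submission_task marker or when a push returns -EAGAIN. */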
void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx)
{
	unsigned unlocked = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);

	if(starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
		return;
	}
	else
		/* this code should not be reachable once the context is deleted,
		   so there is no point in keeping the rwlock held */
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);

	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
	{
		if(unlocked)
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
		unlocked = 1;
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);

		if(old_task == &stop_submission_task)
			break;

		int ret = _starpu_push_task_to_workers(old_task);
		/* stop popping from the empty ctx task list if the workers cannot take more */
		if(ret == -EAGAIN) break;
	}
	if(!unlocked)
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);

	/* re-take the rwlock so the caller finds it in the state it expects */
	STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx->id]);
	return;
}
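
/* Decide whether a task may be pushed right now: when the policy provides
 * simulate_push_task and a time window is configured, refuse tasks whose
 * simulated completion would exceed the window by more than 20%. */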
unsigned _starpu_can_push_task(struct _starpu_sched_ctx *sched_ctx, struct starpu_task *task)
{
	if(sched_ctx->sched_policy && sched_ctx->sched_policy->simulate_push_task)
	{
		if (window_size == 0.0) return 1;

		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx->id]);
		double expected_end = sched_ctx->sched_policy->simulate_push_task(task);
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);

		double expected_len = 0.0;
		if(hyp_actual_start_sample[sched_ctx->id] != 0.0)
			expected_len = expected_end - hyp_actual_start_sample[sched_ctx->id];
		else
		{
			printf("%d: sc start is 0.0\n", sched_ctx->id);
			expected_len = expected_end - starpu_timing_now();
		}
		if(expected_len < 0.0)
			printf("exp len negative %lf \n", expected_len);
		expected_len /= 1000000.0;
		//	printf("exp_end %lf start %lf expected_len %lf \n", expected_end, hyp_actual_start_sample[sched_ctx->id], expected_len);
		if(expected_len > (window_size + 0.2*window_size))
			return 0;
	}
	return 1;
}
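
/* Peek at the oldest waiting task and push it to the workers if the context
 * can accept it; the companion function below queues a task for later. */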
void _starpu_fetch_task_from_waiting_list(struct _starpu_sched_ctx *sched_ctx)
{
	if(starpu_task_list_empty(&sched_ctx->waiting_tasks))
		return;
	struct starpu_task *old_task = starpu_task_list_back(&sched_ctx->waiting_tasks);
	if(_starpu_can_push_task(sched_ctx, old_task))
	{
		old_task = starpu_task_list_pop_back(&sched_ctx->waiting_tasks);
		_starpu_push_task_to_workers(old_task);
	}
	return;
}

void _starpu_push_task_to_waiting_list(struct _starpu_sched_ctx *sched_ctx, struct starpu_task *task)
{
	starpu_task_list_push_front(&sched_ctx->waiting_tasks, task);
	return;
}
void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
{
	(void) workers_to_add;
	(void) nworkers_to_add;
	(void) sched_ctx;
	(void) priority;
/* 	int w; */
/* 	struct _starpu_worker *worker = NULL; */
/* 	for(w = 0; w < nworkers_to_add; w++) */
/* 	{ */
/* 		worker = _starpu_get_worker_struct(workers_to_add[w]); */
/* 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex); */
/* 		struct _starpu_sched_ctx_list *l = NULL; */
/* 		for (l = worker->sched_ctx_list; l; l = l->next) */
/* 		{ */
/* 			if(l->sched_ctx != STARPU_NMAX_SCHED_CTXS && l->sched_ctx != sched_ctx && */
/* 			   starpu_sched_ctx_get_hierarchy_level(l->sched_ctx) == starpu_sched_ctx_get_hierarchy_level(sched_ctx)) */
/* 			{ */
/* 				/\* the lock is taken inside the func *\/ */
/* 				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex); */
/* 				starpu_sched_ctx_set_priority(&workers_to_add[w], 1, l->sched_ctx, priority); */
/* 				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex); */
/* 			} */
/* 		} */
/* 		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex); */
/* 	} */
/* 	return; */
}
static void _set_priority_hierarchically(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
{
	if(starpu_sched_ctx_get_hierarchy_level(sched_ctx) > 0)
	{
		unsigned father = starpu_sched_ctx_get_inheritor(sched_ctx);
		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, father, priority);
		starpu_sched_ctx_set_priority_on_level(workers_to_add, nworkers_to_add, father, priority);
		_set_priority_hierarchically(workers_to_add, nworkers_to_add, father, priority);
	}
	return;
}

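/* Dynamically extend a context with extra workers. Takes the per-context
 * changing_ctx rwlock in write mode, updates the worker collection, gives the
 * new workers the highest priority for this context, and finally drains the
 * tasks that were parked while the context was empty.
 *
 * Usage sketch (the worker ids below are hypothetical):
 *	int ids[2] = {0, 1};
 *	starpu_sched_ctx_add_workers(ids, 2, ctx_id);
 */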
void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	_starpu_unlock_mutex_if_prev_locked();

	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
	_starpu_check_workers(workers_to_add, nworkers_to_add);

	/* if the context has not already been deleted */
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		int added_workers[nworkers_to_add];
		int n_added_workers = 0;

		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);

		if(n_added_workers > 0)
		{
			_starpu_update_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
		}

		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, sched_ctx_id, 1);
		_set_priority_hierarchically(workers_to_add, nworkers_to_add, sched_ctx_id, 0);
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);

	_starpu_relock_mutex_if_prev_locked();

	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
		_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	}
	return;
}

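/* Counterpart of starpu_sched_ctx_add_workers(): detach the given workers from
 * the context (unless it has already been deleted) and restore their priority
 * ordering. Also protected by the changing_ctx rwlock in write mode. */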
void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_remove, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	_starpu_check_workers(workers_to_remove, nworkers_to_remove);

	_starpu_unlock_mutex_if_prev_locked();

	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	/* if the context has not already been deleted */
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		int removed_workers[sched_ctx->workers->nworkers];
		int n_removed_workers = 0;

		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);

		if(n_removed_workers > 0)
		{
			_starpu_update_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
			starpu_sched_ctx_set_priority(removed_workers, n_removed_workers, sched_ctx_id, 1);
		}
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);

	_starpu_relock_mutex_if_prev_locked();

	return;
}

int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
{
	unsigned nworkers = 0;

	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx->id]);
	struct starpu_worker_collection *workers = sched_ctx->workers;

	struct starpu_sched_ctx_iterator it;
	workers->init_iterator_for_parallel_tasks(workers, &it, task);
	while(workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);
		STARPU_ASSERT_MSG(worker < STARPU_NMAXWORKERS, "worker id %u", worker);
		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
			nworkers++;
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);

	return nworkers;
}

/* unused sched_ctxs have the id STARPU_NMAX_SCHED_CTXS */
void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config)
{
	STARPU_PTHREAD_KEY_CREATE(&sched_ctx_key, NULL);
	window_size = starpu_get_env_float_default("STARPU_WINDOW_TIME_SIZE", 0.0);
	nobind = starpu_get_env_number("STARPU_WORKERS_NOBIND");

	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		config->sched_ctxs[i].id = STARPU_NMAX_SCHED_CTXS;

	return;
}

/* sched_ctx slots are not necessarily contiguous: when a context is removed
 * its slot becomes free, and it is reused when a new context is added */
static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config)
{
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(config->sched_ctxs[i].id == STARPU_NMAX_SCHED_CTXS)
			return i;

	STARPU_ASSERT(0);
	return STARPU_NMAX_SCHED_CTXS;
}

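/* Block until the per-context task barrier drops to zero, i.e. every task
 * submitted to this context has completed. Must not be called from a task or
 * a callback, since it may block the worker. */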
int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_all must not be called from a task or callback");

	_starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
	return 0;
}

int _starpu_wait_for_n_submitted_tasks_of_sched_ctx(unsigned sched_ctx_id, unsigned n)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_n_submitted_tasks must not be called from a task or callback");

	return _starpu_barrier_counter_wait_until_counter_reaches_down_to_n(&sched_ctx->tasks_barrier, n);
}

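/* Called each time a task of this context terminates. Besides decrementing
 * the barrier, this is where two end-of-life transitions are detected: when
 * the user declared submission finished and the last task completes, the
 * workers are handed over to the inheritor context; and when
 * starpu_drivers_request_termination() was called, the workers are woken up
 * so they can terminate. */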
void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
#ifndef STARPU_SANITIZE_THREAD
	if (!config->watchdog_ok)
		config->watchdog_ok = 1;
#endif

	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int reached = _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
	int finished = reached == 1;

	/* when we have finished decrementing the tasks, if the user signaled that
	 * no more tasks will be submitted, we can move all the workers of this
	 * context to the inheritor context */
	if(finished && sched_ctx->inheritor != STARPU_NMAX_SCHED_CTXS)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
		if(sched_ctx->finished_submit)
		{
			STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);

			if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if(sched_ctx->close_callback)
					sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);

				int *workerids = NULL;
				unsigned nworkers = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);

				if(nworkers > 0)
				{
					starpu_sched_ctx_add_workers(workerids, nworkers, sched_ctx->inheritor);
					free(workerids);
				}
			}
			_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
			return;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	}

	/* We also need to check for config->submitting = 0 (i.e. the user called
	 * starpu_drivers_request_termination()), in which case we need to set
	 * config->running to 0 and wake workers, so they can terminate, just like
	 * starpu_drivers_request_termination() does.
	 */
	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
	if(config->submitting == 0)
	{
		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			if(sched_ctx->close_callback)
				sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
		}

		ANNOTATE_HAPPENS_AFTER(&config->running);
		config->running = 0;
		ANNOTATE_HAPPENS_BEFORE(&config->running);
		int s;
		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
			}
		}
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);

	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);

	return;
}

void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier, 0.0);
}

int _starpu_get_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
}

int _starpu_check_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_check(&sched_ctx->tasks_barrier);
}

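/* Account for a task becoming ready in this context. Returns 1 when the task
 * may be pushed immediately, 0 when the resizing-window heuristic diverted it
 * to the waiting list; it will be re-pushed from
 * _starpu_fetch_task_from_waiting_list() as ready tasks drain. */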
unsigned _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops, struct starpu_task *task)
{
	unsigned ret = 1;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	if(!sched_ctx->is_initial_sched)
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->waiting_tasks_mutex);

	_starpu_barrier_counter_increment(&sched_ctx->ready_tasks_barrier, ready_flops);

	if(!sched_ctx->is_initial_sched)
	{
		if(!_starpu_can_push_task(sched_ctx, task))
		{
			_starpu_push_task_to_waiting_list(sched_ctx, task);
			ret = 0;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->waiting_tasks_mutex);
	}
	return ret;
}

void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	if(!sched_ctx->is_initial_sched)
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->waiting_tasks_mutex);

	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);

	if(!sched_ctx->is_initial_sched)
	{
		_starpu_fetch_task_from_waiting_list(sched_ctx);
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->waiting_tasks_mutex);
	}
}

int starpu_sched_ctx_get_nready_tasks(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->ready_tasks_barrier);
}

double starpu_sched_ctx_get_nready_flops(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_flops(&sched_ctx->ready_tasks_barrier);
}

int _starpu_wait_for_no_ready_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->ready_tasks_barrier);
	return 0;
}

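/* The "current" context of a thread is stored as a pointer in a pthread key;
 * only the pointer is saved, so the unsigned it points to must outlive the
 * queries. Minimal sketch (ctx_id is a hypothetical variable with static
 * storage duration):
 *	static unsigned ctx_id;
 *	ctx_id = some_sched_ctx;
 *	starpu_sched_ctx_set_context(&ctx_id);
 *	... starpu_sched_ctx_get_context() now returns ctx_id ...
 */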
void starpu_sched_ctx_set_context(unsigned *sched_ctx)
{
	starpu_pthread_setspecific(sched_ctx_key, (void*)sched_ctx);
}

unsigned starpu_sched_ctx_get_context(void)
{
	unsigned *sched_ctx = (unsigned*)starpu_pthread_getspecific(sched_ctx_key);
	if(sched_ctx == NULL)
		return STARPU_NMAX_SCHED_CTXS;
	STARPU_ASSERT(*sched_ctx < STARPU_NMAX_SCHED_CTXS);
	return *sched_ctx;
}

unsigned _starpu_sched_ctx_get_current_context(void)
{
	unsigned sched_ctx = starpu_sched_ctx_get_context();
	if (sched_ctx == STARPU_NMAX_SCHED_CTXS)
		return _starpu_get_initial_sched_ctx()->id;
	else
		return sched_ctx;
}

void starpu_sched_ctx_notify_hypervisor_exists(void)
{
	with_hypervisor = 1;
	int i, j;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		hyp_start_sample[i] = starpu_timing_now();
		hyp_start_allow_sample[i] = 0.0;
		for(j = 0; j < STARPU_NMAXWORKERS; j++)
		{
			flops[i][j] = 0.0;
			data_size[i][j] = 0;
		}
		hyp_actual_start_sample[i] = 0.0;
	}
}

unsigned starpu_sched_ctx_check_if_hypervisor_exists(void)
{
	return with_hypervisor;
}

void starpu_sched_ctx_update_start_resizing_sample(unsigned sched_ctx_id, double start_sample)
{
	hyp_actual_start_sample[sched_ctx_id] = start_sample;
}

unsigned _starpu_sched_ctx_allow_hypervisor(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	return 1;
#if 0
	double now = starpu_timing_now();
	if(hyp_start_allow_sample[sched_ctx_id] > 0.0)
	{
		double allow_sample = (now - hyp_start_allow_sample[sched_ctx_id]) / 1000000.0;
		if(allow_sample < 0.001)
			return 1;
		else
		{
			hyp_start_allow_sample[sched_ctx_id] = 0.0;
			hyp_start_sample[sched_ctx_id] = starpu_timing_now();
			return 0;
		}
	}
	double forbid_sample = (now - hyp_start_sample[sched_ctx_id]) / 1000000.0;
	if(forbid_sample > 0.01)
	{
//		hyp_start_sample[sched_ctx_id] = starpu_timing_now();
		hyp_start_allow_sample[sched_ctx_id] = starpu_timing_now();
		return 1;
	}
	return 0;
#endif
}

void starpu_sched_ctx_set_policy_data(unsigned sched_ctx_id, void* policy_data)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->policy_data = policy_data;
}

void* starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->policy_data;
}

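/* Allocate the worker collection of a context. With hwloc support a tree
 * collection mirroring the machine topology can be requested; any other type
 * falls back to a flat list. The function pointers of the chosen
 * implementation (worker_tree / worker_list) are copied into the collection. */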
struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsigned sched_ctx_id, enum starpu_worker_collection_type worker_collection_type)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_STARPU_MALLOC(sched_ctx->workers, sizeof(struct starpu_worker_collection));

	switch(worker_collection_type)
	{
#ifdef STARPU_HAVE_HWLOC
	case STARPU_WORKER_TREE:
		sched_ctx->workers->has_next = worker_tree.has_next;
		sched_ctx->workers->get_next = worker_tree.get_next;
		sched_ctx->workers->add = worker_tree.add;
		sched_ctx->workers->remove = worker_tree.remove;
		sched_ctx->workers->init = worker_tree.init;
		sched_ctx->workers->deinit = worker_tree.deinit;
		sched_ctx->workers->init_iterator = worker_tree.init_iterator;
		sched_ctx->workers->init_iterator_for_parallel_tasks = worker_tree.init_iterator_for_parallel_tasks;
		sched_ctx->workers->type = STARPU_WORKER_TREE;
		break;
#endif
//	case STARPU_WORKER_LIST:
	default:
		sched_ctx->workers->has_next = worker_list.has_next;
		sched_ctx->workers->get_next = worker_list.get_next;
		sched_ctx->workers->add = worker_list.add;
		sched_ctx->workers->remove = worker_list.remove;
		sched_ctx->workers->init = worker_list.init;
		sched_ctx->workers->deinit = worker_list.deinit;
		sched_ctx->workers->init_iterator = worker_list.init_iterator;
		sched_ctx->workers->init_iterator_for_parallel_tasks = worker_list.init_iterator_for_parallel_tasks;
		sched_ctx->workers->type = STARPU_WORKER_LIST;
		break;
	}

	/* construct the collection of workers (list/tree/etc.) */
	sched_ctx->workers->init(sched_ctx->workers);

	return sched_ctx->workers;
}

void starpu_sched_ctx_display_workers(unsigned sched_ctx_id, FILE *f)
{
	int *workerids = NULL;
	unsigned nworkers;
	unsigned i;

	nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
	fprintf(f, "[sched_ctx %u]: %u worker%s\n", sched_ctx_id, nworkers, nworkers == 1 ? "" : "s");
	for (i = 0; i < nworkers; i++)
	{
		char name[256];
		starpu_worker_get_name(workerids[i], name, 256);
		fprintf(f, "\t\t%s\n", name);
	}
	free(workerids);
}

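/* Two ways to obtain the workers of a context: the _raw variant exposes the
 * internal array (no copy, do not free it), while
 * starpu_sched_ctx_get_workers_list() allocates a fresh array that the caller
 * must free(). */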
unsigned starpu_sched_ctx_get_workers_list_raw(unsigned sched_ctx_id, int **workerids)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	*workerids = sched_ctx->workers->workerids;
	return sched_ctx->workers->nworkers;
}

unsigned starpu_sched_ctx_get_workers_list(unsigned sched_ctx_id, int **workerids)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	unsigned nworkers = 0;
	struct starpu_sched_ctx_iterator it;

	if(!workers) return 0;

	_STARPU_MALLOC(*workerids, workers->nworkers*sizeof(int));

	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		(*workerids)[nworkers++] = worker;
	}
	return nworkers;
}

void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->workers->deinit(sched_ctx->workers);
	free(sched_ctx->workers);
	sched_ctx->workers = NULL;
}

struct starpu_worker_collection* starpu_sched_ctx_get_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->workers;
}

int _starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu_worker_archtype arch)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	struct starpu_worker_collection *workers = sched_ctx->workers;
	int npus = 0;

	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch || arch == STARPU_ANY_WORKER)
			pus[npus++] = worker;
	}

	return npus;
}

starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched_ctx_id)
{
	return &changing_ctx_mutex[sched_ctx_id];
}

unsigned starpu_sched_ctx_get_nworkers(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL)
		return sched_ctx->workers->nworkers;
	else
		return 0;
}

unsigned starpu_sched_ctx_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct _starpu_sched_ctx *sched_ctx2 = _starpu_get_sched_ctx_struct(sched_ctx_id2);

	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_worker_collection *workers2 = sched_ctx2->workers;
	int shared_workers = 0;

	struct starpu_sched_ctx_iterator it1, it2;

	workers->init_iterator(workers, &it1);
	while(workers->has_next(workers, &it1))
	{
		int worker = workers->get_next(workers, &it1);
		/* restart the inner iterator for each worker of the first context,
		 * otherwise only the first outer iteration compares anything */
		workers2->init_iterator(workers2, &it2);
		while(workers2->has_next(workers2, &it2))
		{
			int worker2 = workers2->get_next(workers2, &it2);
			if(worker == worker2)
				shared_workers++;
		}
	}
	return shared_workers;
}

unsigned starpu_sched_ctx_contains_worker(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

	struct starpu_worker_collection *workers = sched_ctx->workers;
	if(workers)
	{
		unsigned i;
		for (i = 0; i < workers->nworkers; i++)
			if (workerid == workers->workerids[i])
				return 1;
	}
	return 0;
}

unsigned starpu_sched_ctx_contains_type_of_worker(enum starpu_worker_archtype arch, unsigned sched_ctx_id)
{
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	unsigned i;

	for (i = 0; i < workers->nworkers; i++)
	{
		int worker = workers->workerids[i];
		enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch)
			return 1;
	}
	return 0;
}

unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	int i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[i];
		if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
			if(starpu_sched_ctx_contains_worker(workerid, sched_ctx->id))
				return 1;
	}
	return 0;
}

unsigned starpu_sched_ctx_worker_get_id(unsigned sched_ctx_id)
{
	int workerid = starpu_worker_get_id();
	if(workerid != -1)
		if(starpu_sched_ctx_contains_worker(workerid, sched_ctx_id))
			return workerid;
	return -1;
}

unsigned starpu_sched_ctx_get_ctx_for_task(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned ret_sched_ctx = task->sched_ctx;
	if (task->possibly_parallel && !sched_ctx->sched_policy
	    && sched_ctx->nesting_sched_ctx != STARPU_NMAX_SCHED_CTXS)
		ret_sched_ctx = sched_ctx->nesting_sched_ctx;
	return ret_sched_ctx;
}

unsigned starpu_sched_ctx_overlapping_ctxs_on_worker(int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return worker->nsched_ctxs > 1;
}

void starpu_sched_ctx_set_inheritor(unsigned sched_ctx_id, unsigned inheritor)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	STARPU_ASSERT(inheritor < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->inheritor = inheritor;
	return;
}

unsigned starpu_sched_ctx_get_inheritor(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->inheritor;
}

unsigned starpu_sched_ctx_get_hierarchy_level(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->hierarchy_level;
}

void starpu_sched_ctx_finished_submit(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
	sched_ctx->finished_submit = 1;
	STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	return;
}

#ifdef STARPU_USE_SC_HYPERVISOR
void _starpu_sched_ctx_post_exec_task_cb(int workerid, struct starpu_task *task, size_t data_size2, uint32_t footprint)
{
	if (workerid < 0)
		return;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if(sched_ctx != NULL && task->sched_ctx != _starpu_get_initial_sched_ctx()->id &&
	   task->sched_ctx != STARPU_NMAX_SCHED_CTXS && sched_ctx->perf_counters != NULL)
	{
		flops[task->sched_ctx][workerid] += task->flops;
		data_size[task->sched_ctx][workerid] += data_size2;

		if(_starpu_sched_ctx_allow_hypervisor(sched_ctx->id) || task->hypervisor_tag > 0)
		{
			_STARPU_TRACE_HYPERVISOR_BEGIN();
			sched_ctx->perf_counters->notify_post_exec_task(task, data_size[task->sched_ctx][workerid], footprint,
									task->hypervisor_tag, flops[task->sched_ctx][workerid]);
			_STARPU_TRACE_HYPERVISOR_END();
			flops[task->sched_ctx][workerid] = 0.0;
			data_size[task->sched_ctx][workerid] = 0;
		}
	}
}

void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL && sched_ctx_id != _starpu_get_initial_sched_ctx()->id && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
	   && sched_ctx->perf_counters != NULL && _starpu_sched_ctx_allow_hypervisor(sched_ctx_id))
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_pushed_task(sched_ctx_id, workerid);
		_STARPU_TRACE_HYPERVISOR_END();
	}
}
#endif //STARPU_USE_SC_HYPERVISOR

int starpu_sched_get_min_priority(void)
{
	return starpu_sched_ctx_get_min_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_get_max_priority(void)
{
	return starpu_sched_ctx_get_max_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_set_min_priority(int min_prio)
{
	return starpu_sched_ctx_set_min_priority(_starpu_sched_ctx_get_current_context(), min_prio);
}

int starpu_sched_set_max_priority(int max_prio)
{
	return starpu_sched_ctx_set_max_priority(_starpu_sched_ctx_get_current_context(), max_prio);
}

int starpu_sched_ctx_get_min_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority;
}

int starpu_sched_ctx_get_max_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority;
}

int starpu_sched_ctx_set_min_priority(unsigned sched_ctx_id, int min_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->min_priority = min_prio;
	return 0;
}

int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->max_priority = max_prio;
	return 0;
}

int starpu_sched_ctx_min_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority_is_set;
}

int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority_is_set;
}

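/* Move the context to the place corresponding to the given priority in each
 * listed worker's sched_ctx_list, under that worker's sched_mutex. Passing
 * nworkers == -1 turns the call into a no-op. */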
void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
{
	if(nworkers != -1)
	{
		int w;
		struct _starpu_worker *worker = NULL;
		for(w = 0; w < nworkers; w++)
		{
			worker = _starpu_get_worker_struct(workers[w]);
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
			_starpu_sched_ctx_list_move(&worker->sched_ctx_list, sched_ctx_id, priority);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
		}
	}
	return;
}

unsigned starpu_sched_ctx_get_priority(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return _starpu_sched_ctx_elt_get_priority(worker->sched_ctx_list, sched_ctx_id);
}

unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);

		unsigned last_worker_awake = 1;
		struct starpu_worker_collection *workers = sched_ctx->workers;
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator(workers, &it);
		while(workers->has_next(workers, &it))
		{
			int workerid = workers->get_next(workers, &it);
			if(workerid != worker->workerid && _starpu_worker_get_status(workerid) != STATUS_SLEEPING)
			{
				last_worker_awake = 0;
				break;
			}
		}
		if(last_worker_awake)
			return 1;
	}
	return 0;
}

void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid)
{
	_starpu_bind_thread_on_cpu(_starpu_get_machine_config(), cpuid, STARPU_NOWORKERID);
}

unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() <= 1)
		return STARPU_NMAX_SCHED_CTXS;

	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if(sched_ctx->main_master == workerid && sched_ctx->nesting_sched_ctx == sched_ctx_id)
			return sched_ctx->id;
	}
	return STARPU_NMAX_SCHED_CTXS;
}

unsigned starpu_sched_ctx_master_get_context(int masterid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(masterid);
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if(sched_ctx->main_master == masterid)
			return sched_ctx->id;
	}
	return STARPU_NMAX_SCHED_CTXS;
}

struct _starpu_sched_ctx *__starpu_sched_ctx_get_sched_ctx_for_worker_and_job(struct _starpu_worker *worker, struct _starpu_job *j)
{
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		struct _starpu_sched_ctx_elt *e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if (j->task->sched_ctx == sched_ctx->id)
			return sched_ctx;
	}
	return NULL;
}

void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double ready_flops)
{
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx_id, ready_flops);
}

void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx, unsigned manage_mutex,
				       unsigned with_repush)
{
	/* TODO: make this cleaner and differentiate between calls coming from
	 * push or pop (mutex held or not) and from another worker or not */
	int workerid = starpu_worker_get_id();
	struct _starpu_worker *worker = NULL;
	if(workerid != -1 && manage_mutex)
	{
		worker = _starpu_get_worker_struct(workerid);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}

	task->sched_ctx = sched_ctx;

	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);

	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);

	if(with_repush)
		_starpu_repush_task(j);
	else
		_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);

	if(workerid != -1 && manage_mutex)
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
}

void starpu_sched_ctx_list_task_counters_increment(unsigned sched_ctx_id, int workerid)
{
	/* Note: the sched_mutex is often not held when we get here, although it
	 * should be, so take it */
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
	{
		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
		_starpu_sched_ctx_list_push_event(worker->sched_ctx_list, sched_ctx_id);
		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
	}
}

void starpu_sched_ctx_list_task_counters_decrement(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
		_starpu_sched_ctx_list_pop_event(worker->sched_ctx_list, sched_ctx_id);
}

void starpu_sched_ctx_list_task_counters_reset(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	if (worker->nsched_ctxs > 1)
		_starpu_sched_ctx_list_pop_all_event(worker->sched_ctx_list, sched_ctx_id);
}

void starpu_sched_ctx_list_task_counters_increment_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	/* Note that with a single context everything defaults to the global
	 * context, so these counters are useless there */
	if (_starpu_get_nsched_ctxs() > 1)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
		while(workers->has_next(workers, &it))
		{
			int worker = workers->get_next(workers, &it);
			starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, worker);
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);
	}
}

void starpu_sched_ctx_list_task_counters_decrement_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() > 1)
	{
		int curr_workerid = starpu_worker_get_id();
		struct _starpu_worker *curr_worker_str = NULL, *worker_str;
		if(curr_workerid != -1)
		{
			curr_worker_str = _starpu_get_worker_struct(curr_workerid);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
		}

		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
		while(workers->has_next(workers, &it))
		{
			int worker = workers->get_next(workers, &it);
			worker_str = _starpu_get_worker_struct(worker);
			if (worker_str->nsched_ctxs > 1)
			{
				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
				starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, worker);
				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
			}
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);

		if(curr_workerid != -1)
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);
	}
}

void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, unsigned sched_ctx_id)
{
	if (_starpu_get_nsched_ctxs() > 1)
	{
		int curr_workerid = starpu_worker_get_id();
		struct _starpu_worker *curr_worker_str = NULL, *worker_str;
		if(curr_workerid != -1)
		{
			curr_worker_str = _starpu_get_worker_struct(curr_workerid);
			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&curr_worker_str->sched_mutex);
		}

		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->sched_ctx_list_mutex);
		while(workers->has_next(workers, &it))
		{
			int worker = workers->get_next(workers, &it);
			worker_str = _starpu_get_worker_struct(worker);
			if (worker_str->nsched_ctxs > 1)
			{
				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker_str->sched_mutex);
				starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, worker);
				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker_str->sched_mutex);
			}
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->sched_ctx_list_mutex);

		if(curr_workerid != -1)
			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&curr_worker_str->sched_mutex);
	}
}

static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
{
	int s;
	for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(s);
		if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
		{
			if(sched_ctx->parallel_sect[workerid])
				return 1;
		}
	}
	return 0;
}

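/* Block a set of workers on behalf of a master worker: mark them as being in
 * a parallel section, wake them so they notice the flag, then wait on the
 * master's fall_asleep semaphore until each of them (except the caller and the
 * workers already asleep in another context) has actually gone to sleep. */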
static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();
	unsigned sleeping[nworkers];
	int w;

	for(w = 0; w < nworkers; w++)
	{
		if(current_worker_id == -1 || workerids[w] != current_worker_id)
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
		sleeping[w] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerids[w]);
		sched_ctx->master[workerids[w]] = master;
		sched_ctx->parallel_sect[workerids[w]] = 1;
		if(current_worker_id == -1 || workerids[w] != current_worker_id)
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
#ifndef STARPU_NON_BLOCKING_DRIVERS
		starpu_wake_worker(workerids[w]);
#endif
	}

	for(w = 0; w < nworkers; w++)
	{
		int workerid = workerids[w];
		if((current_worker_id == -1 || workerid != current_worker_id) && !sleeping[w])
		{
			sem_wait(&sched_ctx->fall_asleep_sem[master]);
		}
	}
	return;
}

void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	worker->blocked = 1;

	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->sleeping[workerid] = 1;
	int master = sched_ctx->master[workerid];
	sem_post(&sched_ctx->fall_asleep_sem[master]);

	return;
}

void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int master = sched_ctx->master[workerid];
	sem_post(&sched_ctx->wake_up_sem[master]);
	sched_ctx->sleeping[workerid] = 0;
	sched_ctx->master[workerid] = -1;

	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	worker->blocked = 0;

	return;
}

static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();

	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		int curr_master = sched_ctx->master[workerid];
		if(curr_master == master && sched_ctx->parallel_sect[workerid])
		{
			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
			{
				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
				sem_wait(&sched_ctx->wake_up_sem[master]);
			}
			else
				sched_ctx->parallel_sect[workerid] = 0;
		}
	}
	return;
}

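/* Run func(param) while every worker of the context is blocked, so func can
 * use the corresponding CPUs for its own parallel region. The last worker of
 * the list acts as master. Minimal sketch, where my_parallel_func and arg are
 * hypothetical user code:
 *	void *ret = starpu_sched_ctx_exec_parallel_code(my_parallel_func, arg, ctx_id);
 */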
void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
{
	int *workerids = NULL;
	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);

	/* execute parallel code */
	void* ret = func(param);

	/* wake up starpu workers */
	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, workerids[nworkers-1]);

	free(workerids);
	return ret;
}

void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids)
{
	int current_worker_id = starpu_worker_get_id();
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;

	_STARPU_MALLOC((*cpuids), workers->nworkers*sizeof(int));
	int w = 0;

	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		int master = sched_ctx->master[workerid];
		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
		{
			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
		}
	}
	*ncpuids = w;
	return;
}

static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();

	int masters[nworkers];
	int w;
	for(w = 0; w < nworkers; w++)
	{
		int workerid = workerids[w];
		masters[w] = sched_ctx->master[workerid];
		if(current_worker_id == -1 || workerid != current_worker_id)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
		}
		else
			sched_ctx->parallel_sect[workerid] = 0;
		sched_ctx->master[workerid] = -1;
	}

	for(w = 0; w < nworkers; w++)
	{
		int workerid = workerids[w];
		if(masters[w] != -1)
		{
			/* use the master saved before it was reset to -1 above */
			int master = masters[w];
			if(current_worker_id == -1 || workerid != current_worker_id)
				sem_wait(&sched_ctx->wake_up_sem[master]);
		}
	}
	return;
}

static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int new_master = workerids[nworkers-1];
	int current_worker_id = starpu_worker_get_id();
	int current_is_in_section = 0;
	int npotential_masters = 0;
	int nawake_workers = 0;
	int ntrue_masters = 0;
	int potential_masters[nworkers];
	int awake_workers[nworkers];
	int true_masters[nworkers];
	int i, w;

	for(w = 0 ; w < nworkers ; w++)
	{
		if (current_worker_id == workerids[w])
			current_is_in_section = 1;

		int master = sched_ctx->master[workerids[w]];
		if (master > -1)
		{
			int already_seen = 0;
			// Could create a function for this. Basically searching an element in an array.
			for (i = 0 ; i < npotential_masters; i++)
			{
				if (potential_masters[i] == master)
				{
					already_seen = 1;
					break;
				}
			}
			if (!already_seen)
				potential_masters[npotential_masters++] = master;
		}
		else if (master == -1)
			awake_workers[nawake_workers++] = workerids[w];
	}

	for (i = 0 ; i < npotential_masters ; i++)
	{
		int master_is_in_section = 0;
		// Could create a function for this. Basically searching an element in an array.
		for (w = 0 ; w < nworkers ; w++)
		{
			if (workerids[w] == potential_masters[i])
			{
				master_is_in_section = 1;
				break;
			}
		}
		if (master_is_in_section)
			true_masters[ntrue_masters++] = potential_masters[i];
	}

	if (current_is_in_section)
		new_master = current_worker_id;
	else
	{
		if (ntrue_masters > 1)
		{
			if (nawake_workers > 0)
				new_master = awake_workers[nawake_workers - 1];
			else
				new_master = true_masters[ntrue_masters - 1];
		}
	}
	return new_master;
}

static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int w;
	int nput_to_sleep = 0;
	int nwake_up = 0;
	int put_to_sleep[nworkers];
	int wake_up[nworkers];

	for(w = 0 ; w < nworkers ; w++)
	{
		int master = sched_ctx->master[workerids[w]];
		if (master == -1 && workerids[w] != new_master)
			put_to_sleep[nput_to_sleep++] = workerids[w];
		else if(master != -1 && workerids[w] == new_master)
			wake_up[nwake_up++] = workerids[w];
	}

	if(nwake_up > 0)
		_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
	if(nput_to_sleep > 0)
		_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
}

static void _starpu_sched_ctx_set_master(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers, int master)
{
	int i;
	for(i = 0; i < nworkers; i++)
	{
		if(workerids[i] != master)
			sched_ctx->master[workerids[i]] = master;
	}
}

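/* Reserve the given workers for a parallel task: elect a master (preferring
 * the calling worker when it belongs to the set), put the other awake workers
 * to sleep and reclaim workers currently serving another master. Returns the
 * elected master; starpu_sched_ctx_unbook_workers_for_task() releases them. */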
int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);
	_starpu_sched_ctx_add_workers_to_master(sched_ctx_id, workerids, nworkers, new_master);
	return new_master;
}

void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master)
{
	/* wake up the StarPU workers */
	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, master);
}

struct starpu_perfmodel_arch * _starpu_sched_ctx_get_perf_archtype(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return &sched_ctx->perf_arch;
}

int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id)
{
	int idx = 0;
	int curr_workerid = starpu_worker_get_id();
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx->sched_policy || !sched_ctx->awake_workers)
		return -1;

	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		int worker = workers->get_next(workers, &it);
		if(worker == curr_workerid)
			return idx;
		idx++;
	}
	return -1;
}

void (*starpu_sched_ctx_get_sched_policy_init(unsigned sched_ctx_id))(unsigned)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->init_sched;
}

unsigned starpu_sched_ctx_has_starpu_scheduler(unsigned sched_ctx_id, unsigned *awake_workers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	*awake_workers = sched_ctx->awake_workers;
	return sched_ctx->sched_policy != NULL;
}

void *starpu_sched_ctx_get_used_data(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT(sched_ctx != NULL);
	return sched_ctx->user_data;
}