/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011, 2013 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <core/sched_policy.h>
#include <core/sched_ctx.h>
#include <common/utils.h>
#include <stdarg.h>

starpu_pthread_rwlock_t changing_ctx_mutex[STARPU_NMAX_SCHED_CTXS];

static starpu_pthread_mutex_t sched_ctx_manag = STARPU_PTHREAD_MUTEX_INITIALIZER;
static starpu_pthread_mutex_t finished_submit_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
starpu_pthread_key_t sched_ctx_key;
unsigned with_hypervisor = 0;
double hyp_start_sample[STARPU_NMAX_SCHED_CTXS];
double hyp_start_allow_sample[STARPU_NMAX_SCHED_CTXS];
double flops[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];
size_t data_size[STARPU_NMAX_SCHED_CTXS][STARPU_NMAXWORKERS];

static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master);
static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers);

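/* Record that a worker belongs to this context: add the context to the
 * worker's context list if it is not already there, and clear any pending
 * removal flag. Callers in this file hold the worker's sched_mutex. */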
static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_list_get_sched_ctx(worker->sched_ctx_list, sched_ctx_id);
	/* the worker was planning to leave this ctx, but it finally changed
	   its mind and is staying */
	if (ret_sched_ctx == STARPU_NMAX_SCHED_CTXS)
	{
		/* add context to worker */
		_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx_id);
		worker->nsched_ctxs++;
	}
	worker->removed_from_ctx[sched_ctx_id] = 0;
	if(worker->tmp_sched_ctx == sched_ctx_id)
		worker->tmp_sched_ctx = -1;
	return;
}

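/* Reverse operation: notify the context's scheduling policy that the worker
 * is leaving, then drop the context from the worker's context list. */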
void _starpu_worker_gets_out_of_ctx(unsigned sched_ctx_id, struct _starpu_worker *worker)
{
	unsigned ret_sched_ctx = _starpu_sched_ctx_list_get_sched_ctx(worker->sched_ctx_list, sched_ctx_id);
	/* remove context from worker */
	if(ret_sched_ctx != STARPU_NMAX_SCHED_CTXS)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		if(sched_ctx && sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->remove_workers(sched_ctx_id, &worker->workerid, 1);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
		_starpu_sched_ctx_list_remove(&worker->sched_ctx_list, sched_ctx_id);
		worker->nsched_ctxs--;
	}
	return;
}

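/* Register the context with each of the given workers, taking each worker's
 * sched_mutex around the update. */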
static void _starpu_update_workers_with_ctx(int *workerids, int nworkers, int sched_ctx_id)
{
	int i;
	struct _starpu_worker *worker = NULL;
	for(i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
		_starpu_worker_gets_into_ctx(sched_ctx_id, worker);
		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
	}
	return;
}

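/* Detach the context from the given workers: if 'now' is set they leave the
 * context immediately, otherwise they are only flagged with removed_from_ctx
 * and are taken out of the context later. */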
static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
{
	int i;
	struct _starpu_worker *worker = NULL;
	for(i = 0; i < nworkers; i++)
	{
		worker = _starpu_get_worker_struct(workerids[i]);
		if(now)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
		}
		else
		{
			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
			worker->removed_from_ctx[sched_ctx_id] = 1;
			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
		}
	}
	return;
}

void starpu_sched_ctx_stop_task_submission()
{
	_starpu_exclude_task_from_dag(&stop_submission_task);
	_starpu_task_submit_internally(&stop_submission_task);
}

void starpu_sched_ctx_worker_shares_tasks_lists(int workerid, int sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int curr_workerid = starpu_worker_get_id();
	/* if this is the initial sched_ctx there is no point in taking the mutex
	   (the workers are not launched yet), nor if the current worker is the
	   one calling this */
	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
	worker->shares_tasks_lists[sched_ctx_id] = 1;
	if(!sched_ctx->is_initial_sched && workerid != curr_workerid)
		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}

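/* Add the given workers (or all of them when workerids == NULL / nworkers == -1)
 * to the context's worker collection and notify the scheduling policy; for
 * policy-less contexts, elect or extend the master worker set instead. When
 * called at context creation, added_workers is NULL. */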
static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids, int nworkers,
					     int *added_workers, int *n_added_workers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	int nworkers_to_add = nworkers == -1 ? (int)config->topology.nworkers : nworkers;
	int workers_to_add[nworkers_to_add];
	int i = 0;
	for(i = 0; i < nworkers_to_add; i++)
	{
		/* added_workers is NULL when this function is called at the
		   creation of the context, in which case this check is not needed */
		if(added_workers)
		{
			int worker = workers->add(workers, (workerids == NULL ? i : workerids[i]));
			if(worker >= 0)
				added_workers[(*n_added_workers)++] = worker;
			else
			{
				int curr_workerid = starpu_worker_get_id();
				struct _starpu_worker *worker_str = _starpu_get_worker_struct(workerids[i]);
				if(curr_workerid != workerids[i])
					STARPU_PTHREAD_MUTEX_LOCK(&worker_str->sched_mutex);
				worker_str->removed_from_ctx[sched_ctx->id] = 0;
				if(curr_workerid != workerids[i])
					STARPU_PTHREAD_MUTEX_UNLOCK(&worker_str->sched_mutex);
			}
		}
		else
		{
			int worker = (workerids == NULL ? i : workerids[i]);
			workers->add(workers, worker);
			workers_to_add[i] = worker;
			struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
			str_worker->tmp_sched_ctx = (int)sched_ctx->id;
		}
	}
	if(!sched_ctx->sched_policy)
	{
		if(sched_ctx->main_master == -1)
			sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, workerids, nworkers);
		else
		{
			_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, workerids, nworkers, sched_ctx->main_master);
		}
	}
	else if(sched_ctx->sched_policy->add_workers)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		if(added_workers)
		{
			if(*n_added_workers > 0)
				sched_ctx->sched_policy->add_workers(sched_ctx->id, added_workers, *n_added_workers);
		}
		else
			sched_ctx->sched_policy->add_workers(sched_ctx->id, workers_to_add, nworkers_to_add);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
	return;
}

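/* Remove the given workers from the context's collection, reporting the ones
 * actually removed in removed_workers; for policy-less contexts, also wake up
 * the workers that were sleeping for this context. */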
static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sched_ctx, int *workerids,
						  int nworkers, int *removed_workers, int *n_removed_workers)
{
	struct starpu_worker_collection *workers = sched_ctx->workers;
	int i = 0;
	for(i = 0; i < nworkers; i++)
	{
		if(workers->nworkers > 0)
		{
			if(_starpu_worker_belongs_to_a_sched_ctx(workerids[i], sched_ctx->id))
			{
				int worker = workers->remove(workers, workerids[i]);
				if(worker >= 0)
					removed_workers[(*n_removed_workers)++] = worker;
			}
		}
	}
	if(!sched_ctx->sched_policy)
		_starpu_sched_ctx_wake_these_workers_up(sched_ctx->id, removed_workers, *n_removed_workers);
	return;
}

static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sched_ctx)
{
	int *workerids = NULL;
	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
	if(nworkers_ctx > 0 && sched_ctx->sched_policy->remove_workers)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->remove_workers(sched_ctx->id, workerids, nworkers_ctx);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
	free(workerids);
	return;
}

#ifdef STARPU_HAVE_HWLOC
static void _starpu_sched_ctx_create_hwloc_tree(struct _starpu_sched_ctx *sched_ctx)
{
	sched_ctx->hwloc_workers_set = hwloc_bitmap_alloc();
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct _starpu_worker *worker;
	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = _starpu_get_worker_struct(workers->get_next(workers, &it));
		if(!starpu_worker_is_combined_worker(worker->workerid))
		{
			hwloc_bitmap_or(sched_ctx->hwloc_workers_set,
					sched_ctx->hwloc_workers_set,
					worker->hwloc_cpu_set);
		}
	}
	return;
}
#endif

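/* Allocate and initialize a context slot: pick a free id under sched_ctx_manag,
 * set up the task barriers, the per-worker synchronization primitives and the
 * scheduling policy (or a plain worker list when no policy is given), then add
 * the requested workers to the collection. */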
struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *policy, int *workerids,
						   int nworkers_ctx, unsigned is_initial_sched,
						   const char *sched_ctx_name,
						   int min_prio_set, int min_prio,
						   int max_prio_set, int max_prio)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	STARPU_ASSERT(config->topology.nsched_ctxs < STARPU_NMAX_SCHED_CTXS);
	unsigned id = _starpu_get_first_free_sched_ctx(config);
	struct _starpu_sched_ctx *sched_ctx = &config->sched_ctxs[id];
	sched_ctx->id = id;
	config->topology.nsched_ctxs++;
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
	int nworkers = config->topology.nworkers;
	STARPU_ASSERT(nworkers_ctx <= nworkers);
	STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->empty_ctx_mutex, NULL);
	starpu_task_list_init(&sched_ctx->empty_ctx_tasks);
	sched_ctx->sched_policy = policy ? (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy)) : NULL;
	sched_ctx->is_initial_sched = is_initial_sched;
	sched_ctx->name = sched_ctx_name;
	sched_ctx->inheritor = STARPU_NMAX_SCHED_CTXS;
	sched_ctx->finished_submit = 0;
	sched_ctx->min_priority_is_set = min_prio_set;
	if (sched_ctx->min_priority_is_set) sched_ctx->min_priority = min_prio;
	sched_ctx->max_priority_is_set = max_prio_set;
	if (sched_ctx->max_priority_is_set) sched_ctx->max_priority = max_prio;
	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
	_starpu_barrier_counter_init(&sched_ctx->ready_tasks_barrier, 0);
	sched_ctx->ready_flops = 0.0;
	sched_ctx->main_master = -1;
	int w;
	for(w = 0; w < nworkers; w++)
	{
		sem_init(&sched_ctx->fall_asleep_sem[w], 0, 0);
		sem_init(&sched_ctx->wake_up_sem[w], 0, 0);
		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond[w], NULL);
		STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->parallel_sect_mutex[w], NULL);
		sched_ctx->master[w] = -1;
		sched_ctx->parallel_sect[w] = 0;
		sched_ctx->sleeping[w] = 0;
	}
	/* init the strategy structs and the worker_collection of the resources of the context */
	if(policy)
		_starpu_init_sched_policy(config, sched_ctx, policy);
	else
		starpu_sched_ctx_create_worker_collection(sched_ctx->id, STARPU_WORKER_LIST);
	/* construct the collection of workers (list/tree/etc.) */
	sched_ctx->workers->init(sched_ctx->workers);
	/* now that there is a worker_collection for the resources, add them to it */
	_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers_ctx, NULL, NULL);
#ifdef STARPU_HAVE_HWLOC
	/* build the hwloc tree of the context */
	_starpu_sched_ctx_create_hwloc_tree(sched_ctx);
#endif //STARPU_HAVE_HWLOC
	/* if we create the initial big sched ctx we can update the workers' status
	   here because they haven't been launched yet */
	if(is_initial_sched)
	{
		int i;
		/* initialize the mutexes for all contexts */
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			STARPU_PTHREAD_RWLOCK_INIT(&changing_ctx_mutex[i], NULL);
		for(i = 0; i < nworkers; i++)
		{
			struct _starpu_worker *worker = _starpu_get_worker_struct(i);
			worker->sched_ctx_list = (struct _starpu_sched_ctx_list*)malloc(sizeof(struct _starpu_sched_ctx_list));
			_starpu_sched_ctx_list_init(worker->sched_ctx_list);
			_starpu_sched_ctx_list_add(&worker->sched_ctx_list, sched_ctx->id);
			worker->nsched_ctxs++;
		}
	}
	return sched_ctx;
}

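/* Select between min and max workers of the given arch for a new context,
 * in three steps: first take the workers that belong to no context; if the
 * minimum is not reached, steal workers from contexts that own more than the
 * minimum they declared; if that is still not enough, redistribute the workers
 * proportionally to each context's requirements. */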
static void _get_workers(int min, int max, int *workers, int *nw, enum starpu_worker_archtype arch, unsigned allow_overlap)
{
	int pus[max];
	int npus = 0;
	int i;
	int n = 0;
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	if(config->topology.nsched_ctxs == 1)
	{
		/* we have all the available resources */
		npus = starpu_worker_get_nids_by_type(arch, pus, max);
		/* TODO: hierarchical ctxs: get max good workers: close to one another */
		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];
	}
	else
	{
		unsigned enough_ressources = 0;
		npus = starpu_worker_get_nids_ctx_free_by_type(arch, pus, max);
		for(i = 0; i < npus; i++)
			workers[(*nw)++] = pus[i];
		if(npus == max)
			/* we got the maximum we asked for */
			enough_ressources = 1;
		if(!enough_ressources && npus >= min)
			/* we got at least the minimum we need */
			enough_ressources = 1;
		if(!enough_ressources)
		{
			/* try to get resources from contexts that have more workers
			   than the minimum they need */
			int s;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					if(_npus > ctx_min)
					{
						if(npus < min)
						{
							n = (_npus - ctx_min) > (min - npus) ? min - npus : (_npus - ctx_min);
							npus += n;
						}
						/* TODO: hierarchical ctxs: get n good workers: close to the ones already assigned to the ctx */
						for(i = 0; i < n; i++)
							workers[(*nw)++] = _pus[i];
						starpu_sched_ctx_remove_workers(_pus, n, config->sched_ctxs[s].id);
					}
				}
			}
			if(npus >= min)
				enough_ressources = 1;
		}
		if(!enough_ressources)
		{
			/* if there are not enough available workers to satisfy the
			   minimum required, give each context workers proportional
			   to its requirements */
			int global_npus = starpu_worker_get_count_by_type(arch);
			int req_npus = 0;
			int s;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
					req_npus += arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
			req_npus += min;
			for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					int ctx_min = arch == STARPU_CPU_WORKER ? config->sched_ctxs[s].min_ncpus : config->sched_ctxs[s].min_ngpus;
					double needed_npus = ((double)ctx_min * (double)global_npus) / (double)req_npus;
					int _npus = 0;
					int _pus[STARPU_NMAXWORKERS];
					_npus = _starpu_get_workers_of_sched_ctx(config->sched_ctxs[s].id, _pus, arch);
					if(needed_npus < (double)_npus)
					{
						double npus_to_rem = (double)_npus - needed_npus;
						int x = floor(npus_to_rem);
						double x_double = (double)x;
						double diff = npus_to_rem - x_double;
						int npus_to_remove = diff >= 0.5 ? x+1 : x;
						int pus_to_remove[npus_to_remove];
						int c = 0;
						/* TODO: hierarchical ctxs: get npus_to_remove good workers: close to the ones already assigned to the ctx */
						for(i = _npus-1; i >= (_npus - npus_to_remove); i--)
						{
							workers[(*nw)++] = _pus[i];
							pus_to_remove[c++] = _pus[i];
						}
						if(!allow_overlap)
							starpu_sched_ctx_remove_workers(pus_to_remove, npus_to_remove, config->sched_ctxs[s].id);
					}
				}
			}
		}
	}
}

unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const char *sched_ctx_name,
						 int min_ncpus, int max_ncpus, int min_ngpus, int max_ngpus,
						 unsigned allow_overlap)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(config, policy_name);
	struct _starpu_sched_ctx *sched_ctx = NULL;
	int workers[max_ncpus + max_ngpus];
	int nw = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	_get_workers(min_ncpus, max_ncpus, workers, &nw, STARPU_CPU_WORKER, allow_overlap);
	_get_workers(min_ngpus, max_ngpus, workers, &nw, STARPU_CUDA_WORKER, allow_overlap);
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
	int i;
	/* debug output: show which workers were selected for the new context */
	printf("%d: ", nw);
	for(i = 0; i < nw; i++)
		printf("%d ", workers[i]);
	printf("\n");
	sched_ctx = _starpu_create_sched_ctx(selected_policy, workers, nw, 0, sched_ctx_name, 0, 0, 0, 0);
	sched_ctx->min_ncpus = min_ncpus;
	sched_ctx->max_ncpus = max_ncpus;
	sched_ctx->min_ngpus = min_ngpus;
	sched_ctx->max_ngpus = max_ngpus;
	_starpu_unlock_mutex_if_prev_locked();
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	_starpu_update_workers_without_ctx(added_workerids, nw_ctx, sched_ctx->id, 0);
	free(added_workerids);
	_starpu_relock_mutex_if_prev_locked();
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return sched_ctx->id;
}

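/* Public variadic constructor: parse the (key, value) argument list terminated
 * by 0, create the context and register it with its workers.
 *
 * A minimal usage sketch (assuming the standard "eager" policy is available):
 *
 *	int workers[] = {0, 1, 2};
 *	unsigned ctx = starpu_sched_ctx_create(workers, 3, "my_ctx",
 *					       STARPU_SCHED_CTX_POLICY_NAME, "eager",
 *					       0);
 */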
unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched_ctx_name, ...)
{
	va_list varg_list;
	int arg_type;
	int min_prio_set = 0;
	int max_prio_set = 0;
	int min_prio = 0;
	int max_prio = 0;
	struct starpu_sched_policy *sched_policy = NULL;
	unsigned hierarchy_level = 0;
	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	va_start(varg_list, sched_ctx_name);
	while ((arg_type = va_arg(varg_list, int)) != 0)
	{
		if (arg_type == STARPU_SCHED_CTX_POLICY_NAME)
		{
			char *policy_name = va_arg(varg_list, char *);
			struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
			sched_policy = _starpu_select_sched_policy(config, policy_name);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_STRUCT)
		{
			sched_policy = va_arg(varg_list, struct starpu_sched_policy *);
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MIN_PRIO)
		{
			min_prio = va_arg(varg_list, int);
			min_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_POLICY_MAX_PRIO)
		{
			max_prio = va_arg(varg_list, int);
			max_prio_set = 1;
		}
		else if (arg_type == STARPU_SCHED_CTX_HIERARCHY_LEVEL)
		{
			hierarchy_level = va_arg(varg_list, unsigned);
		}
		else if (arg_type == STARPU_SCHED_CTX_NESTED)
		{
			nesting_sched_ctx = va_arg(varg_list, unsigned);
		}
		else
		{
			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
		}
	}
	va_end(varg_list);
	struct _starpu_sched_ctx *sched_ctx = NULL;
	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio);
	sched_ctx->hierarchy_level = hierarchy_level;
	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
	_starpu_unlock_mutex_if_prev_locked();
	int *added_workerids;
	unsigned nw_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &added_workerids);
	_starpu_update_workers_with_ctx(added_workerids, nw_ctx, sched_ctx->id);
	free(added_workerids);
	_starpu_relock_mutex_if_prev_locked();
#ifdef STARPU_USE_SC_HYPERVISOR
	sched_ctx->perf_counters = NULL;
#endif
	return sched_ctx->id;
}

void starpu_sched_ctx_register_close_callback(unsigned sched_ctx_id, void (*close_callback)(unsigned sched_ctx_id, void* args), void *args)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->close_callback = close_callback;
	sched_ctx->close_args = args;
	return;
}

#ifdef STARPU_USE_SC_HYPERVISOR
void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counters)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->perf_counters = (struct starpu_sched_ctx_performance_counters *)perf_counters;
	return;
}
#endif

/* free all structures for the context */
static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
	if(sched_ctx->sched_policy)
	{
		_starpu_deinit_sched_policy(sched_ctx);
		free(sched_ctx->sched_policy);
		sched_ctx->sched_policy = NULL;
	}
	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
#ifdef STARPU_HAVE_HWLOC
	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
#endif //STARPU_HAVE_HWLOC
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
	config->topology.nsched_ctxs--;
	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
}

void starpu_sched_ctx_delete(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
#ifdef STARPU_USE_SC_HYPERVISOR
	if(sched_ctx != NULL && sched_ctx_id != 0 && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
	   && sched_ctx->perf_counters != NULL)
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_delete_context(sched_ctx_id);
		_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR
	unsigned inheritor_sched_ctx_id = sched_ctx->inheritor;
	struct _starpu_sched_ctx *inheritor_sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx->inheritor);
	_starpu_unlock_mutex_if_prev_locked();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
	int *workerids;
	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
	/* try to transfer the resources to the inheritor context, unless both
	   contexts already own all the resources, in which case the transfer
	   is pointless */
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	unsigned nworkers = config->topology.nworkers;
	if(nworkers_ctx > 0 && inheritor_sched_ctx && inheritor_sched_ctx->id != STARPU_NMAX_SCHED_CTXS &&
	   !(nworkers_ctx == nworkers && nworkers_ctx == inheritor_sched_ctx->workers->nworkers))
	{
		starpu_sched_ctx_add_workers(workerids, nworkers_ctx, inheritor_sched_ctx_id);
		starpu_sched_ctx_set_priority(workerids, nworkers_ctx, inheritor_sched_ctx_id, 1);
		starpu_sched_ctx_set_priority_on_level(workerids, nworkers_ctx, inheritor_sched_ctx_id, 1);
	}
	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
	{
		if(!sched_ctx->sched_policy)
			starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
		/* if the context changed between the mutex release and the mutex
		   lock, take care to free all the scheduling data before deleting
		   the context */
		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
		// _starpu_sched_ctx_free_scheduling_data(sched_ctx);
		_starpu_delete_sched_ctx(sched_ctx);
	}
	/* workerids is malloc-ed in starpu_sched_ctx_get_workers_list; do not
	   forget to free it once it is no longer used */
	free(workerids);
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	_starpu_relock_mutex_if_prev_locked();
	return;
}

/* called after the workers are terminated, so there is nothing else to do but free the memory */
void _starpu_delete_all_sched_ctxs()
{
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
		STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[i]);
		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			_starpu_sched_ctx_free_scheduling_data(sched_ctx);
			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
			_starpu_barrier_counter_destroy(&sched_ctx->ready_tasks_barrier);
			_starpu_delete_sched_ctx(sched_ctx);
		}
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[i]);
		STARPU_PTHREAD_RWLOCK_DESTROY(&changing_ctx_mutex[i]);
	}
	return;
}

static void _starpu_check_workers(int *workerids, int nworkers)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	int nworkers_conf = config->topology.nworkers;
	int i;
	for(i = 0; i < nworkers; i++)
	{
		/* make sure the user does not ask for a resource that does not exist;
		   valid workerids range from 0 to nworkers_conf - 1 */
		STARPU_ASSERT_MSG(workerids[i] >= 0 && workerids[i] < nworkers_conf, "requested to add workerid = %d, but that is beyond the range 0 to %d", workerids[i], nworkers_conf - 1);
	}
}

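/* Flush the tasks that were queued while the context had no workers: pop them
 * from empty_ctx_tasks and push them to the workers, stopping early if the
 * stop_submission_task marker is found or if the push reports -EAGAIN. */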
void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx)
{
	unsigned unlocked = 0;
	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
	if(starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
	{
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
		return;
	}
	else
		/* you are not supposed to get here if you deleted the context,
		   so there is no point in keeping the mutex locked */
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);
	while(!starpu_task_list_empty(&sched_ctx->empty_ctx_tasks))
	{
		if(unlocked)
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
		struct starpu_task *old_task = starpu_task_list_pop_back(&sched_ctx->empty_ctx_tasks);
		unlocked = 1;
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
		if(old_task == &stop_submission_task)
			break;
		int ret = _starpu_push_task_to_workers(old_task);
		/* stop popping from the empty ctx tasks list */
		if(ret == -EAGAIN) break;
	}
	if(!unlocked)
		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
	/* leave the mutex as it was to avoid problems in the caller function */
	STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx->id]);
	return;
}

void starpu_sched_ctx_set_priority_on_level(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
{
	/* int w; */
	/* struct _starpu_worker *worker = NULL; */
	/* for(w = 0; w < nworkers_to_add; w++) */
	/* { */
	/*	worker = _starpu_get_worker_struct(workers_to_add[w]); */
	/*	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex); */
	/*	struct _starpu_sched_ctx_list *l = NULL; */
	/*	for (l = worker->sched_ctx_list; l; l = l->next) */
	/*	{ */
	/*		if(l->sched_ctx != STARPU_NMAX_SCHED_CTXS && l->sched_ctx != sched_ctx && */
	/*		   starpu_sched_ctx_get_hierarchy_level(l->sched_ctx) == starpu_sched_ctx_get_hierarchy_level(sched_ctx)) */
	/*		{ */
	/*			/\* the lock is taken inside the func *\/ */
	/*			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex); */
	/*			starpu_sched_ctx_set_priority(&workers_to_add[w], 1, l->sched_ctx, priority); */
	/*			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex); */
	/*		} */
	/*	} */
	/*	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex); */
	/* } */
	/* return; */
}

static void _set_priority_hierarchically(int* workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx, unsigned priority)
{
	if(starpu_sched_ctx_get_hierarchy_level(sched_ctx) > 0)
	{
		unsigned father = starpu_sched_ctx_get_inheritor(sched_ctx);
		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, father, priority);
		starpu_sched_ctx_set_priority_on_level(workers_to_add, nworkers_to_add, father, priority);
		_set_priority_hierarchically(workers_to_add, nworkers_to_add, father, priority);
	}
	return;
}

void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int added_workers[nworkers_to_add];
	int n_added_workers = 0;
	_starpu_unlock_mutex_if_prev_locked();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
	_starpu_check_workers(workers_to_add, nworkers_to_add);
	/* if the context has not already been deleted */
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		_starpu_add_workers_to_sched_ctx(sched_ctx, workers_to_add, nworkers_to_add, added_workers, &n_added_workers);
		if(n_added_workers > 0)
		{
			_starpu_update_workers_with_ctx(added_workers, n_added_workers, sched_ctx->id);
		}
		starpu_sched_ctx_set_priority(workers_to_add, nworkers_to_add, sched_ctx_id, 1);
		_set_priority_hierarchically(workers_to_add, nworkers_to_add, sched_ctx_id, 0);
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	_starpu_relock_mutex_if_prev_locked();
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		STARPU_PTHREAD_RWLOCK_RDLOCK(&changing_ctx_mutex[sched_ctx_id]);
		_starpu_fetch_tasks_from_empty_ctx_list(sched_ctx);
		STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	}
	return;
}

void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_remove, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int removed_workers[sched_ctx->workers->nworkers];
	int n_removed_workers = 0;
	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
	_starpu_unlock_mutex_if_prev_locked();
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx_id]);
	/* if the context has not already been deleted */
	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
	{
		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
		if(n_removed_workers > 0)
		{
			_starpu_update_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
		}
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);
	_starpu_relock_mutex_if_prev_locked();
	return;
}

int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)
{
	unsigned worker = 0, nworkers = 0;
	STARPU_PTHREAD_RWLOCK_WRLOCK(&changing_ctx_mutex[sched_ctx->id]);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		STARPU_ASSERT_MSG(worker < STARPU_NMAXWORKERS, "worker id %d", worker);
		if (starpu_worker_can_execute_task(worker, task, 0))
			nworkers++;
	}
	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);
	return nworkers;
}

/* unused sched_ctxs have the id STARPU_NMAX_SCHED_CTXS */
void _starpu_init_all_sched_ctxs(struct _starpu_machine_config *config)
{
	starpu_pthread_key_create(&sched_ctx_key, NULL);
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		config->sched_ctxs[i].id = STARPU_NMAX_SCHED_CTXS;
	return;
}

/* sched_ctxs are not necessarily contiguous: e.g. when one is removed its
   slot is freed, and it is reused when a new one is added */
static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config)
{
	unsigned i;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
		if(config->sched_ctxs[i].id == STARPU_NMAX_SCHED_CTXS)
			return i;
	STARPU_ASSERT(0);
	return STARPU_NMAX_SCHED_CTXS;
}

int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_all must not be called from a task or callback");
	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
}

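/* Decrement the submitted-tasks counter of the context. If this was the last
 * task and the user declared submission finished, hand the workers over to the
 * inheritor context; also handle the case where the user called
 * starpu_drivers_request_termination(), by resetting config->running so the
 * workers can terminate. */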
void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	if (!config->watchdog_ok)
		config->watchdog_ok = 1;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int reached = _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
	int finished = reached == 1;
	/* when we finish decrementing the tasks, if the user signaled that no more
	   tasks will be submitted, we can move all the workers to the inheritor context */
	if(finished && sched_ctx->inheritor != STARPU_NMAX_SCHED_CTXS)
	{
		STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
		if(sched_ctx->finished_submit)
		{
			STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
			if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if(sched_ctx->close_callback)
					sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
				int *workerids = NULL;
				unsigned nworkers = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
				if(nworkers > 0)
				{
					starpu_sched_ctx_add_workers(workerids, nworkers, sched_ctx->inheritor);
					free(workerids);
				}
			}
			_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
			return;
		}
		STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	}
	/* We also need to check for config->submitting = 0 (i.e. the
	 * user called starpu_drivers_request_termination()), in which
	 * case we need to set config->running to 0 and wake workers,
	 * so they can terminate, just like
	 * starpu_drivers_request_termination() does.
	 */
	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
	if(config->submitting == 0)
	{
		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
		{
			if(sched_ctx->close_callback)
				sched_ctx->close_callback(sched_ctx->id, sched_ctx->close_args);
		}
		ANNOTATE_HAPPENS_AFTER(&config->running);
		config->running = 0;
		ANNOTATE_HAPPENS_BEFORE(&config->running);
		int s;
		for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
			}
		}
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier, 0.0);
	return;
}

void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier, 0.0);
}

int _starpu_get_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->tasks_barrier);
}

int _starpu_check_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_check(&sched_ctx->tasks_barrier);
}

void _starpu_increment_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_increment(&sched_ctx->ready_tasks_barrier, ready_flops);
}

void _starpu_decrement_nready_tasks_of_sched_ctx(unsigned sched_ctx_id, double ready_flops)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->ready_tasks_barrier, ready_flops);
}

int starpu_sched_ctx_get_nready_tasks(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_start(&sched_ctx->ready_tasks_barrier);
}

double starpu_sched_ctx_get_nready_flops(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_get_reached_flops(&sched_ctx->ready_tasks_barrier);
}

int _starpu_wait_for_no_ready_of_sched_ctx(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->ready_tasks_barrier);
}

void starpu_sched_ctx_set_context(unsigned *sched_ctx)
{
	starpu_pthread_setspecific(sched_ctx_key, (void*)sched_ctx);
}

unsigned starpu_sched_ctx_get_context()
{
	unsigned *sched_ctx = (unsigned*)starpu_pthread_getspecific(sched_ctx_key);
	if(sched_ctx == NULL)
		return STARPU_NMAX_SCHED_CTXS;
	STARPU_ASSERT(*sched_ctx < STARPU_NMAX_SCHED_CTXS);
	return *sched_ctx;
}

unsigned _starpu_sched_ctx_get_current_context()
{
	unsigned sched_ctx = starpu_sched_ctx_get_context();
	if (sched_ctx == STARPU_NMAX_SCHED_CTXS)
		return _starpu_get_initial_sched_ctx()->id;
	else
		return sched_ctx;
}

void starpu_sched_ctx_notify_hypervisor_exists()
{
	with_hypervisor = 1;
	int i, j;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		hyp_start_sample[i] = starpu_timing_now();
		hyp_start_allow_sample[i] = 0.0;
		for(j = 0; j < STARPU_NMAXWORKERS; j++)
		{
			flops[i][j] = 0.0;
			data_size[i][j] = 0;
		}
	}
}

unsigned starpu_sched_ctx_check_if_hypervisor_exists()
{
	return with_hypervisor;
}

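/* Rate-limit hypervisor interventions: alternate a short window (1 ms) during
 * which the hypervisor is allowed and a longer window (10 ms) during which it
 * is not. Note that the early 'return 1' below currently short-circuits this
 * sampling logic, so the hypervisor is always allowed. */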
unsigned _starpu_sched_ctx_allow_hypervisor(unsigned sched_ctx_id)
{
	return 1;
	double now = starpu_timing_now();
	if(hyp_start_allow_sample[sched_ctx_id] > 0.0)
	{
		double allow_sample = (now - hyp_start_allow_sample[sched_ctx_id]) / 1000000.0;
		if(allow_sample < 0.001)
			return 1;
		else
		{
			hyp_start_allow_sample[sched_ctx_id] = 0.0;
			hyp_start_sample[sched_ctx_id] = starpu_timing_now();
			return 0;
		}
	}
	double forbid_sample = (now - hyp_start_sample[sched_ctx_id]) / 1000000.0;
	if(forbid_sample > 0.01)
	{
		// hyp_start_sample[sched_ctx_id] = starpu_timing_now();
		hyp_start_allow_sample[sched_ctx_id] = starpu_timing_now();
		return 1;
	}
	return 0;
}

unsigned _starpu_get_nsched_ctxs()
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	return config->topology.nsched_ctxs;
}

void starpu_sched_ctx_set_policy_data(unsigned sched_ctx_id, void* policy_data)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->policy_data = policy_data;
}

void* starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->policy_data;
}

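/* Allocate the context's worker collection and bind its operations to the
 * list-based implementation, or to the hwloc tree-based one when requested
 * and available. */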
struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsigned sched_ctx_id, enum starpu_worker_collection_type worker_collection_type)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->workers = (struct starpu_worker_collection*)malloc(sizeof(struct starpu_worker_collection));
	switch(worker_collection_type)
	{
#ifdef STARPU_HAVE_HWLOC
	case STARPU_WORKER_TREE:
		sched_ctx->workers->has_next = worker_tree.has_next;
		sched_ctx->workers->get_next = worker_tree.get_next;
		sched_ctx->workers->has_next_master = worker_tree.has_next_master;
		sched_ctx->workers->get_next_master = worker_tree.get_next_master;
		sched_ctx->workers->add = worker_tree.add;
		sched_ctx->workers->remove = worker_tree.remove;
		sched_ctx->workers->init = worker_tree.init;
		sched_ctx->workers->deinit = worker_tree.deinit;
		sched_ctx->workers->init_iterator = worker_tree.init_iterator;
		sched_ctx->workers->type = STARPU_WORKER_TREE;
		break;
#endif
//	case STARPU_WORKER_LIST:
	default:
		sched_ctx->workers->has_next = worker_list.has_next;
		sched_ctx->workers->get_next = worker_list.get_next;
		sched_ctx->workers->has_next_master = worker_list.has_next_master;
		sched_ctx->workers->get_next_master = worker_list.get_next_master;
		sched_ctx->workers->add = worker_list.add;
		sched_ctx->workers->remove = worker_list.remove;
		sched_ctx->workers->init = worker_list.init;
		sched_ctx->workers->deinit = worker_list.deinit;
		sched_ctx->workers->init_iterator = worker_list.init_iterator;
		sched_ctx->workers->type = STARPU_WORKER_LIST;
		break;
	}
	return sched_ctx->workers;
}

void starpu_sched_ctx_display_workers(unsigned sched_ctx_id, FILE *f)
{
	int *workerids = NULL;
	unsigned nworkers;
	unsigned i;
	nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);
	fprintf(f, "[sched_ctx %u]: %u worker%s\n", sched_ctx_id, nworkers, nworkers > 1 ? "s" : "");
	for (i = 0; i < nworkers; i++)
	{
		char name[256];
		starpu_worker_get_name(workerids[i], name, 256);
		fprintf(f, "\t\t%s\n", name);
	}
	/* the list is malloc-ed by starpu_sched_ctx_get_workers_list */
	free(workerids);
}

unsigned starpu_sched_ctx_get_workers_list(unsigned sched_ctx_id, int **workerids)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	*workerids = (int*)malloc(workers->nworkers*sizeof(int));
	int worker;
	unsigned nworkers = 0;
	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		(*workerids)[nworkers++] = worker;
	}
	return nworkers;
}

void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->workers->deinit(sched_ctx->workers);
	free(sched_ctx->workers);
}

struct starpu_worker_collection* starpu_sched_ctx_get_worker_collection(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->workers;
}

int _starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu_worker_archtype arch)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	int worker;
	int npus = 0;
	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		worker = workers->get_next(workers, &it);
		enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
		if(curr_arch == arch || arch == STARPU_ANY_WORKER)
			pus[npus++] = worker;
	}
	return npus;
}

starpu_pthread_rwlock_t* _starpu_sched_ctx_get_changing_ctx_mutex(unsigned sched_ctx_id)
{
	return &changing_ctx_mutex[sched_ctx_id];
}

unsigned starpu_sched_ctx_get_nworkers(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL)
		return sched_ctx->workers->nworkers;
	else
		return 0;
}

unsigned starpu_sched_ctx_get_nshared_workers(unsigned sched_ctx_id, unsigned sched_ctx_id2)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct _starpu_sched_ctx *sched_ctx2 = _starpu_get_sched_ctx_struct(sched_ctx_id2);
	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_worker_collection *workers2 = sched_ctx2->workers;
	int worker, worker2;
	int shared_workers = 0;
	struct starpu_sched_ctx_iterator it1, it2;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it1);
	while(workers->has_next(workers, &it1))
	{
		worker = workers->get_next(workers, &it1);
		/* restart the iteration over the second collection for each
		   worker of the first one */
		if(workers2->init_iterator)
			workers2->init_iterator(workers2, &it2);
		while(workers2->has_next(workers2, &it2))
		{
			worker2 = workers2->get_next(workers2, &it2);
			if(worker == worker2)
				shared_workers++;
		}
	}
	return shared_workers;
}

  1097. unsigned starpu_sched_ctx_contains_worker(int workerid, unsigned sched_ctx_id)
  1098. {
  1099. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
  1100. struct starpu_worker_collection *workers = sched_ctx->workers;
  1101. int worker;
  1102. struct starpu_sched_ctx_iterator it;
  1103. if(workers->init_iterator)
  1104. workers->init_iterator(workers, &it);
  1105. while(workers->has_next(workers, &it))
  1106. {
  1107. worker = workers->get_next(workers, &it);
  1108. if(worker == workerid)
  1109. return 1;
  1110. }
  1111. return 0;
  1112. }
  1113. unsigned starpu_sched_ctx_contains_type_of_worker(enum starpu_worker_archtype arch, unsigned sched_ctx_id)
  1114. {
  1115. struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  1116. int worker;
  1117. struct starpu_sched_ctx_iterator it;
  1118. if(workers->init_iterator)
  1119. workers->init_iterator(workers, &it);
  1120. while(workers->has_next(workers, &it))
  1121. {
  1122. worker = workers->get_next(workers, &it);
  1123. enum starpu_worker_archtype curr_arch = starpu_worker_get_type(worker);
  1124. if(curr_arch == arch)
  1125. return 1;
  1126. }
  1127. return 0;
  1128. }
unsigned _starpu_worker_belongs_to_a_sched_ctx(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
	int i;
	struct _starpu_sched_ctx *sched_ctx = NULL;
	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
	{
		sched_ctx = &config->sched_ctxs[i];
		if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
			if(starpu_sched_ctx_contains_worker(workerid, sched_ctx->id))
				return 1;
	}
	return 0;
}

unsigned starpu_sched_ctx_worker_get_id(unsigned sched_ctx_id)
{
	int workerid = starpu_worker_get_id();
	if(workerid != -1)
		if(starpu_sched_ctx_contains_worker(workerid, sched_ctx_id))
			return workerid;
	return -1;
}

unsigned starpu_sched_ctx_overlapping_ctxs_on_worker(int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return worker->nsched_ctxs > 1;
}
void starpu_sched_ctx_set_inheritor(unsigned sched_ctx_id, unsigned inheritor)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	STARPU_ASSERT(inheritor < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->inheritor = inheritor;
	return;
}

unsigned starpu_sched_ctx_get_inheritor(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->inheritor;
}

unsigned starpu_sched_ctx_get_hierarchy_level(unsigned sched_ctx_id)
{
	STARPU_ASSERT(sched_ctx_id < STARPU_NMAX_SCHED_CTXS);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->hierarchy_level;
}

void starpu_sched_ctx_finished_submit(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
	sched_ctx->finished_submit = 1;
	STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
	return;
}
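/* Example (illustrative sketch): a typical shutdown sequence for a context
 * whose leftover tasks should flow to another one. ctx and main_ctx are
 * hypothetical ids created elsewhere with starpu_sched_ctx_create().
 *
 *	starpu_sched_ctx_set_inheritor(ctx, main_ctx);
 *	starpu_sched_ctx_finished_submit(ctx); // no more submissions to ctx
 *	starpu_sched_ctx_delete(ctx);          // remaining work goes to main_ctx
 */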
#ifdef STARPU_USE_SC_HYPERVISOR

void _starpu_sched_ctx_post_exec_task_cb(int workerid, struct starpu_task *task, size_t data_size2, uint32_t footprint)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if(sched_ctx != NULL && task->sched_ctx != _starpu_get_initial_sched_ctx()->id &&
	   task->sched_ctx != STARPU_NMAX_SCHED_CTXS && sched_ctx->perf_counters != NULL)
	{
		flops[task->sched_ctx][workerid] += task->flops;
		data_size[task->sched_ctx][workerid] += data_size2;

		if(_starpu_sched_ctx_allow_hypervisor(sched_ctx->id) || task->hypervisor_tag > 0)
		{
			_STARPU_TRACE_HYPERVISOR_BEGIN();
			sched_ctx->perf_counters->notify_post_exec_task(task, data_size[task->sched_ctx][workerid], footprint,
									task->hypervisor_tag, flops[task->sched_ctx][workerid]);
			_STARPU_TRACE_HYPERVISOR_END();
			flops[task->sched_ctx][workerid] = 0.0;
			data_size[task->sched_ctx][workerid] = 0;
		}
	}
}

void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if(sched_ctx != NULL && sched_ctx_id != _starpu_get_initial_sched_ctx()->id && sched_ctx_id != STARPU_NMAX_SCHED_CTXS
	   && sched_ctx->perf_counters != NULL && _starpu_sched_ctx_allow_hypervisor(sched_ctx_id))
	{
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_pushed_task(sched_ctx_id, workerid);
		_STARPU_TRACE_HYPERVISOR_END();
	}
}

#endif //STARPU_USE_SC_HYPERVISOR
int starpu_sched_get_min_priority(void)
{
	return starpu_sched_ctx_get_min_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_get_max_priority(void)
{
	return starpu_sched_ctx_get_max_priority(_starpu_sched_ctx_get_current_context());
}

int starpu_sched_set_min_priority(int min_prio)
{
	return starpu_sched_ctx_set_min_priority(_starpu_sched_ctx_get_current_context(), min_prio);
}

int starpu_sched_set_max_priority(int max_prio)
{
	return starpu_sched_ctx_set_max_priority(_starpu_sched_ctx_get_current_context(), max_prio);
}
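/* Example (illustrative sketch): a scheduling policy usually declares its
 * priority range when it is initialized, and tasks then pick a value in that
 * range; ctx is hypothetical and the bounds below are arbitrary.
 *
 *	starpu_sched_ctx_set_min_priority(ctx, -5);
 *	starpu_sched_ctx_set_max_priority(ctx, 5);
 *
 *	struct starpu_task *task = starpu_task_create();
 *	task->priority = starpu_sched_ctx_get_max_priority(ctx);
 */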
int starpu_sched_ctx_get_min_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority;
}

int starpu_sched_ctx_get_max_priority(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority;
}

int starpu_sched_ctx_set_min_priority(unsigned sched_ctx_id, int min_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->min_priority = min_prio;
	return 0;
}

int starpu_sched_ctx_set_max_priority(unsigned sched_ctx_id, int max_prio)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	sched_ctx->max_priority = max_prio;
	return 0;
}

int starpu_sched_ctx_min_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->min_priority_is_set;
}

int starpu_sched_ctx_max_priority_is_set(unsigned sched_ctx_id)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return sched_ctx->max_priority_is_set;
}
void starpu_sched_ctx_set_priority(int *workers, int nworkers, unsigned sched_ctx_id, unsigned priority)
{
	if(nworkers != -1)
	{
		int w;
		struct _starpu_worker *worker = NULL;
		for(w = 0; w < nworkers; w++)
		{
			worker = _starpu_get_worker_struct(workers[w]);
			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
			struct _starpu_sched_ctx_list *l = NULL;
			for (l = worker->sched_ctx_list; l; l = l->next)
			{
				if(l->sched_ctx == sched_ctx_id)
				{
					l->priority = priority;
					break;
				}
			}
			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
		}
	}
	return;
}

unsigned starpu_sched_ctx_get_priority(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx_list *l = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if(l->sched_ctx == sched_ctx_id)
			return l->priority;
	}
	/* a context defaults to priority 1 on workers where none was set */
	return 1;
}
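/* Example (illustrative sketch): lowering a context's priority on the workers
 * it shares with a more important context; low_ctx, workerids and nworkers
 * are hypothetical, e.g. obtained from starpu_sched_ctx_get_workers_list().
 *
 *	starpu_sched_ctx_set_priority(workerids, nworkers, low_ctx, 0);
 *	// a policy can later weigh contexts per worker:
 *	unsigned prio = starpu_sched_ctx_get_priority(workerids[0], low_ctx);
 */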
/* Return 1 when this worker is the last awake worker in at least one of the
 * contexts it belongs to, i.e. every other worker of that context is sleeping */
unsigned _starpu_sched_ctx_last_worker_awake(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_list *l = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);

		unsigned last_worker_awake = 1;
		struct starpu_worker_collection *workers = sched_ctx->workers;
		struct starpu_sched_ctx_iterator it;
		int workerid;
		if(workers->init_iterator)
			workers->init_iterator(workers, &it);

		while(workers->has_next(workers, &it))
		{
			workerid = workers->get_next(workers, &it);
			if(workerid != worker->workerid && _starpu_worker_get_status(workerid) != STATUS_SLEEPING)
			{
				last_worker_awake = 0;
				break;
			}
		}
		if(last_worker_awake)
			return 1;
	}
	return 0;
}
void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBUTE_UNUSED)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();

	/* FIXME: why not factorize with _starpu_bind_thread_on_cpu? */
#ifdef STARPU_SIMGRID
	return;
#else
	if (starpu_get_env_number("STARPU_WORKERS_NOBIND") > 0)
		return;
#ifdef STARPU_HAVE_HWLOC
	const struct hwloc_topology_support *support = hwloc_topology_get_support(config->topology.hwtopology);
	if (support->cpubind->set_thisthread_cpubind)
	{
		hwloc_obj_t obj = hwloc_get_obj_by_depth(config->topology.hwtopology,
							 config->pu_depth, cpuid);
		/* singlify a copy: modifying obj->cpuset in place would corrupt
		 * the topology object shared with the rest of the runtime */
		hwloc_bitmap_t set = hwloc_bitmap_dup(obj->cpuset);
		int ret;
		hwloc_bitmap_singlify(set);
		ret = hwloc_set_cpubind(config->topology.hwtopology, set,
					HWLOC_CPUBIND_THREAD);
		hwloc_bitmap_free(set);
		if (ret)
		{
			perror("hwloc_set_cpubind");
			STARPU_ABORT();
		}
	}
#elif defined(HAVE_PTHREAD_SETAFFINITY_NP) && defined(__linux__)
	int ret;
	/* fix the thread on the correct cpu */
	cpu_set_t aff_mask;
	CPU_ZERO(&aff_mask);
	CPU_SET(cpuid, &aff_mask);

	starpu_pthread_t self = pthread_self();

	ret = pthread_setaffinity_np(self, sizeof(aff_mask), &aff_mask);
	if (ret)
	{
		perror("binding thread");
		STARPU_ABORT();
	}
#elif defined(__MINGW32__) || defined(__CYGWIN__)
	DWORD mask = 1 << cpuid;
	if (!SetThreadAffinityMask(GetCurrentThread(), mask))
	{
		_STARPU_ERROR("SetThreadAffinityMask(%lx) failed\n", mask);
	}
#else
#warning no CPU binding support
#endif
#endif
}
unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned sched_ctx_id)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	struct _starpu_sched_ctx_list *l = NULL;
	struct _starpu_sched_ctx *sched_ctx = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
		if(sched_ctx->main_master == workerid && sched_ctx->nesting_sched_ctx == sched_ctx_id)
			return sched_ctx->id;
	}
	return STARPU_NMAX_SCHED_CTXS;
}

void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double flops)
{
	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
	_starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx_id, flops);
}
void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx)
{
	int workerid = starpu_worker_get_id();
	struct _starpu_worker *worker = NULL;
	if(workerid != -1)
	{
		/* release the worker's scheduling mutex: the resubmission below
		 * may need to take it again, which would otherwise deadlock */
		worker = _starpu_get_worker_struct(workerid);
		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
	}

	task->sched_ctx = sched_ctx;
	_starpu_task_submit_nodeps(task);

	if(workerid != -1)
		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
}
static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
{
	int s;
	for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(s);
		if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
		{
			if(sched_ctx->parallel_sect[workerid])
				return 1;
		}
	}
	return 0;
}
static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();

	unsigned sleeping[nworkers];
	int w;
	for(w = 0; w < nworkers; w++)
	{
		if(current_worker_id == -1 || workerids[w] != current_worker_id)
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
		sleeping[w] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerids[w]);
		sched_ctx->master[workerids[w]] = master;
		sched_ctx->parallel_sect[workerids[w]] = 1;
		if(current_worker_id == -1 || workerids[w] != current_worker_id)
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerids[w]]);
	}

	int workerid;
	for(w = 0; w < nworkers; w++)
	{
		workerid = workerids[w];
		/* wait until each worker that is not the caller itself and is not
		 * already asleep in another context acknowledges blocking */
		if((current_worker_id == -1 || workerid != current_worker_id) && !sleeping[w])
		{
			sched_ctx->sleeping[workerids[w]] = 1;
			sem_wait(&sched_ctx->fall_asleep_sem[master]);
		}
	}
	return;
}
void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	worker->slave = 1;

	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int master = sched_ctx->master[workerid];
	sem_post(&sched_ctx->fall_asleep_sem[master]);

	return;
}

void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int master = sched_ctx->master[workerid];
	sem_post(&sched_ctx->wake_up_sem[master]);
	sched_ctx->sleeping[workerid] = 0;
	sched_ctx->master[workerid] = -1;

	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	worker->slave = 0;

	return;
}
static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();

	struct starpu_worker_collection *workers = sched_ctx->workers;
	struct starpu_sched_ctx_iterator it;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		int workerid = workers->get_next(workers, &it);
		int curr_master = sched_ctx->master[workerid];
		if(curr_master == master && sched_ctx->parallel_sect[workerid])
		{
			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
			{
				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
				sem_wait(&sched_ctx->wake_up_sem[master]);
			}
			else
				sched_ctx->parallel_sect[workerid] = 0;
		}
	}
	return;
}
void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
{
	int *workerids;
	int nworkers = starpu_sched_ctx_get_workers_list(sched_ctx_id, &workerids);

	/* put the context's workers to sleep; the last one acts as master */
	_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, workerids, nworkers, workerids[nworkers-1]);

	/* execute parallel code */
	void* ret = func(param);

	/* wake up starpu workers */
	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, workerids[nworkers-1]);

	/* the list returned by starpu_sched_ctx_get_workers_list() is allocated
	 * on the caller's behalf and must be freed */
	free(workerids);
	return ret;
}
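/* Example (illustrative sketch): handing the workers of a context over to an
 * external parallel kernel. run_openmp_region(), struct my_args and ctx are
 * hypothetical; any thread-spawning library could stand in for OpenMP.
 *
 *	static void *run_openmp_region(void *arg)
 *	{
 *		struct my_args *a = arg; // hypothetical payload
 *		// ... run an OpenMP/TBB/pthread kernel on a->data ...
 *		return NULL;
 *	}
 *
 *	struct my_args args = { .data = buffer };
 *	starpu_sched_ctx_exec_parallel_code(run_openmp_region, &args, ctx);
 */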
void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids, int *ncpuids)
{
	int current_worker_id = starpu_worker_get_id();
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	struct starpu_worker_collection *workers = sched_ctx->workers;

	(*cpuids) = (int*)malloc(workers->nworkers*sizeof(int));
	int w = 0;

	struct starpu_sched_ctx_iterator it;
	int workerid;
	if(workers->init_iterator)
		workers->init_iterator(workers, &it);

	while(workers->has_next(workers, &it))
	{
		workerid = workers->get_next(workers, &it);
		int master = sched_ctx->master[workerid];
		if(master == current_worker_id || workerid == current_worker_id || current_worker_id == -1)
			(*cpuids)[w++] = starpu_worker_get_bindid(workerid);
	}
	*ncpuids = w;
	return;
}
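/* Example (illustrative sketch): pinning an external thread onto a CPU that a
 * context makes available; ctx is hypothetical, and the caller owns (and must
 * free) the returned array.
 *
 *	int *cpuids, ncpuids;
 *	starpu_sched_ctx_get_available_cpuids(ctx, &cpuids, &ncpuids);
 *	if (ncpuids > 0)
 *		// e.g. bind this (external) thread to the first available CPU
 *		starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[0]);
 *	free(cpuids);
 */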
static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int current_worker_id = starpu_worker_get_id();

	int masters[nworkers];
	int w;
	for(w = 0; w < nworkers; w++)
	{
		int workerid = workerids[w];
		masters[w] = sched_ctx->master[workerid];
		if(current_worker_id == -1 || workerid != current_worker_id)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
		}
		else
			sched_ctx->parallel_sect[workerid] = 0;
		sched_ctx->master[workerid] = -1;
	}

	int workerid;
	for(w = 0; w < nworkers; w++)
	{
		workerid = workerids[w];
		if(masters[w] != -1)
		{
			/* use the master saved above: sched_ctx->master[workerid] was
			 * already reset to -1 and would index the semaphore array
			 * out of bounds */
			if(current_worker_id == -1 || workerid != current_worker_id)
				sem_wait(&sched_ctx->wake_up_sem[masters[w]]);
		}
	}
	return;
}
/* Elect the master worker a parallel section should use: prefer the calling
 * worker when it belongs to the section, otherwise fall back to an awake
 * worker or, failing that, to one of the masters found among the given workers */
static int _starpu_sched_ctx_find_master(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int new_master = workerids[nworkers-1];
	int current_worker_id = starpu_worker_get_id();
	int current_is_in_section = 0;
	int npotential_masters = 0;
	int nawake_workers = 0;
	int ntrue_masters = 0;
	int potential_masters[nworkers];
	int awake_workers[nworkers];
	int true_masters[nworkers];
	int i,w;

	for(w = 0 ; w < nworkers ; w++)
	{
		if (current_worker_id == workerids[w])
			current_is_in_section = 1;

		int master = sched_ctx->master[workerids[w]];
		if (master > -1)
		{
			int already_seen = 0;
			//Could create a function for this. Basically searching an element in an array.
			for (i = 0 ; i < npotential_masters; i++)
			{
				if (potential_masters[i] == master)
				{
					already_seen = 1;
					break;
				}
			}
			if (!already_seen)
				potential_masters[npotential_masters++] = master;
		}
		else
			awake_workers[nawake_workers++] = workerids[w];
	}

	/* a potential master is a "true" master only if it belongs to the section itself */
	for (i = 0 ; i < npotential_masters ; i++)
	{
		int master_is_in_section = 0;
		//Could create a function for this. Basically searching an element in an array.
		for (w = 0 ; w < nworkers ; w++)
		{
			if (workerids[w] == potential_masters[i])
			{
				master_is_in_section = 1;
				break;
			}
		}
		if (master_is_in_section)
			true_masters[ntrue_masters++] = potential_masters[i];
	}

	if (current_is_in_section)
		new_master = current_worker_id;
	else if (ntrue_masters > 1)
	{
		if (nawake_workers > 0)
			new_master = awake_workers[nawake_workers - 1];
		else
			new_master = true_masters[ntrue_masters - 1];
	}
	return new_master;
}
static void _starpu_sched_ctx_add_workers_to_master(unsigned sched_ctx_id, int *workerids, int nworkers, int new_master)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	int w;
	int nput_to_sleep = 0;
	int nwake_up = 0;
	int put_to_sleep[nworkers];
	int wake_up[nworkers];

	for(w = 0 ; w < nworkers ; w++)
	{
		int master = sched_ctx->master[workerids[w]];
		if (master == -1 && workerids[w] != new_master)
			put_to_sleep[nput_to_sleep++] = workerids[w];
		else if(master != -1 && workerids[w] == new_master)
			wake_up[nwake_up++] = workerids[w];
	}

	if(nwake_up > 0)
		_starpu_sched_ctx_wake_these_workers_up(sched_ctx_id, wake_up, nwake_up);
	if(nput_to_sleep > 0)
		_starpu_sched_ctx_get_workers_to_sleep(sched_ctx_id, put_to_sleep, nput_to_sleep, new_master);
}
int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids, int nworkers)
{
	int new_master = _starpu_sched_ctx_find_master(sched_ctx_id, workerids, nworkers);
	_starpu_sched_ctx_add_workers_to_master(sched_ctx_id, workerids, nworkers, new_master);
	return new_master;
}

void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master)
{
	/* wake up starpu workers */
	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, master);
}
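/* Example (illustrative sketch): reserving a set of workers around the run of
 * a parallel task; ctx, workerids and nworkers are hypothetical, and
 * run_parallel_task() stands in for the actual kernel launch.
 *
 *	int master = starpu_sched_ctx_book_workers_for_task(ctx, workerids, nworkers);
 *	run_parallel_task(master, workerids, nworkers);
 *	starpu_sched_ctx_unbook_workers_for_task(ctx, master);
 */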