sched_policy.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010 Université de Bordeaux 1
  4. * Copyright (C) 2010 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <pthread.h>
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/sched_policy.h>
  22. #include <profiling/profiling.h>
  23. //static struct starpu_sched_policy_s policy;
  24. static int use_prefetch = 0;
  25. static pthread_cond_t blocking_ths_cond = PTHREAD_COND_INITIALIZER;
  26. static pthread_cond_t wakeup_ths_cond = PTHREAD_COND_INITIALIZER;
  27. static pthread_mutex_t blocking_ths_mutex = PTHREAD_MUTEX_INITIALIZER;
  28. static int nblocked_ths = 0;
  29. int starpu_get_prefetch_flag(void)
  30. {
  31. return use_prefetch;
  32. }
  33. /*
  34. * Predefined policies
  35. */
  36. extern struct starpu_sched_policy_s _starpu_sched_ws_policy;
  37. extern struct starpu_sched_policy_s _starpu_sched_prio_policy;
  38. extern struct starpu_sched_policy_s _starpu_sched_no_prio_policy;
  39. extern struct starpu_sched_policy_s _starpu_sched_random_policy;
  40. extern struct starpu_sched_policy_s _starpu_sched_dm_policy;
  41. extern struct starpu_sched_policy_s _starpu_sched_dmda_policy;
  42. extern struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy;
  43. extern struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy;
  44. extern struct starpu_sched_policy_s _starpu_sched_eager_policy;
  45. extern struct starpu_sched_policy_s _starpu_sched_parallel_heft_policy;
  46. extern struct starpu_sched_policy_s _starpu_sched_pgreedy_policy;
  47. extern struct starpu_sched_policy_s heft_policy;
  48. #define NPREDEFINED_POLICIES 12
/* Table of the built-in scheduling policies that can be selected by name
 * (see find_sched_policy_from_name / STARPU_SCHED). The number of entries
 * must match NPREDEFINED_POLICIES. */
static struct starpu_sched_policy_s *predefined_policies[NPREDEFINED_POLICIES] = {
	&_starpu_sched_ws_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_no_prio_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&heft_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_pgreedy_policy
};
  63. struct starpu_sched_policy_s *_starpu_get_sched_policy(struct starpu_sched_ctx *sched_ctx)
  64. {
  65. return sched_ctx->sched_policy;
  66. }
  67. /*
  68. * Methods to initialize the scheduling policy
  69. */
  70. static void load_sched_policy(struct starpu_sched_policy_s *sched_policy, struct starpu_sched_ctx *sched_ctx)
  71. {
  72. STARPU_ASSERT(sched_policy);
  73. #ifdef STARPU_VERBOSE
  74. if (sched_policy->policy_name)
  75. {
  76. if (sched_policy->policy_description)
  77. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  78. else
  79. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  80. }
  81. #endif
  82. struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
  83. policy->init_sched = sched_policy->init_sched;
  84. policy->deinit_sched = sched_policy->deinit_sched;
  85. policy->push_task = sched_policy->push_task;
  86. policy->push_prio_task = sched_policy->push_prio_task;
  87. policy->pop_task = sched_policy->pop_task;
  88. policy->post_exec_hook = sched_policy->post_exec_hook;
  89. policy->pop_every_task = sched_policy->pop_every_task;
  90. policy->policy_name = sched_policy->policy_name;
  91. }
  92. static struct starpu_sched_policy_s *find_sched_policy_from_name(const char *policy_name)
  93. {
  94. if (!policy_name)
  95. return NULL;
  96. unsigned i;
  97. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  98. {
  99. struct starpu_sched_policy_s *p;
  100. p = predefined_policies[i];
  101. if (p->policy_name)
  102. {
  103. if (strcmp(policy_name, p->policy_name) == 0) {
  104. /* we found a policy with the requested name */
  105. return p;
  106. }
  107. }
  108. }
  109. /* nothing was found */
  110. return NULL;
  111. }
  112. static void display_sched_help_message(void)
  113. {
  114. const char *sched_env = getenv("STARPU_SCHED");
  115. if (sched_env && (strcmp(sched_env, "help") == 0)) {
  116. fprintf(stderr, "STARPU_SCHED can be either of\n");
  117. /* display the description of all predefined policies */
  118. unsigned i;
  119. for (i = 0; i < NPREDEFINED_POLICIES; i++)
  120. {
  121. struct starpu_sched_policy_s *p;
  122. p = predefined_policies[i];
  123. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  124. }
  125. }
  126. }
  127. static struct starpu_sched_policy_s *select_sched_policy(struct starpu_machine_config_s *config, const char *policy_name)
  128. {
  129. struct starpu_sched_policy_s *selected_policy = NULL;
  130. struct starpu_conf *user_conf = config->user_conf;
  131. /* First, we check whether the application explicitely gave a scheduling policy or not */
  132. if (user_conf && (user_conf->sched_policy))
  133. return user_conf->sched_policy;
  134. /* Otherwise, we look if the application specified the name of a policy to load */
  135. const char *sched_pol_name;
  136. if (user_conf && (user_conf->sched_policy_name))
  137. {
  138. sched_pol_name = user_conf->sched_policy_name;
  139. }
  140. else {
  141. sched_pol_name = getenv("STARPU_SCHED");
  142. }
  143. if (sched_pol_name)
  144. selected_policy = find_sched_policy_from_name(sched_pol_name);
  145. else
  146. if(policy_name)
  147. selected_policy = find_sched_policy_from_name(policy_name);
  148. /* Perhaps there was no policy that matched the name */
  149. if (selected_policy)
  150. return selected_policy;
  151. /* If no policy was specified, we use the greedy policy as a default */
  152. return &_starpu_sched_eager_policy;
  153. }
  154. void _starpu_init_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx, const char *policy_name)
  155. {
  156. /* Perhaps we have to display some help */
  157. display_sched_help_message();
  158. /* Prefetch is activated by default */
  159. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  160. if (use_prefetch == -1)
  161. use_prefetch = 1;
  162. /* By default, we don't calibrate */
  163. unsigned do_calibrate = 0;
  164. if (config->user_conf && (config->user_conf->calibrate != -1))
  165. {
  166. do_calibrate = config->user_conf->calibrate;
  167. }
  168. else {
  169. int res = starpu_get_env_number("STARPU_CALIBRATE");
  170. do_calibrate = (res < 0)?0:(unsigned)res;
  171. }
  172. _starpu_set_calibrate_flag(do_calibrate);
  173. struct starpu_sched_policy_s *selected_policy;
  174. selected_policy = select_sched_policy(config, policy_name);
  175. load_sched_policy(selected_policy, sched_ctx);
  176. sched_ctx->sched_policy->init_sched(sched_ctx);
  177. }
  178. void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config, struct starpu_sched_ctx *sched_ctx)
  179. {
  180. struct starpu_sched_policy_s *policy = sched_ctx->sched_policy;
  181. if (policy->deinit_sched)
  182. policy->deinit_sched(sched_ctx);
  183. }
  184. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  185. * case workerid identifies a combined worker, a task will be enqueued into
  186. * each worker of the combination. */
  187. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  188. {
  189. int nbasic_workers = (int)starpu_worker_get_count();
  190. /* Is this a basic worker or a combined worker ? */
  191. int is_basic_worker = (workerid < nbasic_workers);
  192. unsigned memory_node;
  193. struct starpu_worker_s *worker;
  194. struct starpu_combined_worker_s *combined_worker;
  195. if (is_basic_worker)
  196. {
  197. worker = _starpu_get_worker_struct(workerid);
  198. memory_node = worker->memory_node;
  199. }
  200. else
  201. {
  202. combined_worker = _starpu_get_combined_worker_struct(workerid);
  203. memory_node = combined_worker->memory_node;
  204. }
  205. if (use_prefetch)
  206. starpu_prefetch_task_input_on_node(task, memory_node);
  207. unsigned i;
  208. for(i = 0; i < worker->nctxs; i++){
  209. if (worker->sched_ctx[i]->sched_policy->push_task_notify)
  210. worker->sched_ctx[i]->sched_policy->push_task_notify(task, workerid);
  211. }
  212. if (is_basic_worker)
  213. {
  214. return _starpu_push_local_task(worker, task, 0);
  215. }
  216. else {
  217. /* This is a combined worker so we create task aliases */
  218. int worker_size = combined_worker->worker_size;
  219. int *combined_workerid = combined_worker->combined_workerid;
  220. int ret = 0;
  221. int i;
  222. starpu_job_t j = _starpu_get_job_associated_to_task(task);
  223. j->task_size = worker_size;
  224. j->combined_workerid = workerid;
  225. j->active_task_alias_count = 0;
  226. PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
  227. PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
  228. for (i = 0; i < worker_size; i++)
  229. {
  230. struct starpu_task *alias = _starpu_create_task_alias(task);
  231. worker = _starpu_get_worker_struct(combined_workerid[i]);
  232. ret |= _starpu_push_local_task(worker, alias, 0);
  233. }
  234. return ret;
  235. }
  236. }
  237. /* the generic interface that call the proper underlying implementation */
  238. int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
  239. {
  240. struct starpu_task *task = j->task;
  241. _STARPU_LOG_IN();
  242. task->status = STARPU_TASK_READY;
  243. _starpu_profiling_set_task_push_start_time(task);
  244. /* in case there is no codelet associated to the task (that's a control
  245. * task), we directly execute its callback and enforce the
  246. * corresponding dependencies */
  247. if (task->cl == NULL)
  248. {
  249. _starpu_handle_job_termination(j, job_is_already_locked);
  250. _STARPU_LOG_OUT_TAG("handle_job_termination");
  251. return 0;
  252. }
  253. int ret;
  254. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  255. {
  256. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  257. }
  258. else {
  259. struct starpu_sched_ctx *sched_ctx = task->sched_ctx;
  260. STARPU_ASSERT(sched_ctx->sched_policy->push_task);
  261. ret = sched_ctx->sched_policy->push_task(task, sched_ctx);
  262. }
  263. _starpu_profiling_set_task_push_end_time(task);
  264. _STARPU_LOG_OUT();
  265. return ret;
  266. }
  267. struct starpu_task *_starpu_pop_task(struct starpu_worker_s *worker)
  268. {
  269. struct starpu_task *task;
  270. /* We can't tell in advance which task will be picked up, so we measure
  271. * a timestamp, and will attribute it afterwards to the task. */
  272. int profiling = starpu_profiling_status_get();
  273. struct timespec pop_start_time;
  274. if (profiling)
  275. starpu_clock_gettime(&pop_start_time);
  276. /* perhaps there is some local task to be executed first */
  277. task = _starpu_pop_local_task(worker);
  278. if(!task){
  279. struct starpu_sched_ctx *sched_ctx;
  280. unsigned i;
  281. for(i = 0; i < worker->nctxs; i++){
  282. sched_ctx = worker->sched_ctx[i];
  283. if (sched_ctx->sched_policy->pop_task){
  284. task = sched_ctx->sched_policy->pop_task();
  285. break;
  286. }
  287. }
  288. }
  289. if(task){
  290. printf("task %s poped by th %d with strateg %s\n", task->name, worker->workerid, task->sched_ctx->sched_policy->policy_name);
  291. }
  292. /* Note that we may get a NULL task in case the scheduler was unlocked
  293. * for some reason. */
  294. if (profiling && task)
  295. {
  296. struct starpu_task_profiling_info *profiling_info;
  297. profiling_info = task->profiling_info;
  298. /* The task may have been created before profiling was enabled,
  299. * so we check if the profiling_info structure is available
  300. * even though we already tested if profiling is enabled. */
  301. if (profiling_info)
  302. {
  303. memcpy(&profiling_info->pop_start_time,
  304. &pop_start_time, sizeof(struct timespec));
  305. starpu_clock_gettime(&profiling_info->pop_end_time);
  306. }
  307. }
  308. return task;
  309. }
  310. struct starpu_task *_starpu_pop_every_task(struct starpu_sched_ctx *sched_ctx)
  311. {
  312. STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
  313. /* TODO set profiling info */
  314. return sched_ctx->sched_policy->pop_every_task();
  315. }
  316. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  317. {
  318. if (task->sched_ctx->sched_policy->post_exec_hook)
  319. task->sched_ctx->sched_policy->post_exec_hook(task);
  320. }
/* Block the calling worker thread until a scheduling event occurs.
 * Pending data requests for the worker's memory node are flushed first, and
 * the wait is skipped entirely when the machine is shutting down or when
 * non-blocking drivers are in use. */
void _starpu_wait_on_sched_event(void)
{
	struct starpu_worker_s *worker = _starpu_get_local_worker_key();

	PTHREAD_MUTEX_LOCK(worker->sched_mutex);

	/* Process outstanding data requests before going to sleep, so that no
	 * transfer is left stalled while this worker blocks. */
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		/* sched_cond is signalled when new tasks are pushed */
		pthread_cond_wait(worker->sched_cond, worker->sched_mutex);
#endif
	}

	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
}
  334. /* The scheduling policy may put tasks directly into a worker's local queue so
  335. * that it is not always necessary to create its own queue when the local queue
  336. * is sufficient. If "back" not null, the task is put at the back of the queue
  337. * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
  338. * a FIFO ordering. */
  339. int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
  340. {
  341. struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
  342. return _starpu_push_local_task(worker, task, back);
  343. }
  344. void _starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx, unsigned is_init_sched)
  345. {
  346. sched_ctx->nworkers_in_ctx = nworkerids_in_ctx;
  347. sched_ctx->sched_policy = malloc(sizeof(struct starpu_sched_policy_s));
  348. sched_ctx->is_init_sched = is_init_sched;
  349. struct starpu_machine_config_s *config = _starpu_get_machine_config();
  350. int nworkers = config->topology.nworkers;
  351. int j;
  352. /*all the workers are in this contex*/
  353. if(workerids_in_ctx == NULL){
  354. for(j = 0; j < nworkers; j++){
  355. sched_ctx->workerid[j] = j;
  356. struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
  357. workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
  358. }
  359. sched_ctx->nworkers_in_ctx = nworkers;
  360. } else {
  361. int i;
  362. for(i = 0; i < nworkerids_in_ctx; i++){
  363. sched_ctx->workerid[i] = workerids_in_ctx[i];
  364. for(j = 0; j < nworkers; j++){
  365. if(sched_ctx->workerid[i] == j){
  366. struct starpu_worker_s *workerarg = _starpu_get_worker_struct(j);
  367. workerarg->sched_ctx[workerarg->nctxs++] = sched_ctx;
  368. }
  369. }
  370. }
  371. }
  372. _starpu_init_sched_policy(config, sched_ctx, policy_name);
  373. return;
  374. }
/* One blocked worker thread has woken up; when the count reaches zero,
 * wake whoever is waiting in _starpu_wait_for_all_threads_to_wake_up. */
void _starpu_decrement_nblocked_ths(void)
{
	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);

	if(--nblocked_ths == 0)
		PTHREAD_COND_BROADCAST(&wakeup_ths_cond);

	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
}
/* One more worker thread has blocked for a context change; once all nworkers
 * are blocked, wake whoever is waiting in
 * _starpu_wait_for_all_threads_to_block. */
void _starpu_increment_nblocked_ths(int nworkers)
{
	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);

	if (++nblocked_ths == nworkers)
		PTHREAD_COND_BROADCAST(&blocking_ths_cond);

	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);
}
/* Block until nworkers threads have reported themselves blocked via
 * _starpu_increment_nblocked_ths. Always returns 0. */
static int _starpu_wait_for_all_threads_to_block(int nworkers)
{
	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);

	/* standard condvar wait loop: re-check the predicate after each wakeup */
	while (nblocked_ths < nworkers)
		PTHREAD_COND_WAIT(&blocking_ths_cond, &blocking_ths_mutex);

	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);

	return 0;
}
/* Block until every previously blocked thread has woken up (nblocked_ths
 * drops to zero, see _starpu_decrement_nblocked_ths). Always returns 0. */
static int _starpu_wait_for_all_threads_to_wake_up(void)
{
	PTHREAD_MUTEX_LOCK(&blocking_ths_mutex);

	/* standard condvar wait loop: re-check the predicate after each wakeup */
	while (nblocked_ths > 0)
		PTHREAD_COND_WAIT(&wakeup_ths_cond, &blocking_ths_mutex);

	PTHREAD_MUTEX_UNLOCK(&blocking_ths_mutex);

	return 0;
}
/* Flip the "context is changing" status on each concerned worker, then wait
 * for them all to acknowledge. Called twice per context switch: once with
 * STATUS_CHANGING_CTX (workers must block) and once with STATUS_UNKNOWN
 * (workers may resume). nworkerids_in_ctx == -1 means every worker;
 * workerids_in_ctx == NULL means workers 0..nworkers-1. Always returns 0. */
static int set_changing_ctx_flag(starpu_worker_status changing_ctx, int nworkerids_in_ctx, int *workerids_in_ctx)
{
	struct starpu_machine_config_s *config = _starpu_get_machine_config();

	int i;
	int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;

	struct starpu_worker_s *worker = NULL;
	pthread_mutex_t *changing_ctx_mutex = NULL;
	pthread_cond_t *changing_ctx_cond = NULL;

	int workerid = -1;

	for(i = 0; i < nworkers; i++)
	{
		workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
		worker = _starpu_get_worker_struct(workerid);

		changing_ctx_mutex = &worker->changing_ctx_mutex;
		changing_ctx_cond = &worker->changing_ctx_cond;

		/*if the status is CHANGING_CTX let the thread know that it must block*/
		PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
		worker->status = changing_ctx;
		worker->nworkers_of_next_ctx = nworkers;
		PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);

		/*if we have finished changing the ctx wake up the blocked threads*/
		/* NOTE(review): the status write and the signal happen in two
		 * separate critical sections — presumably safe because the
		 * worker re-checks its status under the same mutex; confirm. */
		if(changing_ctx == STATUS_UNKNOWN)
		{
			PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
			PTHREAD_COND_SIGNAL(changing_ctx_cond);
			PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
		}
	}

	/*after letting know all the concerned threads about the change
	wait for them to take into account the info*/
	if(changing_ctx == STATUS_CHANGING_CTX)
		_starpu_wait_for_all_threads_to_block(nworkers);
	else
		_starpu_wait_for_all_threads_to_wake_up();

	return 0;
}
  441. void starpu_create_sched_ctx(struct starpu_sched_ctx *sched_ctx, const char *policy_name, int *workerids_in_ctx, int nworkerids_in_ctx)
  442. {
  443. /* wait for the workers concerned by the change of contex
  444. * to finish their work in the previous context */
  445. if(!starpu_wait_for_all_tasks_of_workers(workerids_in_ctx, nworkerids_in_ctx))
  446. {
  447. /* block the workers until the contex is switched */
  448. set_changing_ctx_flag(STATUS_CHANGING_CTX, nworkerids_in_ctx, workerids_in_ctx);
  449. _starpu_create_sched_ctx(sched_ctx, policy_name, workerids_in_ctx, nworkerids_in_ctx, 0);
  450. /* also wait the workers to wake up before using the context */
  451. set_changing_ctx_flag(STATUS_UNKNOWN, nworkerids_in_ctx, workerids_in_ctx);
  452. }
  453. return;
  454. }
  455. int worker_belongs_to_ctx(struct starpu_worker_s *workerarg, struct starpu_sched_ctx *sched_ctx)
  456. {
  457. unsigned i;
  458. for(i = 0; i < workerarg->nctxs; i++)
  459. if(sched_ctx != NULL && workerarg->sched_ctx[i] == sched_ctx
  460. && workerarg->status != STATUS_JOINED)
  461. return 1;
  462. return 0;
  463. }
  464. void starpu_delete_sched_ctx(struct starpu_sched_ctx *sched_ctx)
  465. {
  466. struct starpu_machine_config_s *config = _starpu_get_machine_config();
  467. int nworkers = config->topology.nworkers;
  468. int i;
  469. for(i = 0; i < nworkers; i++)
  470. {
  471. struct starpu_worker_s *workerarg = _starpu_get_worker_struct(i);
  472. if(worker_belongs_to_ctx(workerarg, sched_ctx))
  473. workerarg->nctxs--;
  474. }
  475. free(sched_ctx->sched_policy);
  476. sched_ctx->sched_policy = NULL;
  477. }
  478. void _starpu_delete_all_sched_ctxs()
  479. {
  480. struct starpu_machine_config_s *config = _starpu_get_machine_config();
  481. unsigned nworkers = config->topology.nworkers;
  482. unsigned i, j;
  483. struct starpu_sched_ctx *sched_ctx = NULL;
  484. struct starpu_worker_s *workerarg = NULL;
  485. for(i = 0; i < nworkers; i++)
  486. {
  487. workerarg = _starpu_get_worker_struct(i);
  488. for(j = 0; j < workerarg->nctxs; j++)
  489. {
  490. sched_ctx = workerarg->sched_ctx[j];
  491. if(sched_ctx != NULL && !sched_ctx->is_init_sched)
  492. {
  493. free(sched_ctx->sched_policy);
  494. sched_ctx->sched_policy = NULL;
  495. workerarg->nctxs--;
  496. }
  497. }
  498. }
  499. }
/* Block until the given worker has no submitted task left.
 * Returns 0 on success, or -EDEADLK when called from a context that may not
 * perform blocking calls (e.g. from a worker/callback itself). */
int starpu_wait_for_all_tasks_of_worker(int workerid)
{
	/* blocking here from a worker thread would deadlock */
	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
		return -EDEADLK;

	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);

	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);

	/* submitted_cond is broadcast when nsubmitted drops to zero */
	while (worker->nsubmitted > 0)
		PTHREAD_COND_WAIT(&worker->submitted_cond, &worker->submitted_mutex);

	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);

	return 0;
}
  511. int starpu_wait_for_all_tasks_of_workers(int *workerids_in_ctx, int nworkerids_in_ctx){
  512. int ret_val = 0;
  513. struct starpu_machine_config_s *config = _starpu_get_machine_config();
  514. int nworkers = nworkerids_in_ctx == -1 ? config->topology.nworkers : nworkerids_in_ctx;
  515. int workerid = -1;
  516. int i, n;
  517. for(i = 0; i < nworkers; i++)
  518. {
  519. workerid = workerids_in_ctx == NULL ? i : workerids_in_ctx[i];
  520. n = starpu_wait_for_all_tasks_of_worker(workerid);
  521. ret_val = ret_val && n;
  522. }
  523. return ret_val;
  524. }
/* A task of this worker has terminated; when its submitted count reaches
 * zero, wake up callers blocked in starpu_wait_for_all_tasks_of_worker. */
void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
{
	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);

	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);

	if (--worker->nsubmitted == 0)
		PTHREAD_COND_BROADCAST(&worker->submitted_cond);

	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
}
/* Record that one more task has been submitted to this worker
 * (counterpart of _starpu_decrement_nsubmitted_tasks_of_worker). */
void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
{
	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);

	PTHREAD_MUTEX_LOCK(&worker->submitted_mutex);

	worker->nsubmitted++;

	PTHREAD_MUTEX_UNLOCK(&worker->submitted_mutex);
}