
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2015-2017 Inria
 * Copyright (C) 2015-2017 CNRS
 * Copyright (C) 2015-2017 Université de Bordeaux
 * Copyright (C) 2016 Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
/* Distributed queues using performance modeling to assign tasks */

#include <starpu_config.h>
#include <starpu_scheduler.h>
#include <schedulers/starpu_heteroprio.h>
#include <common/fxt.h>
#include <core/task.h>
#include <core/workers.h>
#include <core/debug.h>
#include <sched_policies/prio_deque.h>
#include <limits.h>

#ifndef DBL_MIN
#define DBL_MIN __DBL_MIN__
#endif

#ifndef DBL_MAX
#define DBL_MAX __DBL_MAX__
#endif
/* A bucket corresponds to a priority level.
 * When a task is pushed with a priority X, it is stored
 * in bucket X.
 * All the tasks stored in the fifo must be computable by the archs
 * in valid_archs.
 * For example, if valid_archs = (STARPU_CPU|STARPU_CUDA),
 * then task->where must include at least (STARPU_CPU|STARPU_CUDA).
 */
struct _heteroprio_bucket
{
	/* The tasks of the current bucket */
	struct _starpu_prio_deque tasks_queue;
	/* The archs that can process the tasks of the current bucket */
	unsigned valid_archs;
	/* The slow factor for each arch */
	float slow_factors_per_index[STARPU_NB_TYPES];
	/* The base arch for the slow factors (the fastest arch for the tasks of the bucket) */
	unsigned factor_base_arch_index;
};
/* Init a bucket */
static void _heteroprio_bucket_init(struct _heteroprio_bucket* bucket)
{
	memset(bucket, 0, sizeof(*bucket));
	_starpu_prio_deque_init(&bucket->tasks_queue);
}

/* Release a bucket */
static void _heteroprio_bucket_release(struct _heteroprio_bucket* bucket)
{
	STARPU_ASSERT(_starpu_prio_deque_is_empty(&bucket->tasks_queue) != 0);
	_starpu_prio_deque_destroy(&bucket->tasks_queue);
}
/* A worker is mainly composed of a fifo for the tasks
 * and some direct access to worker properties.
 * The fifo is implemented as a _starpu_prio_deque: tasks are pushed
 * into it when prefetched for the worker and popped for execution.
 */
/* ANDRA_MODIF: can use starpu fifo + starpu sched_mutex */
struct _heteroprio_worker_wrapper
{
	unsigned arch_type;
	unsigned arch_index;
	struct _starpu_prio_deque tasks_queue;
};
struct _starpu_heteroprio_data
{
	starpu_pthread_mutex_t policy_mutex;
	struct starpu_bitmap *waiters;
	/* The buckets that store the tasks */
	struct _heteroprio_bucket buckets[STARPU_HETEROPRIO_MAX_PRIO];
	/* The number of buckets for each arch */
	unsigned nb_prio_per_arch_index[STARPU_NB_TYPES];
	/* The mapping to the corresponding buckets */
	unsigned prio_mapping_per_arch_index[STARPU_NB_TYPES][STARPU_HETEROPRIO_MAX_PRIO];
	/* The number of available tasks for a given arch (not prefetched) */
	unsigned nb_remaining_tasks_per_arch_index[STARPU_NB_TYPES];
	/* The total number of tasks in the buckets (not prefetched) */
	unsigned total_tasks_in_buckets;
	/* The total number of prefetched tasks for a given arch */
	unsigned nb_prefetched_tasks_per_arch_index[STARPU_NB_TYPES];
	/* The information for all the workers */
	struct _heteroprio_worker_wrapper workers_heteroprio[STARPU_NMAXWORKERS];
	/* The number of workers for a given arch */
	unsigned nb_workers_per_arch_index[STARPU_NB_TYPES];
};
/** Tell how many prios there are for a given arch */
void starpu_heteroprio_set_nb_prios(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned max_prio)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	STARPU_ASSERT(max_prio < STARPU_HETEROPRIO_MAX_PRIO);
	hp->nb_prio_per_arch_index[arch] = max_prio;
}

/** Set the mapping for a given arch prio=>bucket */
inline void starpu_heteroprio_set_mapping(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned source_prio, unsigned dest_bucket_id)
{
	STARPU_ASSERT(dest_bucket_id < STARPU_HETEROPRIO_MAX_PRIO);
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	hp->prio_mapping_per_arch_index[arch][source_prio] = dest_bucket_id;
	hp->buckets[dest_bucket_id].valid_archs |= starpu_heteroprio_types_to_arch[arch];
	_STARPU_DEBUG("Adding arch %d to bucket %u\n", arch, dest_bucket_id);
}

/** Tell which arch is the fastest for the tasks of a bucket (optional) */
inline void starpu_heteroprio_set_faster_arch(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned bucket_id)
{
	STARPU_ASSERT(bucket_id < STARPU_HETEROPRIO_MAX_PRIO);
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	hp->buckets[bucket_id].factor_base_arch_index = arch;
	hp->buckets[bucket_id].slow_factors_per_index[arch] = 0;
}

/** Tell how slow an arch is for the tasks of a bucket (optional) */
inline void starpu_heteroprio_set_arch_slow_factor(unsigned sched_ctx_id, enum starpu_heteroprio_types arch, unsigned bucket_id, float slow_factor)
{
	STARPU_ASSERT(bucket_id < STARPU_HETEROPRIO_MAX_PRIO);
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	hp->buckets[bucket_id].slow_factors_per_index[arch] = slow_factor;
}
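
/* A minimal usage sketch (not part of this file) of the setters above,
 * assuming an init callback for a machine with CPU and CUDA workers and
 * two priority levels; the name init_heteroprio is hypothetical:
 *
 *	void init_heteroprio(unsigned ctx)
 *	{
 *		unsigned p;
 *		// Two buckets for each arch
 *		starpu_heteroprio_set_nb_prios(ctx, STARPU_CPU_IDX, 2);
 *		starpu_heteroprio_set_nb_prios(ctx, STARPU_CUDA_IDX, 2);
 *		// Direct mapping: priority p goes to bucket p for both archs
 *		for (p = 0; p < 2; p++)
 *		{
 *			starpu_heteroprio_set_mapping(ctx, STARPU_CPU_IDX, p, p);
 *			starpu_heteroprio_set_mapping(ctx, STARPU_CUDA_IDX, p, p);
 *		}
 *		// Assume CUDA is 4x faster than CPU on the tasks of bucket 0
 *		starpu_heteroprio_set_faster_arch(ctx, STARPU_CUDA_IDX, 0);
 *		starpu_heteroprio_set_arch_slow_factor(ctx, STARPU_CPU_IDX, 0, 4.0f);
 *	}
 */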
/** If the user does not provide an init callback, use a direct mapping:
 * one bucket per priority, shared by all present archs, with no slow factor */
static inline void default_init_sched(unsigned sched_ctx_id)
{
	int min_prio = starpu_sched_ctx_get_min_priority(sched_ctx_id);
	int max_prio = starpu_sched_ctx_get_max_priority(sched_ctx_id);
	STARPU_ASSERT(min_prio >= 0);
	STARPU_ASSERT(max_prio >= 0);
	// By default each type of device uses 1 bucket per priority and no slow factor
#ifdef STARPU_USE_CPU
	if (starpu_cpu_worker_get_count() > 0)
		starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_CPU_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_CUDA
	if (starpu_cuda_worker_get_count() > 0)
		starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_CUDA_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_OPENCL
	if (starpu_opencl_worker_get_count() > 0)
		starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_OPENCL_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_MIC
	if (starpu_mic_worker_get_count() > 0)
		starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_MIC_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_SCC
	if (starpu_scc_worker_get_count() > 0)
		starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_SCC_IDX, max_prio-min_prio+1);
#endif
#ifdef STARPU_USE_MPI_MASTER_SLAVE
	if (starpu_mpi_ms_worker_get_count() > 0)
		starpu_heteroprio_set_nb_prios(sched_ctx_id, STARPU_MPI_MS_IDX, max_prio-min_prio+1);
#endif
	// Direct mapping
	int prio;
	for(prio=min_prio ; prio<=max_prio ; prio++)
	{
#ifdef STARPU_USE_CPU
		if (starpu_cpu_worker_get_count() > 0)
			starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_CPU_IDX, prio, prio);
#endif
#ifdef STARPU_USE_CUDA
		if (starpu_cuda_worker_get_count() > 0)
			starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_CUDA_IDX, prio, prio);
#endif
#ifdef STARPU_USE_OPENCL
		if (starpu_opencl_worker_get_count() > 0)
			starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_OPENCL_IDX, prio, prio);
#endif
#ifdef STARPU_USE_MIC
		if (starpu_mic_worker_get_count() > 0)
			starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_MIC_IDX, prio, prio);
#endif
#ifdef STARPU_USE_SCC
		if (starpu_scc_worker_get_count() > 0)
			starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_SCC_IDX, prio, prio);
#endif
#ifdef STARPU_USE_MPI_MASTER_SLAVE
		if (starpu_mpi_ms_worker_get_count() > 0)
			starpu_heteroprio_set_mapping(sched_ctx_id, STARPU_MPI_MS_IDX, prio, prio);
#endif
	}
}
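
/* A sketch of how a custom init callback would typically be registered,
 * assuming the STARPU_SCHED_CTX_POLICY_INIT argument of
 * starpu_sched_ctx_create (as used by the StarPU heteroprio examples);
 * init_heteroprio is the hypothetical callback sketched above:
 *
 *	unsigned ctx = starpu_sched_ctx_create(NULL, -1, "heteroprio",
 *					       STARPU_SCHED_CTX_POLICY_INIT,
 *					       &init_heteroprio, 0);
 */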
static void initialize_heteroprio_policy(unsigned sched_ctx_id)
{
	/* Alloc the scheduler data */
	struct _starpu_heteroprio_data *hp;
	_STARPU_MALLOC(hp, sizeof(struct _starpu_heteroprio_data));
	memset(hp, 0, sizeof(*hp));
	hp->waiters = starpu_bitmap_create();
	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)hp);
	STARPU_PTHREAD_MUTEX_INIT(&hp->policy_mutex, NULL);
	unsigned idx_prio;
	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
		_heteroprio_bucket_init(&hp->buckets[idx_prio]);
	/* Use the user-provided init callback if any, else the default mapping */
	void (*init_sched)(unsigned) = starpu_sched_ctx_get_sched_policy_init(sched_ctx_id);
	if(init_sched)
		init_sched(sched_ctx_id);
	else
		default_init_sched(sched_ctx_id);
	/* Ensure that the information has been correctly filled */
	unsigned check_all_archs[STARPU_HETEROPRIO_MAX_PRIO];
	memset(check_all_archs, 0, sizeof(unsigned)*STARPU_HETEROPRIO_MAX_PRIO);
	unsigned arch_index;
	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
	{
		STARPU_ASSERT(hp->nb_prio_per_arch_index[arch_index] <= STARPU_HETEROPRIO_MAX_PRIO);
		unsigned check_archs[STARPU_HETEROPRIO_MAX_PRIO];
		memset(check_archs, 0, sizeof(unsigned)*STARPU_HETEROPRIO_MAX_PRIO);
		for(idx_prio = 0; idx_prio < hp->nb_prio_per_arch_index[arch_index]; ++idx_prio)
		{
			const unsigned mapped_prio = hp->prio_mapping_per_arch_index[arch_index][idx_prio];
			STARPU_ASSERT(mapped_prio < STARPU_HETEROPRIO_MAX_PRIO);
			STARPU_ASSERT(hp->buckets[mapped_prio].slow_factors_per_index[arch_index] >= 0.0);
			STARPU_ASSERT(hp->buckets[mapped_prio].valid_archs & starpu_heteroprio_types_to_arch[arch_index]);
			check_archs[mapped_prio] = 1;
			check_all_archs[mapped_prio] += 1;
		}
		for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
		{
			/* Ensure that the current arch uses this bucket, or that someone else can use it */
			STARPU_ASSERT(check_archs[idx_prio] == 1 || hp->buckets[idx_prio].valid_archs == 0
				      || (hp->buckets[idx_prio].valid_archs & ~starpu_heteroprio_types_to_arch[arch_index]) != 0);
		}
	}
	/* Ensure that if valid_archs = (STARPU_CPU|STARPU_CUDA), for example, then check_all_archs[] == 2 */
	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
	{
		unsigned nb_arch_on_bucket = 0;
		for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
		{
			if(hp->buckets[idx_prio].valid_archs & starpu_heteroprio_types_to_arch[arch_index])
			{
				nb_arch_on_bucket += 1;
			}
		}
		STARPU_ASSERT_MSG(check_all_archs[idx_prio] == nb_arch_on_bucket, "check_all_archs[idx_prio(%u)] = %u != nb_arch_on_bucket = %u\n", idx_prio, check_all_archs[idx_prio], nb_arch_on_bucket);
	}
}
static void deinitialize_heteroprio_policy(unsigned sched_ctx_id)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	/* Ensure there are no more tasks */
	STARPU_ASSERT(hp->total_tasks_in_buckets == 0);
	unsigned arch_index;
	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
	{
		STARPU_ASSERT(hp->nb_remaining_tasks_per_arch_index[arch_index] == 0);
		STARPU_ASSERT(hp->nb_prefetched_tasks_per_arch_index[arch_index] == 0);
	}
	unsigned idx_prio;
	for(idx_prio = 0; idx_prio < STARPU_HETEROPRIO_MAX_PRIO; ++idx_prio)
	{
		_heteroprio_bucket_release(&hp->buckets[idx_prio]);
	}
	starpu_bitmap_destroy(hp->waiters);
	STARPU_PTHREAD_MUTEX_DESTROY(&hp->policy_mutex);
	free(hp);
}
static void add_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		memset(&hp->workers_heteroprio[workerid], 0, sizeof(hp->workers_heteroprio[workerid]));
		/* If the worker already belonged to this context,
		   the queue and the synchronization variables have already been initialized */
		_starpu_prio_deque_init(&hp->workers_heteroprio[workerid].tasks_queue);
		switch(starpu_worker_get_type(workerid))
		{
		case STARPU_CPU_WORKER:
			hp->workers_heteroprio[workerid].arch_type = STARPU_CPU;
			hp->workers_heteroprio[workerid].arch_index = STARPU_CPU_IDX;
			break;
		case STARPU_CUDA_WORKER:
			hp->workers_heteroprio[workerid].arch_type = STARPU_CUDA;
			hp->workers_heteroprio[workerid].arch_index = STARPU_CUDA_IDX;
			break;
		case STARPU_OPENCL_WORKER:
			hp->workers_heteroprio[workerid].arch_type = STARPU_OPENCL;
			hp->workers_heteroprio[workerid].arch_index = STARPU_OPENCL_IDX;
			break;
		case STARPU_MIC_WORKER:
			hp->workers_heteroprio[workerid].arch_type = STARPU_MIC;
			hp->workers_heteroprio[workerid].arch_index = STARPU_MIC_IDX;
			break;
		case STARPU_SCC_WORKER:
			hp->workers_heteroprio[workerid].arch_type = STARPU_SCC;
			hp->workers_heteroprio[workerid].arch_index = STARPU_SCC_IDX;
			break;
		case STARPU_MPI_MS_WORKER:
			hp->workers_heteroprio[workerid].arch_type = STARPU_MPI_MS;
			hp->workers_heteroprio[workerid].arch_index = STARPU_MPI_MS_IDX;
			break;
		default:
			STARPU_ASSERT(0);
		}
		hp->nb_workers_per_arch_index[hp->workers_heteroprio[workerid].arch_index]++;
	}
}
static void remove_workers_heteroprio_policy(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		_starpu_prio_deque_destroy(&hp->workers_heteroprio[workerid].tasks_queue);
	}
}
/* Push a new task (simply store it and update counters) */
static int push_task_heteroprio_policy(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	/* Only one worker at a time uses heteroprio */
	_starpu_worker_relax_on();
	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);
	_starpu_worker_relax_off();
	/* Retrieve the correct bucket */
	STARPU_ASSERT(task->priority < STARPU_HETEROPRIO_MAX_PRIO);
	struct _heteroprio_bucket* bucket = &hp->buckets[task->priority];
	/* Ensure that any worker that checks that list can compute the task */
	STARPU_ASSERT_MSG(bucket->valid_archs, "The bucket %d does not have any archs\n", task->priority);
	STARPU_ASSERT(((bucket->valid_archs ^ task->where) & bucket->valid_archs) == 0);
	/* Save the task */
	_starpu_prio_deque_push_back_task(&bucket->tasks_queue,task);
	/* Increase the counters */
	unsigned arch_index;
	for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
	{
		/* We test the archs of the bucket and not task->where, since the bucket is the more restrictive set */
		if(bucket->valid_archs & starpu_heteroprio_types_to_arch[arch_index])
			hp->nb_remaining_tasks_per_arch_index[arch_index] += 1;
	}
	hp->total_tasks_in_buckets += 1;
	starpu_push_task_end(task);
	/* Wake workers that are waiting for a task */
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;
#ifndef STARPU_NON_BLOCKING_DRIVERS
	char dowake[STARPU_NMAXWORKERS] = { 0 };
#endif
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);
#ifdef STARPU_NON_BLOCKING_DRIVERS
		if (!starpu_bitmap_get(hp->waiters, worker))
			/* This worker is not waiting for a task */
			continue;
#endif
		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
		{
			/* It can execute this one, tell it! */
#ifdef STARPU_NON_BLOCKING_DRIVERS
			starpu_bitmap_unset(hp->waiters, worker);
			/* We really woke at least somebody, no need to wake somebody else */
			break;
#else
			dowake[worker] = 1;
#endif
		}
	}
	/* Release the policy mutex; the task is available now */
	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);
#ifndef STARPU_NON_BLOCKING_DRIVERS
	/* Now that we have a list of potential workers, try to wake one */
	workers->init_iterator(workers, &it);
	while(workers->has_next(workers, &it))
	{
		unsigned worker = workers->get_next(workers, &it);
		if (dowake[worker])
			if (_starpu_wake_worker_relax_light(worker))
				break; // wake up a single worker
	}
#endif
	return 0;
}
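
/* From the application side, the bucket is selected purely by
 * task->priority; a minimal sketch, assuming a codelet cl whose
 * implementations match the archs of bucket 1:
 *
 *	struct starpu_task *task = starpu_task_create();
 *	task->cl = &cl;
 *	task->priority = 1; // stored in hp->buckets[1]
 *	starpu_task_submit(task);
 */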
static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
{
	const unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_heteroprio_data *hp = (struct _starpu_heteroprio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _heteroprio_worker_wrapper* worker = &hp->workers_heteroprio[workerid];
#ifdef STARPU_NON_BLOCKING_DRIVERS
	/* If there is no task available for this worker's arch, neither in the buckets nor in its own queue, just return NULL */
	if (!STARPU_RUNNING_ON_VALGRIND
	    && (hp->total_tasks_in_buckets == 0 || hp->nb_remaining_tasks_per_arch_index[worker->arch_index] == 0)
	    && worker->tasks_queue.ntasks == 0 && hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] == 0)
	{
		return NULL;
	}
	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(hp->waiters, workerid))
	{
		/* Nobody woke us, avoid bothering the mutex */
		return NULL;
	}
#endif
	_starpu_worker_relax_on();
	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);
	_starpu_worker_relax_off();
	/* Keep track of the newly added tasks to perform a real prefetch on the node */
	unsigned nb_added_tasks = 0;
	/* Check that some tasks are available for the current worker arch */
	if( hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 )
	{
		/* Ideally we would like to fill the prefetch array */
		unsigned nb_tasks_to_prefetch = (STARPU_HETEROPRIO_MAX_PREFETCH-worker->tasks_queue.ntasks);
		/* But there may be fewer tasks than that! */
		if(nb_tasks_to_prefetch > hp->nb_remaining_tasks_per_arch_index[worker->arch_index])
		{
			nb_tasks_to_prefetch = hp->nb_remaining_tasks_per_arch_index[worker->arch_index];
		}
		/* And if there are fewer tasks than workers, prefetch at most one */
		if(hp->nb_remaining_tasks_per_arch_index[worker->arch_index] < starpu_sched_ctx_get_nworkers(sched_ctx_id))
		{
			if(worker->tasks_queue.ntasks == 0)
				nb_tasks_to_prefetch = 1;
			else
				nb_tasks_to_prefetch = 0;
		}
		unsigned idx_prio, arch_index;
		/* Iterate until we have found all the tasks we need */
		for(idx_prio = 0; nb_tasks_to_prefetch && idx_prio < hp->nb_prio_per_arch_index[worker->arch_index]; ++idx_prio)
		{
			/* Retrieve the bucket using the mapping */
			struct _heteroprio_bucket* bucket = &hp->buckets[hp->prio_mapping_per_arch_index[worker->arch_index][idx_prio]];
			/* Ensure that we can compute tasks from this bucket */
			STARPU_ASSERT(bucket->valid_archs & worker->arch_type);
			/* Take nb_tasks_to_prefetch tasks if possible */
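			/* A worker whose arch is "slow" for this bucket only takes a task
			 * when the backlog per worker of the fastest arch reaches the slow
			 * factor, i.e. when the fast workers cannot absorb the queue alone */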
			while(!_starpu_prio_deque_is_empty(&bucket->tasks_queue) && nb_tasks_to_prefetch &&
			      (bucket->factor_base_arch_index == 0 ||
			       worker->arch_index == bucket->factor_base_arch_index ||
			       (((float)bucket->tasks_queue.ntasks)/((float)hp->nb_workers_per_arch_index[bucket->factor_base_arch_index])) >= bucket->slow_factors_per_index[worker->arch_index]))
			{
				struct starpu_task* task = _starpu_prio_deque_pop_task(&bucket->tasks_queue);
				STARPU_ASSERT(starpu_worker_can_execute_task(workerid, task, 0));
				/* Save the task */
				STARPU_AYU_ADDTOTASKQUEUE(starpu_task_get_job_id(task), workerid);
				_starpu_prio_deque_push_task(&worker->tasks_queue, task);
				/* Update the general counters */
				hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] += 1;
				hp->total_tasks_in_buckets -= 1;
				for(arch_index = 0; arch_index < STARPU_NB_TYPES; ++arch_index)
				{
					/* We test the archs of the bucket and not task->where, since the bucket is the more restrictive set */
					if(bucket->valid_archs & starpu_heteroprio_types_to_arch[arch_index])
					{
						hp->nb_remaining_tasks_per_arch_index[arch_index] -= 1;
					}
				}
				/* Decrease the number of tasks to find */
				nb_tasks_to_prefetch -= 1;
				nb_added_tasks += 1;
				// TODO starpu_prefetch_task_input_on_node(task, workerid);
			}
		}
	}
	struct starpu_task* task = NULL;
	/* The worker has some tasks in its queue */
	if(worker->tasks_queue.ntasks)
	{
		int skipped;
		task = _starpu_prio_deque_pop_task_for_worker(&worker->tasks_queue, workerid, &skipped);
		hp->nb_prefetched_tasks_per_arch_index[worker->arch_index] -= 1;
	}
	/* Otherwise look if we can steal some work */
	else if(hp->nb_prefetched_tasks_per_arch_index[worker->arch_index])
	{
		/* If STARPU_HETEROPRIO_MAX_PREFETCH == 1 it should not be possible to steal work */
		STARPU_ASSERT(STARPU_HETEROPRIO_MAX_PREFETCH != 1);
		struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator(workers, &it);
		unsigned victim;
		unsigned current_worker;
		/* Start stealing from just after ourselves */
		while(workers->has_next(workers, &it))
		{
			current_worker = workers->get_next(workers, &it);
			if(current_worker == workerid)
				break;
		}
		/* Circular loop */
		while (1)
		{
			if (!workers->has_next(workers, &it))
			{
				/* End of the list, restart from the beginning */
				workers->init_iterator(workers, &it);
			}
			while(workers->has_next(workers, &it))
			{
				victim = workers->get_next(workers, &it);
				/* When we reach ourselves again, we are done trying to find work */
				if(victim == workerid)
					goto done;
				/* If it has the same arch and there is a task to steal */
				if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
				   && hp->workers_heteroprio[victim].tasks_queue.ntasks)
				{
					/* Ensure that the victim is not currently prefetching its data */
					_starpu_worker_lock(victim);
					if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
					   && hp->workers_heteroprio[victim].tasks_queue.ntasks)
					{
						int skipped;
						/* Steal the last added task */
						task = _starpu_prio_deque_pop_task_for_worker(&hp->workers_heteroprio[victim].tasks_queue, workerid, &skipped);
						/* We stole a task, update the global counter */
						hp->nb_prefetched_tasks_per_arch_index[hp->workers_heteroprio[victim].arch_index] -= 1;
						_starpu_worker_unlock(victim);
						goto done;
					}
					_starpu_worker_unlock(victim);
				}
			}
		}
	done: ;
	}
	if (!task)
	{
		/* Tell pushers that we are waiting for tasks */
		starpu_bitmap_set(hp->waiters, workerid);
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);
	if(task)
	{
		_starpu_worker_relax_on();
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		_starpu_worker_relax_off();
		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
		{
			starpu_sched_ctx_move_task_to_ctx_locked(task, child_sched_ctx, 1);
			starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, task->flops);
			_starpu_sched_ctx_unlock_write(sched_ctx_id);
			return NULL;
		}
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}
	/* If we got a task (task), we may have some freshly added tasks in the queue (nb_added_tasks) to prefetch */
	if(task && worker->tasks_queue.ntasks && nb_added_tasks && starpu_get_prefetch_flag())
	{
		const unsigned memory_node = starpu_worker_get_memory_node(workerid);
		/* TODO berenger: iterate in the other direction */
		struct starpu_task *task_to_prefetch = NULL;
		for (task_to_prefetch = starpu_task_prio_list_begin(&worker->tasks_queue.list);
		     (task_to_prefetch != starpu_task_prio_list_end(&worker->tasks_queue.list) &&
		      nb_added_tasks && hp->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0);
		     task_to_prefetch = starpu_task_prio_list_next(&worker->tasks_queue.list, task_to_prefetch))
		{
			/* Prefetch the input data of the freshly added tasks */
			starpu_prefetch_task_input_on_node(task_to_prefetch, memory_node);
			nb_added_tasks -= 1;
		}
	}
	return task;
}
struct starpu_sched_policy _starpu_sched_heteroprio_policy =
{
	.init_sched = initialize_heteroprio_policy,
	.deinit_sched = deinitialize_heteroprio_policy,
	.add_workers = add_workers_heteroprio_policy,
	.remove_workers = remove_workers_heteroprio_policy,
	.push_task = push_task_heteroprio_policy,
	.simulate_push_task = NULL,
	.push_task_notify = NULL,
	.pop_task = pop_task_heteroprio_policy,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = NULL,
	.policy_name = "heteroprio",
	.policy_description = "heteroprio",
	.worker_type = STARPU_WORKER_LIST,
};
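
/* The policy is registered under the name "heteroprio"; a sketch of how an
 * application might select it, assuming the standard sched_policy_name field
 * of starpu_conf (the STARPU_SCHED environment variable works as well):
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "heteroprio";
 *	starpu_init(&conf);
 */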