deque_modeling_policy_data_aware.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010, 2011-2012  Université de Bordeaux 1
 * Copyright (C) 2010, 2011, 2012  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  Télécom-SudParis
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Distributed queues using performance modeling to assign tasks */

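/* Each worker owns a FIFO task queue.  The queue fields exp_start, exp_len
 * and exp_end track, respectively, the expected date at which the worker
 * becomes available, the expected duration of the work already queued, and
 * the resulting expected termination date (exp_end = exp_start + exp_len).
 * The dm policy pushes each task to the worker minimizing its expected
 * termination time; the dmda variants additionally weigh the expected data
 * transfer time and power consumption using the alpha/beta/gamma
 * coefficients stored in dmda_data below. */
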
#include <limits.h>
#include <math.h>	/* isnan() -- may also come in through StarPU's internal headers */
#include <stdlib.h>	/* malloc(), free(), getenv(), atof() */

#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <core/perfmodel/perfmodel.h>
#include <starpu_parameters.h>

typedef struct {
	double alpha;
	double beta;
	double _gamma;
	double idle_power;

	/* Per-worker FIFO queues, indexed by global worker id. */
	struct _starpu_fifo_taskq **queue_array;

	long int total_task_cnt;
	long int ready_task_cnt;
} dmda_data;

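/* Count how many of the task's data buffers are not yet in a valid state on
 * the given memory node (i.e. would still require a transfer). */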
static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
{
	int cnt = 0;

	unsigned nbuffers = task->cl->nbuffers;
	unsigned index;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle;

		handle = task->handles[index];

		int is_valid;
		starpu_data_query_status(handle, node, NULL, &is_valid, NULL);

		if (!is_valid)
			cnt++;
	}

	return cnt;
}

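/* Pop from the tail of the queue, but among the candidates whose priority
 * does not exceed that of the tail task, prefer the one with the fewest
 * non-ready buffers on the target memory node ("ready"-aware popping, used
 * by the dmdar/dmdas variants). */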
static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node)
{
	struct starpu_task *task = NULL, *current;

	if (fifo_queue->ntasks == 0)
		return NULL;

	if (fifo_queue->ntasks > 0)
	{
		fifo_queue->ntasks--;

		task = starpu_task_list_back(&fifo_queue->taskq);

		int first_task_priority = task->priority;

		current = task;

		int non_ready_best = INT_MAX;

		while (current)
		{
			int priority = current->priority;

			if (priority <= first_task_priority)
			{
				int non_ready = count_non_ready_buffers(current, node);
				if (non_ready < non_ready_best)
				{
					non_ready_best = non_ready;
					task = current;

					if (non_ready == 0)
						break;
				}
			}

			current = current->prev;
		}

		starpu_task_list_erase(&fifo_queue->taskq, task);

		_STARPU_TRACE_JOB_POP(task, 0);
	}

	return task;
}

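/* Pop entry point for the "ready"-aware variants: pick the task whose data
 * is most ready on this worker's memory node, then update the expected
 * start/length/end of the worker's queue. */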
static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	unsigned node = starpu_worker_get_memory_node(workerid);

	task = _starpu_fifo_pop_first_ready_task(fifo, node);
	if (task)
	{
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}

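/* Plain pop entry point: take the next task from this worker's FIFO and
 * update the expected start/length/end accordingly. */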
static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	task = _starpu_fifo_pop_task(fifo, workerid);
	if (task)
	{
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}

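/* Pop the whole content of this worker's queue at once, updating the
 * expected-time bookkeeping for every popped task. */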
static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	struct starpu_task *new_list, *current;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	pthread_mutex_t *sched_mutex;
	pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);

	new_list = _starpu_fifo_pop_every_task(fifo, sched_mutex, workerid);

	/* Walk the popped list with a cursor so that its head can still be
	 * returned to the caller. */
	for (current = new_list; current; current = current->next)
	{
		double model = current->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;
	}

	return new_list;
}

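/* Insert a task into the queue, keeping the list sorted by increasing
 * priority (used by the "dmdas" policy). */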
static
int _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
{
	struct starpu_task_list *list = &fifo_queue->taskq;

	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);

	_STARPU_TRACE_JOB_PUSH(task, 0);

	if (list->head == NULL)
	{
		list->head = task;
		list->tail = task;
		task->prev = NULL;
		task->next = NULL;
	}
	else
	{
		struct starpu_task *current = list->head;
		struct starpu_task *prev = NULL;

		while (current)
		{
			if (current->priority >= task->priority)
				break;

			prev = current;
			current = current->next;
		}

		if (prev == NULL)
		{
			/* Insert at the front of the list */
			list->head->prev = task;
			task->prev = NULL;
			task->next = list->head;
			list->head = task;
		}
		else
		{
			if (current)
			{
				/* Insert between prev and current */
				task->prev = prev;
				prev->next = task;
				task->next = current;
				current->prev = task;
			}
			else
			{
				/* Insert at the tail of the list */
				list->tail->next = task;
				task->next = NULL;
				task->prev = list->tail;
				list->tail = task;
			}
		}
	}

	fifo_queue->ntasks++;
	fifo_queue->nprocessed++;

	_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);

	return 0;
}

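/* Enqueue the task on the worker selected by the push functions below,
 * account for its predicted duration in the worker's expected end time, and
 * optionally prefetch its input data on that worker's memory node. */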
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
	/* make sure someone could execute that task! */
	STARPU_ASSERT(best_workerid != -1);

	struct _starpu_fifo_taskq *fifo;
	fifo = dt->queue_array[best_workerid];

	fifo->exp_end += predicted;
	fifo->exp_len += predicted;

	task->predicted = predicted;
	/* TODO predicted_transfer */

	unsigned memory_node = starpu_worker_get_memory_node(best_workerid);

	if (starpu_get_prefetch_flag())
		starpu_prefetch_task_input_on_node(task, memory_node);

	pthread_mutex_t *sched_mutex;
	pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);

	if (prio)
		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
						     sched_mutex, sched_cond, task);
	else
		return _starpu_fifo_push_task(dt->queue_array[best_workerid],
					      sched_mutex, sched_cond, task);
}

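/* "dm" push: over all workers of the context and all kernel implementations,
 * choose the worker that minimizes the expected termination time of the task
 * (expected queue end + expected task length).  When no performance model is
 * available yet, fall back to a greedy distribution based on the number of
 * queued tasks, so that calibration still gets done. */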
/* TODO: factorize with dmda!! */
static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	/* find the queue */
	struct _starpu_fifo_taskq *fifo;
	unsigned worker, worker_ctx;
	int best = -1;

	double best_exp_end = 0.0;
	double model_best = 0.0;

	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned best_impl = 0;
	unsigned nimpl;

	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);

	for (worker_ctx = 0; worker_ctx < nworkers; worker_ctx++)
	{
		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			worker = workerids[worker_ctx];
			double exp_end;

			fifo = dt->queue_array[worker];

			/* Sometimes workers didn't take the tasks as early as we expected */
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			fifo->exp_end = fifo->exp_start + fifo->exp_len;

			if (!starpu_worker_can_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u)\n", local_length, worker, nimpl);

			if (ntasks_best == -1
			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
			    || (!calibrating && isnan(local_length)) /* Not calibrating but this worker is being calibrated */
			    || (calibrating && isnan(local_length) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			   )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				best_impl = nimpl;
			}

			if (isnan(local_length))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end = fifo->exp_start + fifo->exp_len + local_length;

			if (best == -1 || exp_end < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end;
				best = worker;
				model_best = local_length;
				best_impl = nimpl;
			}
		}
	}

	if (unknown)
	{
		best = ntasks_best;
		model_best = 0.0;
	}

	_starpu_increment_nsubmitted_tasks_of_worker(best);

	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
}

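/* "dmda" push: like _dm_push_task(), but the worker/implementation choice
 * minimizes a fitness value that also accounts for data transfers and power:
 *
 *   fitness = alpha * (exp_end - best_exp_end)
 *           + beta  * data_transfer_penalty
 *           + gamma * power
 *
 * (alpha, beta, gamma and idle_power come from dmda_data and can be tuned
 * through the STARPU_SCHED_ALPHA/BETA/GAMMA environment variables, see
 * initialize_dmda_policy()). */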
static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	/* find the queue */
	struct _starpu_fifo_taskq *fifo;
	unsigned worker, worker_ctx;
	int best = -1, best_ctx = -1;

	/* this flag is set if the corresponding worker is selected because
	   there is no performance prediction available yet */
	int forced_best = -1;

	unsigned nworkers_ctx = starpu_get_nworkers_of_ctx(sched_ctx_id);
	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);

	double local_task_length[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double local_data_penalty[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double local_power[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double exp_end[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double max_exp_end = 0.0;

	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];

	double best_exp_end = 10e240;
	double model_best = 0.0;
	//double penality_best = 0.0;

	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned best_impl = 0;
	unsigned nimpl = 0;

	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
	{
		worker = workerids[worker_ctx];

		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			fifo = dt->queue_array[worker];

			/* Sometimes workers didn't take the tasks as early as we expected */
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			fifo->exp_end = fifo->exp_start + fifo->exp_len;
			if (fifo->exp_end > max_exp_end)
				max_exp_end = fifo->exp_end;

			if (!starpu_worker_can_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
			local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);

			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u)\n", local_task_length[worker], worker, nimpl);

			unsigned memory_node = starpu_worker_get_memory_node(worker);
			local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);

			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			if (ntasks_best == -1
			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			   )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				best_impl = nimpl;
			}

			if (isnan(local_task_length[worker_ctx][nimpl]))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (isnan(local_task_length[worker_ctx][nimpl])
			    || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];

			if (exp_end[worker_ctx][nimpl] < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end[worker_ctx][nimpl];
				best_impl = nimpl;
			}

			local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);
			if (isnan(local_power[worker_ctx][nimpl]))
				local_power[worker_ctx][nimpl] = 0.;
		}
	}

	if (unknown)
		forced_best = ntasks_best;

	double best_fitness = -1;

	if (forced_best == -1)
	{
		for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
		{
			worker = workerids[worker_ctx];

			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
			{
				if (!starpu_worker_can_execute_task(worker, task, nimpl))
				{
					/* no one on that queue may execute this task */
					continue;
				}

				fifo = dt->queue_array[worker];

				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
					+ dt->_gamma*(local_power[worker_ctx][nimpl]);

				if (exp_end[worker_ctx][nimpl] > max_exp_end)
					/* This placement will make the computation
					 * longer, take into account the idle
					 * consumption of other cpus */
					fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;

				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
				{
					/* we found a better solution */
					best_fitness = fitness[worker_ctx][nimpl];
					best = worker;
					best_ctx = worker_ctx;
					best_impl = nimpl;

					// _STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) + gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
				}
			}
		}
	}

	STARPU_ASSERT(forced_best != -1 || best != -1);

	if (forced_best != -1)
	{
		/* there is no prediction available for that task
		 * with that arch we want to speed-up calibration time
		 * so we force this measurement */
		best = forced_best;
		model_best = 0.0;
		//penality_best = 0.0;
	}
	else
	{
		model_best = local_task_length[best_ctx][best_impl];
		//penality_best = local_data_penalty[best_ctx][best_impl];
	}

	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
}

static int dmda_push_sorted_task(struct starpu_task *task, unsigned sched_ctx_id)
{
	return _dmda_push_task(task, 1, sched_ctx_id);
}

static int dm_push_task(struct starpu_task *task, unsigned sched_ctx_id)
{
	return _dm_push_task(task, 0, sched_ctx_id);
}

static int dmda_push_task(struct starpu_task *task, unsigned sched_ctx_id)
{
	return _dmda_push_task(task, 0, sched_ctx_id);
}

static void initialize_dmda_policy_for_workers(unsigned sched_ctx_id, int *workerids, unsigned nnew_workers)
{
	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	int workerid;
	unsigned i;
	pthread_mutex_t *sched_mutex;
	pthread_cond_t *sched_cond;

	for (i = 0; i < nnew_workers; i++)
	{
		workerid = workerids[i];
		dt->queue_array[workerid] = _starpu_create_fifo();
		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
	}
}

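/* Create the per-worker queues and initialize the alpha/beta/gamma weights,
 * possibly overridden by the STARPU_SCHED_ALPHA, STARPU_SCHED_BETA and
 * STARPU_SCHED_GAMMA environment variables. */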
static void initialize_dmda_policy(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
	dt->alpha = STARPU_DEFAULT_ALPHA;
	dt->beta = STARPU_DEFAULT_BETA;
	dt->_gamma = STARPU_DEFAULT_GAMMA;
	dt->idle_power = 0.0;

	unsigned nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);

	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)dt);

	dt->queue_array = (struct _starpu_fifo_taskq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));

	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
	if (strval_alpha)
		dt->alpha = atof(strval_alpha);

	const char *strval_beta = getenv("STARPU_SCHED_BETA");
	if (strval_beta)
		dt->beta = atof(strval_beta);

	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
	if (strval_gamma)
		dt->_gamma = atof(strval_gamma);

	unsigned workerid_ctx;
	int workerid;
	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
	{
		workerid = workerids[workerid_ctx];
		dt->queue_array[workerid] = _starpu_create_fifo();
		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
	}
}

static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
{
	initialize_dmda_policy(sched_ctx_id);

	/* The application may use any integer */
	starpu_sched_set_min_priority(INT_MIN);
	starpu_sched_set_max_priority(INT_MAX);
}

static void deinitialize_dmda_policy(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
	int workerid_ctx, workerid;
	int nworkers = starpu_get_nworkers_of_ctx(sched_ctx_id);
	int *workerids = starpu_get_workers_of_ctx(sched_ctx_id);

	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
	{
		workerid = workerids[workerid_ctx];
		_starpu_destroy_fifo(dt->queue_array[workerid]);
		starpu_worker_deinit_sched_condition(sched_ctx_id, workerid);
	}

	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n",
		      dt->total_task_cnt, dt->ready_task_cnt,
		      (100.0f*dt->ready_task_cnt)/dt->total_task_cnt);

	free(dt->queue_array);
	free(dt);
}

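/* The four exported policies share the same push/pop machinery:
 *  - dm:    performance model only
 *  - dmda:  adds the data transfer penalty and power terms
 *  - dmdas: dmda with priority-sorted queues and ready-aware popping
 *  - dmdar: dmda with ready-aware popping
 * (A scheduler is typically selected at runtime with the STARPU_SCHED
 * environment variable, e.g. STARPU_SCHED=dmda.) */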
/* TODO: use post_exec_hook to fix the expected start */
struct starpu_sched_policy _starpu_sched_dm_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dm_push_task,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dm",
	.policy_description = "performance model",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};

struct starpu_sched_policy _starpu_sched_dmda_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dmda_push_task,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmda",
	.policy_description = "data-aware performance model",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};

struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dmda_push_sorted_task,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdas",
	.policy_description = "data-aware performance model (sorted)",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};

struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dmda_push_task,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdar",
	.policy_description = "data-aware performance model (ready)",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};