deque_modeling_policy_data_aware.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010, 2011  Université de Bordeaux 1
 * Copyright (C) 2010, 2011  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  Télécom-SudParis
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
/* Distributed queues using performance modeling to assign tasks */

#include <limits.h>

#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <core/perfmodel/perfmodel.h>
#include <starpu_parameters.h>

/* #ifdef STARPU_VERBOSE */
/* static long int total_task_cnt = 0; */
/* static long int ready_task_cnt = 0; */
/* #endif */

typedef struct {
	double alpha;
	double beta;
	double _gamma;
	double idle_power;

	struct starpu_fifo_taskq_s **queue_array;

	long int total_task_cnt;
	long int ready_task_cnt;
} dmda_data;

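/* Count how many of the task's data buffers are not yet valid (available)
 * on the given memory node. */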
static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
{
	int cnt = 0;

	starpu_buffer_descr *descrs = task->buffers;
	unsigned nbuffers = task->cl->nbuffers;

	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_buffer_descr *descr;
		starpu_data_handle handle;

		descr = &descrs[index];
		handle = descr->handle;

		int is_valid;
		starpu_data_query_status(handle, node, NULL, &is_valid, NULL);

		if (!is_valid)
			cnt++;
	}

	return cnt;
}

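/* Scan the queue from the back and pop the task with the fewest non-ready
 * data buffers on the given memory node, restricting the scan to tasks whose
 * priority does not exceed that of the task at the back of the queue. */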
static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct starpu_fifo_taskq_s *fifo_queue, unsigned node)
{
	struct starpu_task *task = NULL, *current;

	if (fifo_queue->ntasks == 0)
		return NULL;

	if (fifo_queue->ntasks > 0)
	{
		fifo_queue->ntasks--;

		task = starpu_task_list_back(&fifo_queue->taskq);

		int first_task_priority = task->priority;

		current = task;

		int non_ready_best = INT_MAX;

		while (current)
		{
			int priority = current->priority;

			if (priority <= first_task_priority)
			{
				int non_ready = count_non_ready_buffers(current, node);
				if (non_ready < non_ready_best)
				{
					non_ready_best = non_ready;
					task = current;

					if (non_ready == 0)
						break;
				}
			}

			current = current->prev;
		}

		starpu_task_list_erase(&fifo_queue->taskq, task);

		STARPU_TRACE_JOB_POP(task, 0);
	}

	return task;
}

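/* "Ready" variant of the pop method: prefer a task whose data are already
 * present on the worker's memory node, then update the queue's expected
 * start/length/end times with the popped task's predicted duration. */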
static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];

	unsigned node = starpu_worker_get_memory_node(workerid);

	task = _starpu_fifo_pop_first_ready_task(fifo, node);
	if (task) {
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}

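/* Standard pop: take the next task from this worker's FIFO and update the
 * expected start/length/end times with its predicted duration. */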
static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];

	task = _starpu_fifo_pop_task(fifo, -1);
	if (task) {
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}

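/* Pop every task from this worker's FIFO at once, updating the expected
 * times for each task removed, and return the popped list. */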
static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;

	struct starpu_task *new_list, *current;

	int workerid = starpu_worker_get_id();
	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx_id, workerid);
	struct starpu_fifo_taskq_s *fifo = dt->queue_array[workerid_ctx];

	new_list = _starpu_fifo_pop_every_task(fifo, sched_ctx->sched_mutex[workerid_ctx], workerid);

	/* walk the popped list with a cursor so that its head can still be
	 * returned to the caller */
	for (current = new_list; current; current = current->next)
	{
		double model = current->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;
	}

	return new_list;
}

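/* Insert a task into the FIFO while keeping the list ordered by increasing
 * priority from head to tail. */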
static
int _starpu_fifo_push_sorted_task(struct starpu_fifo_taskq_s *fifo_queue, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond, struct starpu_task *task)
{
	struct starpu_task_list *list = &fifo_queue->taskq;

	PTHREAD_MUTEX_LOCK(sched_mutex);

	STARPU_TRACE_JOB_PUSH(task, 0);

	if (list->head == NULL)
	{
		list->head = task;
		list->tail = task;
		task->prev = NULL;
		task->next = NULL;
	}
	else {
		struct starpu_task *current = list->head;
		struct starpu_task *prev = NULL;

		while (current)
		{
			if (current->priority >= task->priority)
				break;

			prev = current;
			current = current->next;
		}

		if (prev == NULL)
		{
			/* Insert at the front of the list */
			list->head->prev = task;
			task->prev = NULL;
			task->next = list->head;
			list->head = task;
		}
		else {
			if (current)
			{
				/* Insert between prev and current */
				task->prev = prev;
				prev->next = task;
				task->next = current;
				current->prev = task;
			}
			else {
				/* Insert at the tail of the list */
				list->tail->next = task;
				task->next = NULL;
				task->prev = list->tail;
				list->tail = task;
			}
		}
	}

	fifo_queue->ntasks++;
	fifo_queue->nprocessed++;

	PTHREAD_COND_SIGNAL(sched_cond);
	PTHREAD_MUTEX_UNLOCK(sched_mutex);

	return 0;
}

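/* Enqueue the task on the selected worker's FIFO, account for its predicted
 * duration in the queue's expected times, and prefetch its input data on the
 * worker's memory node if prefetching is enabled. */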
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, struct starpu_sched_ctx *sched_ctx)
{
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
	/* make sure someone could execute that task! */
	STARPU_ASSERT(best_workerid != -1);

	int best_workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->id, best_workerid);
	struct starpu_fifo_taskq_s *fifo;
	fifo = dt->queue_array[best_workerid_ctx];

	fifo->exp_end += predicted;
	fifo->exp_len += predicted;

	task->predicted = predicted;

	unsigned memory_node = starpu_worker_get_memory_node(best_workerid);

	if (starpu_get_prefetch_flag())
		starpu_prefetch_task_input_on_node(task, memory_node);

	if (prio)
		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid_ctx],
			sched_ctx->sched_mutex[best_workerid_ctx], sched_ctx->sched_cond[best_workerid_ctx], task);
	else
		return _starpu_fifo_push_task(dt->queue_array[best_workerid_ctx],
			sched_ctx->sched_mutex[best_workerid_ctx], sched_ctx->sched_cond[best_workerid_ctx], task);
}

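/* "dm" push: pick the worker whose queue yields the earliest expected
 * termination for this task according to the performance model; fall back to
 * a greedy choice while the model is still being calibrated. */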
static int _dm_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
{
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
	/* find the queue */
	struct starpu_fifo_taskq_s *fifo;
	unsigned worker, worker_ctx;
	int best = -1;

	double best_exp_end = 0.0;
	double model_best = 0.0;

	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned best_impl = 0;
	unsigned nimpl;
	unsigned nworkers = sched_ctx->nworkers;

	for (worker_ctx = 0; worker_ctx < nworkers; worker_ctx++)
	{
		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			worker = sched_ctx->workerids[worker_ctx];
			double exp_end;

			fifo = dt->queue_array[worker_ctx];

			/* Sometimes workers didn't take the tasks as early as we expected */
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			fifo->exp_end = fifo->exp_start + fifo->exp_len;

			if (!starpu_worker_may_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length, worker, nimpl);

			if (ntasks_best == -1
					|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
					|| (!calibrating && local_length == -1.0) /* Not calibrating but this worker is being calibrated */
					|| (calibrating && local_length == -1.0 && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			) {
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
			}

			if (local_length == -1.0)
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (local_length <= 0.0)
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end = fifo->exp_start + fifo->exp_len + local_length;

			if (best == -1 || exp_end < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end;
				best = worker;
				model_best = local_length;
				best_impl = nimpl;
			}
		}
	}

	if (unknown) {
		best = ntasks_best;
		model_best = 0.0;
	}

	_starpu_increment_nsubmitted_tasks_of_worker(best);

	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
	_starpu_get_job_associated_to_task(task)->nimpl = 0; //best_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
}

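/* "dmda" push: like _dm_push_task, but the per-worker fitness also accounts
 * for the expected data transfer time and the expected power consumption,
 * weighted by the policy's alpha/beta/gamma parameters. */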
static int _dmda_push_task(struct starpu_task *task, unsigned prio, struct starpu_sched_ctx *sched_ctx)
{
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;
	/* find the queue */
	struct starpu_fifo_taskq_s *fifo;
	unsigned worker, worker_ctx;
	int best = -1, best_ctx = -1;

	/* this flag is set if the corresponding worker is selected because
	   there is no performance prediction available yet */
	int forced_best = -1;

	unsigned nworkers_ctx = sched_ctx->nworkers;
	double local_task_length[nworkers_ctx];
	double local_data_penalty[nworkers_ctx];
	double local_power[nworkers_ctx];
	double exp_end[nworkers_ctx];
	double max_exp_end = 0.0;

	double fitness[nworkers_ctx];

	double best_exp_end = 10e240;
	double model_best = 0.0;
	//double penality_best = 0.0;

	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned best_impl = 0;
	unsigned nimpl = 0;
	for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
	{
		worker = sched_ctx->workerids[worker_ctx];
		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			fifo = dt->queue_array[worker_ctx];

			/* Sometimes workers didn't take the tasks as early as we expected */
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			fifo->exp_end = fifo->exp_start + fifo->exp_len;
			if (fifo->exp_end > max_exp_end)
				max_exp_end = fifo->exp_end;

			if (!starpu_worker_may_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);

			/* the per-worker arrays are indexed by the context-local
			 * index (worker_ctx), not by the global worker id */
			local_task_length[worker_ctx] = starpu_task_expected_length(task, perf_arch, nimpl);
			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker_ctx], worker, nimpl);

			unsigned memory_node = starpu_worker_get_memory_node(worker);
			local_data_penalty[worker_ctx] = starpu_task_expected_data_transfer_time(memory_node, task);

			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			if (ntasks_best == -1
					|| (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
					|| (!calibrating && local_task_length[worker_ctx] == -1.0) /* Not calibrating but this worker is being calibrated */
					|| (calibrating && local_task_length[worker_ctx] == -1.0 && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			) {
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
			}

			if (local_task_length[worker_ctx] == -1.0)
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (local_task_length[worker_ctx] <= 0.0)
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end[worker_ctx] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx];

			if (exp_end[worker_ctx] < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end[worker_ctx];
				best_impl = nimpl;
			}

			local_power[worker_ctx] = starpu_task_expected_power(task, perf_arch, nimpl);
			if (local_power[worker_ctx] == -1.0)
				local_power[worker_ctx] = 0.;
		}
	}
	if (unknown)
		forced_best = ntasks_best;

	double best_fitness = -1;

	if (forced_best == -1)
	{
		for (worker_ctx = 0; worker_ctx < nworkers_ctx; worker_ctx++)
		{
			worker = sched_ctx->workerids[worker_ctx];
			fifo = dt->queue_array[worker_ctx];

			if (!starpu_worker_may_execute_task(worker, task, 0))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			fitness[worker_ctx] = dt->alpha*(exp_end[worker_ctx] - best_exp_end)
					+ dt->beta*(local_data_penalty[worker_ctx])
					+ dt->_gamma*(local_power[worker_ctx]);

			if (exp_end[worker_ctx] > max_exp_end)
				/* This placement will make the computation
				 * longer, take into account the idle
				 * consumption of other cpus */
				fitness[worker_ctx] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx] - max_exp_end) / 1000000.0;

			if (best == -1 || fitness[worker_ctx] < best_fitness)
			{
				/* we found a better solution */
				best_fitness = fitness[worker_ctx];
				best = worker;
				best_ctx = worker_ctx;

				// _STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) + gamma(%e)\n", worker, best_fitness, exp_end[worker_ctx] - best_exp_end, local_data_penalty[worker_ctx], local_power[worker_ctx]);
			}
		}
	}

	STARPU_ASSERT(forced_best != -1 || best != -1);

	if (forced_best != -1)
	{
		/* there is no prediction available for that task
		 * with that arch we want to speed-up calibration time
		 * so we force this measurement */
		best = forced_best;
		model_best = 0.0;
		//penality_best = 0.0;
	}
	else
	{
		/* the per-worker arrays are indexed by the context-local index */
		model_best = local_task_length[best_ctx];
		//penality_best = local_data_penalty[best_ctx];
	}

	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx);
}

static int dmda_push_sorted_task(struct starpu_task *task, unsigned sched_ctx_id)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _dmda_push_task(task, 2, sched_ctx);
}

static int dm_push_task(struct starpu_task *task, unsigned sched_ctx_id)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _dm_push_task(task, 0, sched_ctx);
}

static int dmda_push_task(struct starpu_task *task, unsigned sched_ctx_id)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	return _dmda_push_task(task, 0, sched_ctx);
}

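/* Create the FIFO, mutex and condition variable of every worker that is
 * added to the scheduling context after its initialization. */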
static void initialize_dmda_policy_for_workers(unsigned sched_ctx_id, unsigned nnew_workers)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	unsigned nworkers = sched_ctx->nworkers;
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;

	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
	unsigned ntotal_workers = config->topology.nworkers;
	unsigned all_workers = nnew_workers == ntotal_workers ? ntotal_workers : nworkers + nnew_workers;

	unsigned workerid_ctx;
	for (workerid_ctx = nworkers; workerid_ctx < all_workers; workerid_ctx++)
	{
		dt->queue_array[workerid_ctx] = _starpu_create_fifo();

		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
	}
}

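/* Allocate the per-context policy data, read the alpha/beta/gamma weights
 * from the STARPU_SCHED_ALPHA/BETA/GAMMA environment variables, and create
 * one FIFO per worker of the context. */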
static void initialize_dmda_policy(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
	dt->alpha = STARPU_DEFAULT_ALPHA;
	dt->beta = STARPU_DEFAULT_BETA;
	dt->_gamma = STARPU_DEFAULT_GAMMA;
	dt->idle_power = 0.0;
	/* the counters are only updated under STARPU_VERBOSE, but they must
	 * not start from uninitialized memory */
	dt->total_task_cnt = 0;
	dt->ready_task_cnt = 0;

	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	unsigned nworkers = sched_ctx->nworkers;
	sched_ctx->policy_data = (void*)dt;

	dt->queue_array = (struct starpu_fifo_taskq_s**)malloc(STARPU_NMAXWORKERS*sizeof(struct starpu_fifo_taskq_s*));

	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
	if (strval_alpha)
		dt->alpha = atof(strval_alpha);

	const char *strval_beta = getenv("STARPU_SCHED_BETA");
	if (strval_beta)
		dt->beta = atof(strval_beta);

	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
	if (strval_gamma)
		dt->_gamma = atof(strval_gamma);

	unsigned workerid_ctx;
	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
	{
		dt->queue_array[workerid_ctx] = _starpu_create_fifo();

		sched_ctx->sched_mutex[workerid_ctx] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
		sched_ctx->sched_cond[workerid_ctx] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
		PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid_ctx], NULL);
		PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid_ctx], NULL);
	}
}

static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
{
	initialize_dmda_policy(sched_ctx_id);

	/* The application may use any integer */
	starpu_sched_set_min_priority(INT_MIN);
	starpu_sched_set_max_priority(INT_MAX);
}

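/* Free every per-worker FIFO, mutex and condition variable, report the
 * ready-task statistics, and release the policy data. */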
static void deinitialize_dmda_policy(unsigned sched_ctx_id)
{
	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	dmda_data *dt = (dmda_data*)sched_ctx->policy_data;

	int workerid_ctx;
	int nworkers = sched_ctx->nworkers;
	for (workerid_ctx = 0; workerid_ctx < nworkers; workerid_ctx++)
	{
		_starpu_destroy_fifo(dt->queue_array[workerid_ctx]);
		PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid_ctx]);
		PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid_ctx]);
		free(sched_ctx->sched_mutex[workerid_ctx]);
		free(sched_ctx->sched_cond[workerid_ctx]);
	}

	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n",
		dt->total_task_cnt, dt->ready_task_cnt,
		(100.0f*dt->ready_task_cnt)/dt->total_task_cnt);

	free(dt->queue_array);
	free(dt);
}

/* TODO: use post_exec_hook to fix the expected start */
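/* Exported policy descriptors: "dm" (performance model only), "dmda"
 * (data-aware), "dmdas" (data-aware, sorted by priority) and "dmdar"
 * (data-aware, ready tasks first). */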
struct starpu_sched_policy_s _starpu_sched_dm_policy = {
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dm_push_task,
	.pop_task = dmda_pop_task,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dm",
	.policy_description = "performance model",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};

struct starpu_sched_policy_s _starpu_sched_dmda_policy = {
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dmda_push_task,
	.pop_task = dmda_pop_task,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmda",
	.policy_description = "data-aware performance model",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};

struct starpu_sched_policy_s _starpu_sched_dmda_sorted_policy = {
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dmda_push_sorted_task,
	.pop_task = dmda_pop_ready_task,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdas",
	.policy_description = "data-aware performance model (sorted)",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};

struct starpu_sched_policy_s _starpu_sched_dmda_ready_policy = {
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.push_task = dmda_push_task,
	.pop_task = dmda_pop_ready_task,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdar",
	.policy_description = "data-aware performance model (ready)",
	.init_sched_for_workers = initialize_dmda_policy_for_workers
};