deque_modeling_policy_data_aware.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2012 Université de Bordeaux 1
 * Copyright (C) 2010, 2011, 2012 Centre National de la Recherche Scientifique
 * Copyright (C) 2011 Télécom-SudParis
 * Copyright (C) 2011 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
/* Distributed queues using performance modeling to assign tasks */

#include <limits.h>
#include <core/perfmodel/perfmodel.h>
#include <core/task_bundle.h>
#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <starpu_parameters.h>

#ifndef DBL_MIN
#define DBL_MIN __DBL_MIN__
#endif

#ifndef DBL_MAX
#define DBL_MAX __DBL_MAX__
#endif
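
/* Per-scheduling-context data for the dm/dmda policy family: the weights of
 * the fitness function (alpha, beta, _gamma, idle_power), one FIFO queue per
 * worker, and counters used to report how many popped tasks had all of their
 * input data already available on the worker's memory node. */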
typedef struct {
	double alpha;
	double beta;
	double _gamma;
	double idle_power;

	struct _starpu_fifo_taskq **queue_array;

	long int total_task_cnt;
	long int ready_task_cnt;
} dmda_data;
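
/* Count how many of the task's data buffers are not yet valid on the given
 * memory node (i.e. would require a transfer before the task can start). */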
static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
{
	int cnt = 0;

	unsigned nbuffers = task->cl->nbuffers;
	unsigned index;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle;

		handle = task->handles[index];

		int is_valid;
		starpu_data_query_status(handle, node, NULL, &is_valid, NULL);

		if (!is_valid)
			cnt++;
	}

	return cnt;
}
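
/* Pop a "ready" task: walk the queue from its back, consider only tasks whose
 * priority does not exceed that of the back task, and among those pick the
 * one with the fewest non-ready data buffers on the target memory node. */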
static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node)
{
	struct starpu_task *task = NULL, *current;

	if (fifo_queue->ntasks == 0)
		return NULL;

	if (fifo_queue->ntasks > 0)
	{
		fifo_queue->ntasks--;

		task = starpu_task_list_back(&fifo_queue->taskq);

		int first_task_priority = task->priority;

		current = task;

		int non_ready_best = INT_MAX;

		while (current)
		{
			int priority = current->priority;

			if (priority <= first_task_priority)
			{
				int non_ready = count_non_ready_buffers(current, node);
				if (non_ready < non_ready_best)
				{
					non_ready_best = non_ready;
					task = current;

					if (non_ready == 0)
						break;
				}
			}

			current = current->prev;
		}

		starpu_task_list_erase(&fifo_queue->taskq, task);

		_STARPU_TRACE_JOB_POP(task, 0);
	}

	return task;
}
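
/* Pop hook used by the dmdas/dmdar policies: take the "most ready" task from
 * the calling worker's queue and update the queue's expected start, length
 * and end times accordingly. */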
static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	unsigned node = starpu_worker_get_memory_node(workerid);

	task = _starpu_fifo_pop_first_ready_task(fifo, node);
	if (task)
	{
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}
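
/* Default pop hook for dm/dmda: take the next task from the calling worker's
 * FIFO and update the queue's expected start, length and end times. */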
static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	task = _starpu_fifo_pop_local_task(fifo);
	if (task)
	{
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}
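
/* Flush the calling worker's queue: pop every queued task at once and update
 * the expected-time bookkeeping for each of them. */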
static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	struct starpu_task *new_list, *task;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	pthread_mutex_t *sched_mutex;
	pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(sched_ctx_id, workerid, &sched_mutex, &sched_cond);

	new_list = _starpu_fifo_pop_every_task(fifo, sched_mutex, workerid);

	/* walk the popped list with a separate cursor so that the head of the
	 * list can still be returned to the caller */
	for (task = new_list; task; task = task->next)
	{
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;
	}

	return new_list;
}
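
/* Enqueue a task on the worker chosen by the push heuristics: account for its
 * predicted duration in the worker's expected times, optionally prefetch its
 * input data on the worker's memory node, and push it to the worker's queue,
 * sorted by priority or plain FIFO depending on the policy variant. */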
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid, double predicted, int prio, unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	/* make sure someone could execute that task! */
	STARPU_ASSERT(best_workerid != -1);

	struct _starpu_fifo_taskq *fifo;
	fifo = dt->queue_array[best_workerid];

	fifo->exp_end += predicted;
	fifo->exp_len += predicted;

	task->predicted = predicted;
	/* TODO predicted_transfer */

	unsigned memory_node = starpu_worker_get_memory_node(best_workerid);

	if (starpu_get_prefetch_flag())
		starpu_prefetch_task_input_on_node(task, memory_node);

	pthread_mutex_t *sched_mutex;
	pthread_cond_t *sched_cond;
	starpu_worker_get_sched_condition(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);

#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	if (prio)
		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
						     sched_mutex, sched_cond, task);
	else
		return _starpu_fifo_push_task(dt->queue_array[best_workerid],
					      sched_mutex, sched_cond, task);
}
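
/* dm ("deque model") push: for each candidate worker and implementation,
 * estimate when the task would complete on that worker's queue and pick the
 * combination with the earliest expected completion time. When no performance
 * prediction is available yet, fall back to a greedy choice that balances the
 * number of queued tasks weighted by relative speedups, so that calibration
 * measurements get spread over the workers. */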
/* TODO: factorize with dmda!! */
static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	/* find the queue */
	struct _starpu_fifo_taskq *fifo;
	unsigned worker, worker_ctx = 0;
	int best = -1;

	double best_exp_end = 0.0;
	double model_best = 0.0;

	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned best_impl = 0;
	unsigned nimpl;

	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);

	if (workers->init_cursor)
		workers->init_cursor(workers);

	while (workers->has_next(workers))
	{
		worker = workers->get_next(workers);

		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			double exp_end;

			fifo = dt->queue_array[worker];

			/* Sometimes workers didn't take the tasks as early as we expected */
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			fifo->exp_end = fifo->exp_start + fifo->exp_len;

			if (!starpu_worker_can_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				worker_ctx++;
				continue;
			}

			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length, worker, nimpl);

			if (ntasks_best == -1
			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
			    || (!calibrating && isnan(local_length)) /* Not calibrating but this worker is being calibrated */
			    || (calibrating && isnan(local_length) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			   )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				best_impl = nimpl;
			}

			if (isnan(local_length))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end = fifo->exp_start + fifo->exp_len + local_length;

			if (best == -1 || exp_end < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end;
				best = worker;
				model_best = local_length;
				best_impl = nimpl;
			}
		}
		worker_ctx++;
	}

	if (unknown)
	{
		best = ntasks_best;
		model_best = 0.0;
	}

	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);

	if (workers->init_cursor)
		workers->deinit_cursor(workers);

	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
}
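
/* For every (worker, implementation) pair that can run the task, fill in the
 * expected task length, data transfer penalty, power consumption and
 * completion time. Also report the earliest and latest expected completion
 * times over all candidates and, when no prediction is available yet, a
 * forced worker/implementation chosen greedily to feed calibration. */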
static void compute_all_performance_predictions(struct starpu_task *task,
						double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						double *max_exp_endp,
						double *best_exp_endp,
						double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
{
	int calibrating = 0;

	double max_exp_end = DBL_MIN;
	double best_exp_end = DBL_MAX;

	int ntasks_best = -1;
	int nimpl_best = 0;
	double ntasks_best_end = 0.0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned worker, worker_ctx = 0;
	unsigned nimpl;

	starpu_task_bundle_t bundle = task->bundle;
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);

	/* find the queue */
	struct _starpu_fifo_taskq *fifo;

	while (workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		fifo = dt->queue_array[worker];

		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			if (!starpu_worker_can_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			/* Sometimes workers didn't take the tasks as early as we expected */
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len;
			if (exp_end[worker_ctx][nimpl] > max_exp_end)
				max_exp_end = exp_end[worker_ctx][nimpl];

			enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
			unsigned memory_node = starpu_worker_get_memory_node(worker);

			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl], worker, nimpl);

			if (bundle)
			{
				STARPU_ABORT(); /* Not implemented yet. */
			}
			else
			{
				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
				local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);
			}

			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			if (ntasks_best == -1
			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			   )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				nimpl_best = nimpl;
			}

			if (isnan(local_task_length[worker_ctx][nimpl]))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (isnan(local_task_length[worker_ctx][nimpl])
			    || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
				/* there is no prediction available for that task
				 * with that arch (yet or at all), so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];

			if (exp_end[worker_ctx][nimpl] < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end[worker_ctx][nimpl];
				nimpl_best = nimpl;
			}

			if (isnan(local_power[worker_ctx][nimpl]))
				local_power[worker_ctx][nimpl] = 0.;
		}
		worker_ctx++;
	}

	*forced_worker = unknown ? ntasks_best : -1;
	*forced_impl = unknown ? nimpl_best : -1;

	*best_exp_endp = best_exp_end;
	*max_exp_endp = max_exp_end;
}
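
/* dmda ("deque model data aware") push: score every candidate
 * (worker, implementation) pair with the fitness
 *   alpha * (exp_end - best_exp_end) + beta * data_penalty + gamma * power
 * (plus an idle-power term when the placement would lengthen the overall
 * completion time), and push the task to the pair with the lowest fitness. */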
static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	/* find the queue */
	unsigned worker, worker_ctx = 0;
	int best = -1, best_in_ctx = -1;
	int selected_impl = 0;
	double model_best = 0.0;

	/* this flag is set if the corresponding worker is selected because
	   there is no performance prediction available yet */
	int forced_best = -1;
	int forced_impl = -1;

	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);
	struct worker_collection *workers = starpu_get_worker_collection_of_sched_ctx(sched_ctx_id);
	unsigned nworkers_ctx = workers->nworkers;

	double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double max_exp_end = 0.0;
	double best_exp_end;

	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];

	if (workers->init_cursor)
		workers->init_cursor(workers);

	compute_all_performance_predictions(task,
					    local_task_length,
					    exp_end,
					    &max_exp_end,
					    &best_exp_end,
					    local_data_penalty,
					    local_power,
					    &forced_best,
					    &forced_impl, sched_ctx_id);

	double best_fitness = -1;

	unsigned nimpl;
	if (forced_best == -1)
	{
		while (workers->has_next(workers))
		{
			worker = workers->get_next(workers);

			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
			{
				if (!starpu_worker_can_execute_task(worker, task, nimpl))
				{
					/* no one on that queue may execute this task */
					continue;
				}

				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
					+ dt->_gamma*(local_power[worker_ctx][nimpl]);

				if (exp_end[worker_ctx][nimpl] > max_exp_end)
					/* This placement will make the computation
					 * longer, take into account the idle
					 * consumption of other cpus */
					fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;

				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
				{
					/* we found a better solution */
					best_fitness = fitness[worker_ctx][nimpl];
					best = worker;
					best_in_ctx = worker_ctx;
					selected_impl = nimpl;

					//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
				}
			}

			/* keep the per-context index in sync with the one used
			 * by compute_all_performance_predictions() */
			worker_ctx++;
		}
	}

	STARPU_ASSERT(forced_best != -1 || best != -1);

	if (forced_best != -1)
	{
		/* there is no prediction available for that task
		 * with that arch we want to speed-up calibration time
		 * so we force this measurement */
		best = forced_best;
		selected_impl = forced_impl;
		model_best = 0.0;
		//penality_best = 0.0;
	}
	else
	{
		model_best = local_task_length[best_in_ctx][selected_impl];
		//penality_best = local_data_penalty[best_in_ctx][best_impl];
	}

	if (workers->init_cursor)
		workers->deinit_cursor(workers);

	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
	_starpu_get_job_associated_to_task(task)->nimpl = selected_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, prio, sched_ctx_id);
}
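
/* Entry points registered in the policy structures below: they only check,
 * under the context's "changing" mutex, that the scheduling context still has
 * workers before delegating to _dm_push_task() or _dmda_push_task(). */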
static int dmda_push_sorted_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
	unsigned nworkers;
	int ret_val = -1;

	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
	if (nworkers == 0)
	{
		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
		return ret_val;
	}

	ret_val = _dmda_push_task(task, 1, sched_ctx_id);
	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
	return ret_val;
}
static int dm_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
	unsigned nworkers;
	int ret_val = -1;

	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
	if (nworkers == 0)
	{
		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
		return ret_val;
	}

	ret_val = _dm_push_task(task, 0, sched_ctx_id);
	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
	return ret_val;
}
static int dmda_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
	unsigned nworkers;
	int ret_val = -1;

	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
	nworkers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
	if (nworkers == 0)
	{
		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
		return ret_val;
	}

	STARPU_ASSERT(task);
	ret_val = _dmda_push_task(task, 0, sched_ctx_id);
	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
	return ret_val;
}
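
/* When workers join or leave the scheduling context, create or destroy the
 * per-worker FIFO queue and its scheduling condition. */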
static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	int workerid;
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		dt->queue_array[workerid] = _starpu_create_fifo();
		starpu_worker_init_sched_condition(sched_ctx_id, workerid);
	}
}

static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	int workerid;
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		_starpu_destroy_fifo(dt->queue_array[workerid]);
		starpu_worker_deinit_sched_condition(sched_ctx_id, workerid);
	}
}
static void initialize_dmda_policy(unsigned sched_ctx_id)
{
	starpu_create_worker_collection_for_sched_ctx(sched_ctx_id, WORKER_LIST);

	dmda_data *dt = (dmda_data*)malloc(sizeof(dmda_data));
	dt->alpha = _STARPU_DEFAULT_ALPHA;
	dt->beta = _STARPU_DEFAULT_BETA;
	dt->_gamma = _STARPU_DEFAULT_GAMMA;
	dt->idle_power = 0.0;
	dt->total_task_cnt = 0;
	dt->ready_task_cnt = 0;

	starpu_set_sched_ctx_policy_data(sched_ctx_id, (void*)dt);

	dt->queue_array = (struct _starpu_fifo_taskq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));

	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
	if (strval_alpha)
		dt->alpha = atof(strval_alpha);

	const char *strval_beta = getenv("STARPU_SCHED_BETA");
	if (strval_beta)
		dt->beta = atof(strval_beta);

	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
	if (strval_gamma)
		dt->_gamma = atof(strval_gamma);

	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
	if (strval_idle_power)
		dt->idle_power = atof(strval_idle_power);
}
static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
{
	initialize_dmda_policy(sched_ctx_id);

	/* The application may use any integer */
	starpu_sched_set_min_priority(INT_MIN);
	starpu_sched_set_max_priority(INT_MAX);
}
static void deinitialize_dmda_policy(unsigned sched_ctx_id)
{
	dmda_data *dt = (dmda_data*)starpu_get_sched_ctx_policy_data(sched_ctx_id);

	/* report the statistics before freeing the policy data */
	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", dt->total_task_cnt, dt->ready_task_cnt, (100.0f*dt->ready_task_cnt)/dt->total_task_cnt);

	free(dt->queue_array);
	free(dt);

	starpu_delete_worker_collection_for_sched_ctx(sched_ctx_id);
}
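
/* The four exported policy variants: "dm" (performance model only), "dmda"
 * (adds data transfer penalties), "dmdas" (same, with priority-sorted queues
 * and ready-first popping) and "dmdar" (same as dmda, popping the most
 * "ready" task first). */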
/* TODO: use post_exec_hook to fix the expected start */
struct starpu_sched_policy _starpu_sched_dm_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dm_push_task,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dm",
	.policy_description = "performance model"
};

struct starpu_sched_policy _starpu_sched_dmda_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmda",
	.policy_description = "data-aware performance model"
};

struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_sorted_task,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdas",
	.policy_description = "data-aware performance model (sorted)"
};

struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdar",
	.policy_description = "data-aware performance model (ready)"
};