deque_modeling_policy_data_aware.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2012 Université de Bordeaux 1
 * Copyright (C) 2010, 2011, 2012 Centre National de la Recherche Scientifique
 * Copyright (C) 2011 Télécom-SudParis
 * Copyright (C) 2011-2012 INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
/* Distributed queues using performance modeling to assign tasks */

#include <starpu_config.h>
#include <limits.h>

#include <core/perfmodel/perfmodel.h>
#include <core/task_bundle.h>
#include <core/workers.h>
#include <sched_policies/fifo_queues.h>
#include <starpu_parameters.h>
#include <core/debug.h>

#ifdef STARPU_USE_TOP
#include <top/starpu_top_core.h>
#endif /* !STARPU_USE_TOP */

#ifndef DBL_MIN
#define DBL_MIN __DBL_MIN__
#endif

#ifndef DBL_MAX
#define DBL_MAX __DBL_MAX__
#endif

struct _starpu_dmda_data
{
	double alpha;
	double beta;
	double _gamma;
	double idle_power;

	struct _starpu_fifo_taskq **queue_array;

	long int total_task_cnt;
	long int ready_task_cnt;
};

static double alpha = _STARPU_DEFAULT_ALPHA;
static double beta = _STARPU_DEFAULT_BETA;
static double _gamma = _STARPU_DEFAULT_GAMMA;
static double idle_power = 0.0;

#ifdef STARPU_USE_TOP
static const float alpha_minimum = 0;
static const float alpha_maximum = 10.0;
static const float beta_minimum = 0;
static const float beta_maximum = 10.0;
static const float gamma_minimum = 0;
static const float gamma_maximum = 10000.0;
static const float idle_power_minimum = 0;
static const float idle_power_maximum = 10000.0;
#endif /* !STARPU_USE_TOP */
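
/* Count how many of the task's data buffers are not yet valid (i.e. would
 * still have to be transferred) on the given memory node. The "ready"
 * policy variants use this to favour tasks whose data is already local. */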
static int count_non_ready_buffers(struct starpu_task *task, uint32_t node)
{
	int cnt = 0;

	unsigned nbuffers = task->cl->nbuffers;
	unsigned index;

	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle;

		handle = task->handles[index];

		int is_valid;
		starpu_data_query_status(handle, node, NULL, &is_valid, NULL);

		if (!is_valid)
			cnt++;
	}

	return cnt;
}

#ifdef STARPU_USE_TOP
static void param_modified(struct starpu_top_param* d)
{
	/* Just to show parameter modification. */
	fprintf(stderr,
		"%s has been modified : "
		"alpha=%f|beta=%f|gamma=%f|idle_power=%f !\n",
		d->name, alpha, beta, _gamma, idle_power);
}
#endif /* !STARPU_USE_TOP */
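
/* Pop a task whose data buffers are as "ready" as possible on the given
 * memory node: start from the task that would normally be popped, scan the
 * rest of the queue considering only tasks whose priority does not exceed
 * that first candidate's, and pick the one with the fewest buffers still to
 * be transferred. */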
static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node)
{
	struct starpu_task *task = NULL, *current;

	if (fifo_queue->ntasks == 0)
		return NULL;

	if (fifo_queue->ntasks > 0)
	{
		fifo_queue->ntasks--;

		task = starpu_task_list_back(&fifo_queue->taskq);
		if (STARPU_UNLIKELY(!task))
			return NULL;

		int first_task_priority = task->priority;

		current = task;

		int non_ready_best = INT_MAX;

		while (current)
		{
			int priority = current->priority;

			if (priority <= first_task_priority)
			{
				int non_ready = count_non_ready_buffers(current, node);
				if (non_ready < non_ready_best)
				{
					non_ready_best = non_ready;
					task = current;

					if (non_ready == 0)
						break;
				}
			}

			current = current->prev;
		}

		starpu_task_list_erase(&fifo_queue->taskq, task);

		_STARPU_TRACE_JOB_POP(task, 0);
	}

	return task;
}
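
/* Popping a task moves the worker's timing estimates forward: the predicted
 * length of the popped task is removed from the expected queue length, the
 * expected start becomes "now + predicted length", and the expected end is
 * recomputed from these two. */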
static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	unsigned node = starpu_worker_get_memory_node(workerid);

	task = _starpu_fifo_pop_first_ready_task(fifo, node);
	if (task)
	{
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, node);
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}

static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	struct starpu_task *task;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	task = _starpu_fifo_pop_local_task(fifo);
	if (task)
	{
		double model = task->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;

#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}

		dt->total_task_cnt++;
#endif
	}

	return task;
}
static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	struct starpu_task *new_list, *current;

	int workerid = starpu_worker_get_id();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	_starpu_pthread_mutex_t *sched_mutex;
	_starpu_pthread_cond_t *sched_cond;
	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, workerid, &sched_mutex, &sched_cond);

	new_list = _starpu_fifo_pop_every_task(fifo, sched_mutex, workerid);

	/* Walk the popped list with a cursor so that the head of the list
	 * can still be returned to the caller. */
	for (current = new_list; current; current = current->next)
	{
		double model = current->predicted;

		fifo->exp_len -= model;
		fifo->exp_start = starpu_timing_now() + model;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;
	}

	return new_list;
}
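
/* Queue a task on the worker chosen by the push functions below: account
 * for its predicted execution time (and for whatever part of the data
 * transfer is not expected to overlap with already queued work) in the
 * worker's expected end time, optionally prefetch its input data on that
 * worker's memory node, and finally push it to the worker's local queue
 * (sorted by priority if requested). */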
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
				    double predicted, double predicted_transfer,
				    int prio, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	/* make sure someone could execute that task! */
	STARPU_ASSERT(best_workerid != -1);

	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];

	_starpu_pthread_mutex_t *sched_mutex;
	_starpu_pthread_cond_t *sched_cond;
	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);

#ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
	starpu_call_pushed_task_cb(best_workerid, sched_ctx_id);
#endif //STARPU_USE_SCHED_CTX_HYPERVISOR

	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);

	/* Sometimes workers didn't take the tasks as early as we expected */
	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
	fifo->exp_end = fifo->exp_start + fifo->exp_len;

	fifo->exp_end += predicted;
	fifo->exp_len += predicted;

	if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
	{
		/* We may hope that the transfer will be finished by
		 * the start of the task. */
		predicted_transfer = 0;
	}
	else
	{
		/* The transfer will not be finished by then, take the
		 * remainder into account */
		predicted_transfer += starpu_timing_now();
		predicted_transfer -= fifo->exp_end;
	}
	fifo->exp_end += predicted_transfer;
	fifo->exp_len += predicted_transfer;

	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);

	task->predicted = predicted;
	task->predicted_transfer = predicted_transfer;

#ifdef STARPU_USE_TOP
	if (_starpu_top_status_get())
		_starpu_top_task_prevision(task, best_workerid,
					   (unsigned long long)(fifo->exp_end - predicted)/1000,
					   (unsigned long long)fifo->exp_end/1000);
#endif /* !STARPU_USE_TOP */

	if (starpu_get_prefetch_flag())
	{
		unsigned memory_node = starpu_worker_get_memory_node(best_workerid);
		starpu_prefetch_task_input_on_node(task, memory_node);
	}

#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		int id = best_workerid;
		AYU_event(AYU_ADDTASKTOQUEUE, _starpu_get_job_associated_to_task(task)->job_id, &id);
	}
#endif

	if (prio)
		return _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid],
						     sched_mutex, sched_cond, task);
	else
		return _starpu_fifo_push_task(dt->queue_array[best_workerid],
					      sched_mutex, sched_cond, task);
}

/* TODO: factorize with dmda!! */
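
/* "dm" (deque model) strategy: among the workers that can execute the task,
 * pick the one whose expected completion time (current expected start +
 * queued work + predicted length of the task on that architecture) is the
 * smallest. While performance models are still being calibrated, fall back
 * to a greedy choice based on the number of queued tasks, scaled by the
 * relative speed of each worker. */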
static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned worker, worker_ctx = 0;
	int best = -1;

	double best_exp_end = 0.0;
	double model_best = 0.0;
	double transfer_model_best = 0.0;

	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned best_impl = 0;
	unsigned nimpl;

	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);

	if (workers->init_cursor)
		workers->init_cursor(workers);

	while (workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
		unsigned memory_node = starpu_worker_get_memory_node(worker);
		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);

		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			if (!starpu_worker_can_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				// worker_ctx++;
				continue;
			}

			double exp_end;
			_starpu_pthread_mutex_t *sched_mutex;
			_starpu_pthread_cond_t *sched_cond;
			starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, worker, &sched_mutex, &sched_cond);

			/* Sometimes workers didn't take the tasks as early as we expected */
			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			fifo->exp_end = fifo->exp_start + fifo->exp_len;
			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);

			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
			double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length, worker, nimpl);

			if (ntasks_best == -1
			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better task */
			    || (!calibrating && isnan(local_length)) /* Not calibrating but this worker is being calibrated */
			    || (calibrating && isnan(local_length) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			    )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				best_impl = nimpl;
			}

			if (isnan(local_length))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end = fifo->exp_start + fifo->exp_len + local_length;

			if (best == -1 || exp_end < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end;
				best = worker;
				model_best = local_length;
				transfer_model_best = local_penalty;
				best_impl = nimpl;
			}
		}
		worker_ctx++;
	}

	if (unknown)
	{
		best = ntasks_best;
		model_best = 0.0;
		transfer_model_best = 0.0;
	}

	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);

	if (workers->deinit_cursor)
		workers->deinit_cursor(workers);

	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best,
					model_best, transfer_model_best, prio, sched_ctx_id);
}
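
/* For every (worker, implementation) pair of the scheduling context, fill in
 * the predicted task length, data transfer penalty, power consumption and
 * expected completion time, and report the earliest and latest expected
 * completion times. If some prediction is missing, also report a "forced"
 * worker/implementation so that the caller can fall back to a greedy choice
 * (which is what gets the missing model calibrated). */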
static void compute_all_performance_predictions(struct starpu_task *task,
						double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						double *max_exp_endp,
						double *best_exp_endp,
						double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS],
						int *forced_worker, int *forced_impl, unsigned sched_ctx_id)
{
	int calibrating = 0;
	double max_exp_end = DBL_MIN;
	double best_exp_end = DBL_MAX;
	int ntasks_best = -1;
	int nimpl_best = 0;
	double ntasks_best_end = 0.0;

	/* A priori, we know all estimations */
	int unknown = 0;
	unsigned worker, worker_ctx = 0;

	unsigned nimpl;

	starpu_task_bundle_t bundle = task->bundle;
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);

	while (workers->has_next(workers))
	{
		worker = workers->get_next(workers);
		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
		unsigned memory_node = starpu_worker_get_memory_node(worker);

		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			if (!starpu_worker_can_execute_task(worker, task, nimpl))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			/* Sometimes workers didn't take the tasks as early as we expected */
			_starpu_pthread_mutex_t *sched_mutex;
			_starpu_pthread_cond_t *sched_cond;
			starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, worker, &sched_mutex, &sched_cond);
			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
			fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);

			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len;
			if (exp_end[worker_ctx][nimpl] > max_exp_end)
				max_exp_end = exp_end[worker_ctx][nimpl];

			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) worker (%u) kernel (%u) \n", local_task_length[worker][nimpl], worker, nimpl);

			if (bundle)
			{
				/* TODO : conversion time */
				local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
				local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
				local_power[worker_ctx][nimpl] = starpu_task_bundle_expected_power(bundle, perf_arch, nimpl);
			}
			else
			{
				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
				local_power[worker_ctx][nimpl] = starpu_task_expected_power(task, perf_arch, nimpl);

				double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
				if (conversion_time > 0.0)
					local_task_length[worker_ctx][nimpl] += conversion_time;
			}

			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			if (ntasks_best == -1
			    || (!calibrating && ntasks_end < ntasks_best_end) /* Not calibrating, take better worker */
			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl])) /* Not calibrating but this worker is being calibrated */
			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end) /* Calibrating, compete this worker with other non-calibrated */
			    )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				nimpl_best = nimpl;
			}

			if (isnan(local_task_length[worker_ctx][nimpl]))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (isnan(local_task_length[worker_ctx][nimpl])
			    || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
				/* there is no prediction available for that task
				 * with that arch (yet or at all), so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end[worker_ctx][nimpl] = fifo->exp_start + fifo->exp_len + local_task_length[worker_ctx][nimpl];

			if (exp_end[worker_ctx][nimpl] < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end[worker_ctx][nimpl];
				nimpl_best = nimpl;
			}

			if (isnan(local_power[worker_ctx][nimpl]))
				local_power[worker_ctx][nimpl] = 0.;
		}
		worker_ctx++;
	}

	*forced_worker = unknown ? ntasks_best : -1;
	*forced_impl = unknown ? nimpl_best : -1;

	*best_exp_endp = best_exp_end;
	*max_exp_endp = max_exp_end;
}
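
/* "dmda" (deque model data aware) strategy: the chosen worker minimises a
 * weighted fitness value,
 *
 *   fitness = alpha * (exp_end - best_exp_end)   (delay w.r.t. the best case)
 *           + beta  * data transfer penalty
 *           + gamma * predicted power consumption
 *
 * plus an extra gamma * idle_power term when placing the task there would
 * push the overall makespan beyond the current maximum, i.e. would keep the
 * other workers idle for longer. */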
static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	/* find the queue */
	unsigned worker, worker_ctx = 0;
	int best = -1, best_in_ctx = -1;
	int selected_impl = 0;
	double model_best = 0.0;
	double transfer_model_best = 0.0;

	/* this flag is set if the corresponding worker is selected because
	   there is no performance prediction available yet */
	int forced_best = -1;
	int forced_impl = -1;

	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	unsigned nworkers_ctx = workers->nworkers;

	double local_task_length[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double local_data_penalty[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double local_power[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double exp_end[STARPU_NMAXWORKERS][STARPU_MAXIMPLEMENTATIONS];
	double max_exp_end = 0.0;
	double best_exp_end;

	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];

	if (workers->init_cursor)
		workers->init_cursor(workers);

	compute_all_performance_predictions(task,
					    local_task_length,
					    exp_end,
					    &max_exp_end,
					    &best_exp_end,
					    local_data_penalty,
					    local_power,
					    &forced_best,
					    &forced_impl, sched_ctx_id);

	double best_fitness = -1;

	unsigned nimpl;
	if (forced_best == -1)
	{
		while (workers->has_next(workers))
		{
			worker = workers->get_next(workers);
			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
			{
				if (!starpu_worker_can_execute_task(worker, task, nimpl))
				{
					/* no one on that queue may execute this task */
					continue;
				}

				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
					+ dt->_gamma*(local_power[worker_ctx][nimpl]);

				if (exp_end[worker_ctx][nimpl] > max_exp_end)
				{
					/* This placement will make the computation
					 * longer, take into account the idle
					 * consumption of other cpus */
					fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
				}

				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
				{
					/* we found a better solution */
					best_fitness = fitness[worker_ctx][nimpl];
					best = worker;
					best_in_ctx = worker_ctx;
					selected_impl = nimpl;

					//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) + gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_power[worker][nimpl]);
				}
			}
			worker_ctx++;
		}
	}

	STARPU_ASSERT(forced_best != -1 || best != -1);

	if (forced_best != -1)
	{
		/* there is no prediction available for that task
		 * with that arch; to speed up calibration time
		 * we force this measurement */
		best = forced_best;
		model_best = 0.0;
		transfer_model_best = 0.0;
	}
	else if (task->bundle)
	{
		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(best);
		unsigned memory_node = starpu_worker_get_memory_node(best);
		model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
	}
	else
	{
		model_best = local_task_length[best_in_ctx][selected_impl];
		transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
	}

	if (task->bundle)
		starpu_task_bundle_remove(task->bundle, task);

	if (workers->deinit_cursor)
		workers->deinit_cursor(workers);

	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);

	_starpu_get_job_associated_to_task(task)->nimpl = selected_impl;

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio, sched_ctx_id);
}
static int dmda_push_sorted_task(struct starpu_task *task)
{
#ifdef STARPU_DEVEL
#warning TODO: after defining a scheduling window, use that instead of empty_ctx_tasks
#endif
	unsigned sched_ctx_id = task->sched_ctx;
	_starpu_pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
	unsigned nworkers;
	int ret_val = -1;

	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	if (nworkers == 0)
	{
		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
		return ret_val;
	}

	ret_val = _dmda_push_task(task, 1, sched_ctx_id);
	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
	return ret_val;
}

static int dm_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	_starpu_pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
	unsigned nworkers;
	int ret_val = -1;

	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	if (nworkers == 0)
	{
		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
		return ret_val;
	}

	ret_val = _dm_push_task(task, 0, sched_ctx_id);
	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
	return ret_val;
}

static int dmda_push_task(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	_starpu_pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);
	unsigned nworkers;
	int ret_val = -1;

	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
	if (nworkers == 0)
	{
		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
		return ret_val;
	}

	STARPU_ASSERT(task);
	ret_val = _dmda_push_task(task, 0, sched_ctx_id);
	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
	return ret_val;
}
static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	int workerid;
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		/* if the worker already belonged to this context, the queue
		   and the synchronization variables have already been
		   initialized */
		if (dt->queue_array[workerid] == NULL)
		{
			dt->queue_array[workerid] = _starpu_create_fifo();
			starpu_sched_ctx_init_worker_mutex_and_cond(sched_ctx_id, workerid);
		}
	}
}
static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	int workerid;
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		workerid = workerids[i];
		_starpu_destroy_fifo(dt->queue_array[workerid]);
		dt->queue_array[workerid] = NULL;
		starpu_sched_ctx_deinit_worker_mutex_and_cond(sched_ctx_id, workerid);
	}
}

static void initialize_dmda_policy(unsigned sched_ctx_id)
{
	starpu_sched_ctx_create_worker_collection(sched_ctx_id, WORKER_LIST);

	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)malloc(sizeof(struct _starpu_dmda_data));
	dt->alpha = _STARPU_DEFAULT_ALPHA;
	dt->beta = _STARPU_DEFAULT_BETA;
	dt->_gamma = _STARPU_DEFAULT_GAMMA;
	dt->idle_power = 0.0;

	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)dt);

	dt->queue_array = (struct _starpu_fifo_taskq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));

	int i;
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		dt->queue_array[i] = NULL;

	const char *strval_alpha = getenv("STARPU_SCHED_ALPHA");
	if (strval_alpha)
		dt->alpha = atof(strval_alpha);

	const char *strval_beta = getenv("STARPU_SCHED_BETA");
	if (strval_beta)
		dt->beta = atof(strval_beta);

	const char *strval_gamma = getenv("STARPU_SCHED_GAMMA");
	if (strval_gamma)
		dt->_gamma = atof(strval_gamma);

	const char *strval_idle_power = getenv("STARPU_IDLE_POWER");
	if (strval_idle_power)
		dt->idle_power = atof(strval_idle_power);

#ifdef STARPU_USE_TOP
	starpu_top_register_parameter_float("DMDA_ALPHA", &alpha,
					    alpha_minimum, alpha_maximum, param_modified);
	starpu_top_register_parameter_float("DMDA_BETA", &beta,
					    beta_minimum, beta_maximum, param_modified);
	starpu_top_register_parameter_float("DMDA_GAMMA", &_gamma,
					    gamma_minimum, gamma_maximum, param_modified);
	starpu_top_register_parameter_float("DMDA_IDLE_POWER", &idle_power,
					    idle_power_minimum, idle_power_maximum, param_modified);
#endif /* !STARPU_USE_TOP */
}

static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
{
	initialize_dmda_policy(sched_ctx_id);

	/* The application may use any integer */
	starpu_sched_set_min_priority(INT_MIN);
	starpu_sched_set_max_priority(INT_MAX);
}
static void deinitialize_dmda_policy(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	/* Report the statistics while the policy data is still valid. */
	_STARPU_DEBUG("total_task_cnt %ld ready_task_cnt %ld -> %f\n", dt->total_task_cnt, dt->ready_task_cnt, (100.0f*dt->ready_task_cnt)/dt->total_task_cnt);

	free(dt->queue_array);
	free(dt);

	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
}
/* dmda_pre_exec_hook is called right after the data transfer is done and
 * right before the computation begins: at that point the values of the
 * expected start, end, length, etc. can be updated more precisely. */
static void dmda_pre_exec_hook(struct starpu_task *task)
{
	unsigned sched_ctx_id = task->sched_ctx;
	int workerid = starpu_worker_get_id();
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	double model = task->predicted;
	double transfer_model = task->predicted_transfer;

	_starpu_pthread_mutex_t *sched_mutex;
	_starpu_pthread_cond_t *sched_cond;
	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, workerid, &sched_mutex, &sched_cond);

	/* Once the task is executing, we can update the predicted amount
	 * of work. */
	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
	fifo->exp_len -= transfer_model;
	fifo->exp_start = starpu_timing_now() + model;
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
}
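
/* Account, in the target worker's expected start/end times and queue length,
 * for a task that was queued on it without going through this policy's push
 * functions above. */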
static void dmda_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	/* Compute the expected penalty */
	enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(workerid);
	unsigned memory_node = starpu_worker_get_memory_node(workerid);

	double predicted = starpu_task_expected_length(task, perf_arch,
						       _starpu_get_job_associated_to_task(task)->nimpl);
	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);

	_starpu_pthread_mutex_t *sched_mutex;
	_starpu_pthread_cond_t *sched_cond;
	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, workerid, &sched_mutex, &sched_cond);

	/* Update the predictions */
	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
	/* Sometimes workers didn't take the tasks as early as we expected */
	fifo->exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
	fifo->exp_end = fifo->exp_start + fifo->exp_len;

	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted))
	{
		task->predicted = predicted;
		fifo->exp_end += predicted;
		fifo->exp_len += predicted;
	}

	/* If there is no prediction available, we consider the transfer has a null length */
	if (!isnan(predicted_transfer))
	{
		if (starpu_timing_now() + predicted_transfer < fifo->exp_end)
		{
			/* We may hope that the transfer will be finished by
			 * the start of the task. */
			predicted_transfer = 0;
		}
		else
		{
			/* The transfer will not be finished by then, take the
			 * remainder into account */
			predicted_transfer = (starpu_timing_now() + predicted_transfer) - fifo->exp_end;
		}

		task->predicted_transfer = predicted_transfer;
		fifo->exp_end += predicted_transfer;
		fifo->exp_len += predicted_transfer;
	}

	fifo->ntasks++;

	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
}

/* TODO: use post_exec_hook to fix the expected start */
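
/* The four policies below are registered under the names "dm", "dmda",
 * "dmdas" and "dmdar".  A policy is typically chosen when StarPU is
 * initialised, for instance (a minimal sketch using the public API):
 *
 *     struct starpu_conf conf;
 *     starpu_conf_init(&conf);
 *     conf.sched_policy_name = "dmda";
 *     starpu_init(&conf);
 *
 * or simply by setting the STARPU_SCHED environment variable, e.g.
 * STARPU_SCHED=dmda. */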
struct starpu_sched_policy _starpu_sched_dm_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dm_push_task,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = NULL,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dm",
	.policy_description = "performance model"
};

struct starpu_sched_policy _starpu_sched_dmda_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmda",
	.policy_description = "data-aware performance model"
};

struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_sorted_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdas",
	.policy_description = "data-aware performance model (sorted)"
};

struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = NULL,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdar",
	.policy_description = "data-aware performance model (ready)"
};