/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011-2017  Inria
 * Copyright (C) 2009-2020  Université de Bordeaux
 * Copyright (C) 2013       Joris Pablo
 * Copyright (C) 2010-2019  CNRS
 * Copyright (C) 2013       Simon Archipoff
 * Copyright (C) 2013       Thibaut Lambert
 * Copyright (C) 2011       Télécom-SudParis
 * Copyright (C) 2016       Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Distributed queues using performance modeling to assign tasks */

#include <starpu_config.h>
#include <starpu_scheduler.h>
#include <common/fxt.h>
#include <core/task.h>
#include <core/workers.h>
#include <core/sched_policy.h>
#include <core/debug.h>
#include <sched_policies/fifo_queues.h>
#include <limits.h>
#include <math.h> /* for fpclassify() checks on knob values */

#ifndef DBL_MIN
#define DBL_MIN __DBL_MIN__
#endif

#ifndef DBL_MAX
#define DBL_MAX __DBL_MAX__
#endif

//#define NOTIFY_READY_SOON

struct _starpu_dmda_data
{
	double alpha;
	double beta;
	double _gamma;
	double idle_power;
	struct _starpu_fifo_taskq **queue_array;
	long int total_task_cnt;
	long int ready_task_cnt;
	long int eager_task_cnt; /* number of tasks scheduled without model */
	int num_priorities;
};

/* performance steering knobs */

/* . per-scheduler knobs */
static int __s_alpha_knob;
static int __s_beta_knob;
static int __s_gamma_knob;
static int __s_idle_power_knob;

/* . knob variables */
static double __s_alpha__value = 1.0;
static double __s_beta__value = 1.0;
static double __s_gamma__value = 1.0;
static double __s_idle_power__value = 1.0;

/* . per-scheduler knob group */
static struct starpu_perf_knob_group * __kg_starpu_dmda__per_scheduler;

static void sched_knobs__set(const struct starpu_perf_knob * const knob, void *context, const struct starpu_perf_knob_value * const value)
{
	const char * const sched_policy_name = *(const char **)context;
	(void) sched_policy_name;

	if (knob->id == __s_alpha_knob)
	{
		STARPU_ASSERT(fpclassify(value->val_double) == FP_NORMAL);
		__s_alpha__value = value->val_double;
	}
	else if (knob->id == __s_beta_knob)
	{
		STARPU_ASSERT(fpclassify(value->val_double) == FP_NORMAL);
		__s_beta__value = value->val_double;
	}
	else if (knob->id == __s_gamma_knob)
	{
		STARPU_ASSERT(fpclassify(value->val_double) == FP_NORMAL);
		__s_gamma__value = value->val_double;
	}
	else if (knob->id == __s_idle_power_knob)
	{
		STARPU_ASSERT(fpclassify(value->val_double) == FP_NORMAL);
		__s_idle_power__value = value->val_double;
	}
	else
	{
		STARPU_ASSERT(0);
		abort();
	}
}

static void sched_knobs__get(const struct starpu_perf_knob * const knob, void *context, struct starpu_perf_knob_value * const value)
{
	const char * const sched_policy_name = *(const char **)context;
	(void) sched_policy_name;

	if (knob->id == __s_alpha_knob)
	{
		value->val_double = __s_alpha__value;
	}
	else if (knob->id == __s_beta_knob)
	{
		value->val_double = __s_beta__value;
	}
	else if (knob->id == __s_gamma_knob)
	{
		value->val_double = __s_gamma__value;
	}
	else if (knob->id == __s_idle_power_knob)
	{
		value->val_double = __s_idle_power__value;
	}
	else
	{
		STARPU_ASSERT(0);
		abort();
	}
}

void _starpu__dmda_c__register_knobs(void)
{
	{
		const enum starpu_perf_knob_scope scope = starpu_perf_knob_scope_per_scheduler;
		__kg_starpu_dmda__per_scheduler = _starpu_perf_knob_group_register(scope, sched_knobs__set, sched_knobs__get);

		/* TODO: priority capping knobs actually work globally for now, the sched policy name is ignored */
		__STARPU_PERF_KNOB_REG("starpu.dmda", __kg_starpu_dmda__per_scheduler, s_alpha_knob, double, "alpha constant multiplier");
		__STARPU_PERF_KNOB_REG("starpu.dmda", __kg_starpu_dmda__per_scheduler, s_beta_knob, double, "beta constant multiplier");
		__STARPU_PERF_KNOB_REG("starpu.dmda", __kg_starpu_dmda__per_scheduler, s_gamma_knob, double, "gamma constant multiplier");
		__STARPU_PERF_KNOB_REG("starpu.dmda", __kg_starpu_dmda__per_scheduler, s_idle_power_knob, double, "idle_power constant multiplier");
	}
}

void _starpu__dmda_c__unregister_knobs(void)
{
	_starpu_perf_knob_group_unregister(__kg_starpu_dmda__per_scheduler);
	__kg_starpu_dmda__per_scheduler = NULL;
}

/* The dmda scheduling policy uses
 *
 *	alpha * T_computation + beta * T_communication + gamma * Consumption
 *
 * Here are the default values of alpha, beta, gamma
 */
#define _STARPU_SCHED_ALPHA_DEFAULT 1.0
#define _STARPU_SCHED_BETA_DEFAULT 1.0
#define _STARPU_SCHED_GAMMA_DEFAULT 1000.0
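/* These defaults can be overridden at start-up through the STARPU_SCHED_ALPHA,
 * STARPU_SCHED_BETA and STARPU_SCHED_GAMMA environment variables (see
 * initialize_dmda_policy below), and further scaled at run time through the
 * "starpu.dmda" performance knobs registered above. */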
/* This is called when a transfer request is actually pushed to the worker */
static void _starpu_fifo_task_transfer_started(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities)
{
	double transfer_model = task->predicted_transfer;
	if (isnan(transfer_model))
		return;

	/* We now start the transfer, move it from predicted to pipelined */
	fifo->exp_len -= transfer_model;
	fifo->pipeline_len += transfer_model;
	fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	if(num_priorities != -1)
	{
		int i;
		int task_prio = _starpu_normalize_prio(task->priority, num_priorities, task->sched_ctx);
		for(i = 0; i <= task_prio; i++)
			fifo->exp_len_per_priority[i] -= transfer_model;
	}
}

/* This is called when a task is actually pushed to the worker (i.e. the transfer finished) */
static void _starpu_fifo_task_started(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities)
{
	double model = task->predicted;
	double transfer_model = task->predicted_transfer;
	if(!isnan(transfer_model))
		/* The transfer is over, remove it from pipelined */
		fifo->pipeline_len -= transfer_model;

	if(!isnan(model))
	{
		/* We now start the computation, move it from predicted to pipelined */
		fifo->exp_len -= model;
		fifo->pipeline_len += model;
		fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;
		if(num_priorities != -1)
		{
			int i;
			int task_prio = _starpu_normalize_prio(task->priority, num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] -= model;
		}
	}
}

/* This is called when a task is actually finished */
static void _starpu_fifo_task_finished(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities STARPU_ATTRIBUTE_UNUSED)
{
	if(!isnan(task->predicted))
		/* The execution is over, remove it from pipelined */
		fifo->pipeline_len -= task->predicted;
	fifo->exp_start = STARPU_MAX(starpu_timing_now() + fifo->pipeline_len, fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
}
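/* Pop a task for the calling worker from its local queue; when 'ready' is set,
 * prefer the first task whose data buffers are already available on this
 * worker (this is what the "ready" policy variants use). */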
static struct starpu_task *_dmda_pop_task(unsigned sched_ctx_id, int ready)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *task;
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	STARPU_ASSERT_MSG(fifo, "worker %u does not belong to ctx %u anymore.\n", workerid, sched_ctx_id);

	/* Take the opportunity to update start time */
	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;

	if (ready)
		task = _starpu_fifo_pop_first_ready_task(fifo, workerid, dt->num_priorities);
	else
		task = _starpu_fifo_pop_local_task(fifo);

	if (task)
	{
		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = _starpu_count_non_ready_buffers(task, workerid);
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}
		dt->total_task_cnt++;
#endif
	}

	return task;
}

static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
{
	return _dmda_pop_task(sched_ctx_id, 1);
}

static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
{
	return _dmda_pop_task(sched_ctx_id, 0);
}

static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *new_list, *task;
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	/* Take the opportunity to update start time */
	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;

	starpu_worker_lock_self();
	new_list = _starpu_fifo_pop_every_task(fifo, workerid);
	starpu_worker_unlock_self();

	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);

	for (task = new_list; task; task = task->next)
		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);

	return new_list;
}
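/* Queue the task on the chosen worker and fold the predicted computation and
 * transfer times into that worker's expected start/end/length accounting. */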
static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
				    double predicted, double predicted_transfer,
				    int prio, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	/* make sure someone could execute that task ! */
	STARPU_ASSERT(best_workerid != -1);

	if (_starpu_get_nsched_ctxs() > 1)
	{
		starpu_worker_relax_on();
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		starpu_worker_relax_off();
		if (_starpu_sched_ctx_worker_is_master_for_child_ctx(sched_ctx_id, best_workerid, task))
			task = NULL;
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
		if (!task)
			return 0;
	}

	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
	double now = starpu_timing_now();

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(best_workerid, sched_ctx_id);
#endif //STARPU_USE_SC_HYPERVISOR

	starpu_worker_lock(best_workerid);

	/* Sometimes workers didn't take the tasks as early as we expected */
	fifo->exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;

	if ((now + predicted_transfer) < fifo->exp_end)
	{
		/* We may hope that the transfer will be finished by
		 * the start of the task. */
		predicted_transfer = 0.0;
	}
	else
	{
		/* The transfer will not be finished by then, take the
		 * remainder into account */
		predicted_transfer = (now + predicted_transfer) - fifo->exp_end;
	}

	if(!isnan(predicted_transfer))
	{
		fifo->exp_len += predicted_transfer;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted_transfer;
		}
	}

	if(!isnan(predicted))
	{
		fifo->exp_len += predicted;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted;
		}
	}

	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	starpu_worker_unlock(best_workerid);

	task->predicted = predicted;
	task->predicted_transfer = predicted_transfer;

	if (starpu_get_prefetch_flag())
		starpu_prefetch_task_input_for(task, best_workerid);

	STARPU_AYU_ADDTOTASKQUEUE(starpu_task_get_job_id(task), best_workerid);

	if (_starpu_get_nsched_ctxs() > 1)
	{
		unsigned stream_ctx_id = starpu_worker_get_sched_ctx_id_stream(best_workerid);
		if(stream_ctx_id != STARPU_NMAX_SCHED_CTXS)
		{
			starpu_worker_relax_on();
			_starpu_sched_ctx_lock_write(sched_ctx_id);
			starpu_worker_relax_off();
			starpu_sched_ctx_move_task_to_ctx_locked(task, stream_ctx_id, 0);
			starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, task->flops);
			_starpu_sched_ctx_unlock_write(sched_ctx_id);
		}
	}

	int ret = 0;
	if (prio)
	{
		starpu_worker_lock(best_workerid);
		ret = _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid], task);
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				dt->queue_array[best_workerid]->ntasks_per_priority[i]++;
		}
#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
		starpu_wake_worker_locked(best_workerid);
#endif
		starpu_push_task_end(task);
		starpu_worker_unlock(best_workerid);
	}
	else
	{
		starpu_worker_lock(best_workerid);
		starpu_task_list_push_back(&dt->queue_array[best_workerid]->taskq, task);
		dt->queue_array[best_workerid]->ntasks++;
		dt->queue_array[best_workerid]->nprocessed++;
#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
		starpu_wake_worker_locked(best_workerid);
#endif
		starpu_push_task_end(task);
		starpu_worker_unlock(best_workerid);
	}

	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, best_workerid);
	return ret;
}

/* TODO: factorize with dmda!! */
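/* dm: pick the worker minimizing the expected completion time of the task,
 * using the computation model only (no data transfer term); when no calibrated
 * performance model is available, fall back to a greedy choice based on the
 * number of tasks already queued on each worker. */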
static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int best = -1;
	double best_exp_end = 0.0;
	double model_best = 0.0;
	double transfer_model_best = 0.0;

	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;

	/* A priori, we know all estimations */
	int unknown = 0;

	unsigned best_impl = 0;
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;
	double now = starpu_timing_now();

	workers->init_iterator_for_parallel_tasks(workers, &it, task);
	while(workers->has_next(workers, &it))
	{
		unsigned nimpl;
		unsigned impl_mask;
		unsigned worker = workers->get_next(workers, &it);
		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);

		/* Sometimes workers didn't take the tasks as early as we expected */
		double exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);

		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
			continue;

		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			if (!(impl_mask & (1U << nimpl)))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			double exp_end;
			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
			double local_penalty = starpu_task_expected_data_transfer_time_for(task, worker);
			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);

			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);

			/*
			 * This implements a default greedy scheduler for the
			 * case of tasks which have no performance model, or
			 * whose performance model is not calibrated yet.
			 *
			 * It simply uses the number of tasks already pushed to
			 * the workers, divided by the relative performance of
			 * a CPU and of a GPU.
			 *
			 * This is always computed, but the ntasks_best
			 * selection is only really used if the task indeed has
			 * no performance model, or is not calibrated yet.
			 */
			if (ntasks_best == -1
			    /* Always compute the greedy decision, at least for
			     * the tasks with no performance model. */
			    || (!calibrating && ntasks_end < ntasks_best_end)
			    /* The performance model of this task is not
			     * calibrated on this worker, try to run it there
			     * to calibrate it there. */
			    || (!calibrating && isnan(local_length))
			    /* the performance model of this task is not
			     * calibrated on this worker either, rather run it
			     * there if this one is low on scheduled tasks. */
			    || (calibrating && isnan(local_length) && ntasks_end < ntasks_best_end)
			    )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				best_impl = nimpl;
			}

			if (isnan(local_length))
			{
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				static int warned;
				if (!warned)
				{
					warned = 1;
					_STARPU_DISP("Warning: performance model for %s not finished calibrating on worker %u, using a dumb scheduling heuristic for now\n", starpu_task_get_name(task), worker);
				}
				calibrating = 1;
			}

			if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			exp_end = exp_start + fifo->exp_len + local_length;

			if (best == -1 || exp_end < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end;
				best = worker;
				model_best = local_length;
				transfer_model_best = local_penalty;
				best_impl = nimpl;
			}
		}
	}

	if (unknown)
	{
		best = ntasks_best;
		model_best = 0.0;
		transfer_model_best = 0.0;
#ifdef STARPU_VERBOSE
		dt->eager_task_cnt++;
#endif
	}

	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);

	starpu_task_set_implementation(task, best_impl);
	starpu_sched_task_break(task);

	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best,
					model_best, transfer_model_best, prio, sched_ctx_id);
}

/* TODO: factorise CPU computations, expensive with a lot of cores */
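/* Fill, for each (worker, implementation) pair, the expected task length, data
 * transfer penalty and energy, and report the best and maximum expected end
 * times; when no calibrated prediction exists, also report a forced worker and
 * implementation chosen greedily. */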
static void compute_all_performance_predictions(struct starpu_task *task,
						unsigned nworkers,
						double local_task_length[nworkers][STARPU_MAXIMPLEMENTATIONS],
						double exp_end[nworkers][STARPU_MAXIMPLEMENTATIONS],
						double *max_exp_endp,
						double *best_exp_endp,
						double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS],
						double local_energy[nworkers][STARPU_MAXIMPLEMENTATIONS],
						int *forced_worker, int *forced_impl, unsigned sched_ctx_id, unsigned sorted_decision)
{
	int calibrating = 0;
	double max_exp_end = DBL_MIN;
	double best_exp_end = DBL_MAX;
	int ntasks_best = -1;
	int nimpl_best = 0;
	double ntasks_best_end = 0.0;

	/* A priori, we know all estimations */
	int unknown = 0;
	unsigned worker_ctx = 0;

	int task_prio = 0;

	starpu_task_bundle_t bundle = task->bundle;
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);

	if(sorted_decision && dt->num_priorities != -1)
		task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, sched_ctx_id);

	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	double now = starpu_timing_now();
	struct starpu_sched_ctx_iterator it;

	workers->init_iterator_for_parallel_tasks(workers, &it, task);
	while(worker_ctx<nworkers && workers->has_next(workers, &it))
	{
		unsigned nimpl;
		unsigned impl_mask;
		unsigned workerid = workers->get_next(workers, &it);
		struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, sched_ctx_id);
		unsigned memory_node = starpu_worker_get_memory_node(workerid);

		STARPU_ASSERT_MSG(fifo != NULL, "workerid %u ctx %u\n", workerid, sched_ctx_id);

		/* Sometimes workers didn't take the tasks as early as we expected */
		double exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);

		if (!starpu_worker_can_execute_task_impl(workerid, task, &impl_mask))
			continue;

		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			if (!(impl_mask & (1U << nimpl)))
			{
				/* no one on that queue may execute this task */
				continue;
			}

			int fifo_ntasks = fifo->ntasks;
			double prev_exp_len = fifo->exp_len;
			/* consider the priority of the task when deciding on which workerid to schedule,
			   compute the expected_end of the task if it is inserted before other tasks already scheduled */
			if(sorted_decision)
			{
				if(dt->num_priorities != -1)
				{
					prev_exp_len = fifo->exp_len_per_priority[task_prio];
					fifo_ntasks = fifo->ntasks_per_priority[task_prio];
				}
				else
				{
					starpu_worker_lock(workerid);
					prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, workerid, nimpl, &fifo_ntasks);
					starpu_worker_unlock(workerid);
				}
			}

			exp_end[worker_ctx][nimpl] = exp_start + prev_exp_len;
			if (exp_end[worker_ctx][nimpl] > max_exp_end)
				max_exp_end = exp_end[worker_ctx][nimpl];

			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) workerid (%u) kernel (%u) \n", local_task_length[workerid][nimpl],workerid,nimpl);

			if (bundle)
			{
				/* TODO : conversion time */
				local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
				local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
				local_energy[worker_ctx][nimpl] = starpu_task_bundle_expected_energy(bundle, perf_arch,nimpl);
			}
			else
			{
				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time_for(task, workerid);
				local_energy[worker_ctx][nimpl] = starpu_task_expected_energy(task, perf_arch,nimpl);
				double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
				if (conversion_time > 0.0)
					local_task_length[worker_ctx][nimpl] += conversion_time;
			}

			double ntasks_end = fifo_ntasks / starpu_worker_get_relative_speedup(perf_arch);

			/*
			 * This implements a default greedy scheduler for the
			 * case of tasks which have no performance model, or
			 * whose performance model is not calibrated yet.
			 *
			 * It simply uses the number of tasks already pushed to
			 * the workers, divided by the relative performance of
			 * a CPU and of a GPU.
			 *
			 * This is always computed, but the ntasks_best
			 * selection is only really used if the task indeed has
			 * no performance model, or is not calibrated yet.
			 */
			if (ntasks_best == -1
			    /* Always compute the greedy decision, at least for
			     * the tasks with no performance model. */
			    || (!calibrating && ntasks_end < ntasks_best_end)
			    /* The performance model of this task is not
			     * calibrated on this workerid, try to run it there
			     * to calibrate it there. */
			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl]))
			    /* the performance model of this task is not
			     * calibrated on this workerid either, rather run it
			     * there if this one is low on scheduled tasks. */
			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end)
			    )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = workerid;
				nimpl_best = nimpl;
			}

			if (isnan(local_task_length[worker_ctx][nimpl]))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;

			if (isnan(local_task_length[worker_ctx][nimpl])
			    || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
				/* there is no prediction available for that task
				 * with that arch (yet or at all), so switch to a greedy strategy */
				unknown = 1;

			if (unknown)
				continue;

			double task_starting_time = STARPU_MAX(exp_start + prev_exp_len, now + local_data_penalty[worker_ctx][nimpl]);

			exp_end[worker_ctx][nimpl] = task_starting_time + local_task_length[worker_ctx][nimpl];

			if (exp_end[worker_ctx][nimpl] < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end[worker_ctx][nimpl];
				nimpl_best = nimpl;
			}

			if (isnan(local_energy[worker_ctx][nimpl]))
				local_energy[worker_ctx][nimpl] = 0.;
		}
		worker_ctx++;
	}

	*forced_worker = unknown?ntasks_best:-1;
	*forced_impl = unknown?nimpl_best:-1;

#ifdef STARPU_VERBOSE
	if (unknown)
	{
		dt->eager_task_cnt++;
	}
#endif

	*best_exp_endp = best_exp_end;
	*max_exp_endp = max_exp_end;
}
static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id, unsigned simulate, unsigned sorted_decision)
{
	/* find the queue */
	int best = -1, best_in_ctx = -1;
	int selected_impl = 0;
	double model_best = 0.0;
	double transfer_model_best = 0.0;

	/* this flag is set if the corresponding worker is selected because
	   there is no performance prediction available yet */
	int forced_best = -1;
	int forced_impl = -1;

	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	unsigned nworkers_ctx = workers->nworkers;
	double local_task_length[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double local_data_penalty[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double local_energy[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];

	/* Expected end of this task on the workers */
	double exp_end[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];

	/* This is the minimum among the exp_end[] matrix */
	double best_exp_end;

	/* This is the maximum termination time of already-scheduled tasks over all workers */
	double max_exp_end = 0.0;

	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];

	compute_all_performance_predictions(task,
					    nworkers_ctx,
					    local_task_length,
					    exp_end,
					    &max_exp_end,
					    &best_exp_end,
					    local_data_penalty,
					    local_energy,
					    &forced_best,
					    &forced_impl, sched_ctx_id, sorted_decision);

	if (forced_best == -1)
	{
		double best_fitness = -1;
		unsigned worker_ctx = 0;
		struct starpu_sched_ctx_iterator it;

		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		while(worker_ctx < nworkers_ctx && workers->has_next(workers, &it))
		{
			unsigned worker = workers->get_next(workers, &it);
			unsigned nimpl;
			unsigned impl_mask;

			if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
				continue;

			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
			{
				if (!(impl_mask & (1U << nimpl)))
				{
					/* no one on that queue may execute this task */
					continue;
				}
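				/* Weighted fitness: penalize finishing later than the best
				 * expected end (alpha term), the data transfer required on
				 * this worker (beta term) and the expected energy use
				 * (gamma term); each weight may also be scaled by the
				 * corresponding run-time knob. */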
				fitness[worker_ctx][nimpl] = dt->alpha * __s_alpha__value * (exp_end[worker_ctx][nimpl] - best_exp_end)
					+ dt->beta * __s_beta__value * (local_data_penalty[worker_ctx][nimpl])
					+ dt->_gamma * __s_gamma__value * (local_energy[worker_ctx][nimpl]);

				if (exp_end[worker_ctx][nimpl] > max_exp_end)
				{
					/* This placement will make the computation
					 * longer, take into account the idle
					 * consumption of other cpus */
					fitness[worker_ctx][nimpl] += dt->_gamma * __s_gamma__value * dt->idle_power * __s_idle_power__value * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
				}

				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
				{
					/* we found a better solution */
					best_fitness = fitness[worker_ctx][nimpl];
					best = worker;
					best_in_ctx = worker_ctx;
					selected_impl = nimpl;

					//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_energy[worker][nimpl]);
				}
			}
			worker_ctx++;
		}
	}

	STARPU_ASSERT(forced_best != -1 || best != -1);

	if (forced_best != -1)
	{
		/* there is no prediction available for that task
		 * with that arch we want to speed-up calibration time
		 * so we force this measurement */
		best = forced_best;
		selected_impl = forced_impl;
		model_best = 0.0;
		transfer_model_best = 0.0;
	}
	else if (task->bundle)
	{
		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(best_in_ctx, sched_ctx_id);
		unsigned memory_node = starpu_worker_get_memory_node(best);
		model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
	}
	else
	{
		model_best = local_task_length[best_in_ctx][selected_impl];
		transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
	}

	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
	starpu_task_set_implementation(task, selected_impl);
	starpu_sched_task_break(task);

	if(!simulate)
	{
		/* we should now have the best worker in variable "best" */
		return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio, sched_ctx_id);
	}
	else
	{
		return exp_end[best_in_ctx][selected_impl];
	}
}
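/* The entry points below are thin wrappers selecting the priority, simulation
 * and sorted-decision variants of the push path. */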
static int dmda_push_sorted_decision_task(struct starpu_task *task)
{
	return _dmda_push_task(task, 1, task->sched_ctx, 0, 1);
}

static int dmda_push_sorted_task(struct starpu_task *task)
{
#ifdef STARPU_DEVEL
#warning TODO: after defining a scheduling window, use that instead of empty_ctx_tasks
#endif
	return _dmda_push_task(task, 1, task->sched_ctx, 0, 0);
}

static int dm_push_task(struct starpu_task *task)
{
	return _dm_push_task(task, 0, task->sched_ctx);
}

static int dmda_push_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 0, task->sched_ctx, 0, 0);
}

static double dmda_simulate_push_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 0, task->sched_ctx, 1, 0);
}

static double dmda_simulate_push_sorted_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 1, task->sched_ctx, 1, 0);
}

static double dmda_simulate_push_sorted_decision_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1);
}

#ifdef NOTIFY_READY_SOON
static void dmda_notify_ready_soon(void *data STARPU_ATTRIBUTE_UNUSED, struct starpu_task *task, double delay)
{
	if (!task->cl)
		return;
	/* fprintf(stderr, "task %lu %p %p %s %s will be ready within %f\n", starpu_task_get_job_id(task), task, task->cl, task->cl->name, task->cl->model?task->cl->model->symbol : NULL, delay); */
	/* TODO: do something with it */
}
#endif
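/* Create (or reuse) the per-worker FIFO queues when workers are added to the
 * scheduling context. */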
static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		struct _starpu_fifo_taskq *q;
		int workerid = workerids[i];
		/* if the worker has already belonged to this context,
		   the queue and the synchronization variables have already been initialized */
		q = dt->queue_array[workerid];
		if(q == NULL)
		{
			q = dt->queue_array[workerid] = _starpu_create_fifo();
			/* These are only stats, they can be read with races */
			STARPU_HG_DISABLE_CHECKING(q->exp_start);
			STARPU_HG_DISABLE_CHECKING(q->exp_len);
			STARPU_HG_DISABLE_CHECKING(q->exp_end);
		}
		if(dt->num_priorities != -1)
		{
			_STARPU_MALLOC(q->exp_len_per_priority, dt->num_priorities*sizeof(double));
			_STARPU_MALLOC(q->ntasks_per_priority, dt->num_priorities*sizeof(unsigned));
			int j;
			for(j = 0; j < dt->num_priorities; j++)
			{
				q->exp_len_per_priority[j] = 0.0;
				q->ntasks_per_priority[j] = 0;
			}
		}
	}
}

static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		if(dt->queue_array[workerid] != NULL)
		{
			if(dt->num_priorities != -1)
			{
				free(dt->queue_array[workerid]->exp_len_per_priority);
				free(dt->queue_array[workerid]->ntasks_per_priority);
			}
			_starpu_destroy_fifo(dt->queue_array[workerid]);
			dt->queue_array[workerid] = NULL;
		}
	}
}
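/* Allocate the per-scheduler data; alpha, beta, gamma and the idle power are
 * read from the STARPU_SCHED_ALPHA, STARPU_SCHED_BETA, STARPU_SCHED_GAMMA and
 * STARPU_IDLE_POWER environment variables. */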
static void initialize_dmda_policy(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt;
	_STARPU_CALLOC(dt, 1, sizeof(struct _starpu_dmda_data));
	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)dt);

	_STARPU_MALLOC(dt->queue_array, STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));

	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		dt->queue_array[i] = NULL;

	dt->alpha = starpu_get_env_float_default("STARPU_SCHED_ALPHA", _STARPU_SCHED_ALPHA_DEFAULT);
	dt->beta = starpu_get_env_float_default("STARPU_SCHED_BETA", _STARPU_SCHED_BETA_DEFAULT);
	dt->_gamma = starpu_get_env_float_default("STARPU_SCHED_GAMMA", _STARPU_SCHED_GAMMA_DEFAULT);
	dt->idle_power = starpu_get_env_float_default("STARPU_IDLE_POWER", 0.0);

	if(starpu_sched_ctx_min_priority_is_set(sched_ctx_id) != 0 && starpu_sched_ctx_max_priority_is_set(sched_ctx_id) != 0)
		dt->num_priorities = starpu_sched_ctx_get_max_priority(sched_ctx_id) - starpu_sched_ctx_get_min_priority(sched_ctx_id) + 1;
	else
		dt->num_priorities = -1;

#ifdef NOTIFY_READY_SOON
	starpu_task_notify_ready_soon_register(dmda_notify_ready_soon, dt);
#endif
}

static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
{
	initialize_dmda_policy(sched_ctx_id);

	/* The application may use any integer */
	if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
		starpu_sched_ctx_set_min_priority(sched_ctx_id, INT_MIN);
	if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
		starpu_sched_ctx_set_max_priority(sched_ctx_id, INT_MAX);
}

static void deinitialize_dmda_policy(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
#ifdef STARPU_VERBOSE
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		long int modelled_task_cnt = dt->total_task_cnt - dt->eager_task_cnt;
		_STARPU_DEBUG("%s sched policy (sched_ctx %u): total_task_cnt %ld ready_task_cnt %ld (%.1f%%), modelled_task_cnt = %ld (%.1f%%)%s\n",
			      sched_ctx->sched_policy?sched_ctx->sched_policy->policy_name:"<none>",
			      sched_ctx_id,
			      dt->total_task_cnt,
			      dt->ready_task_cnt,
			      (100.0f*dt->ready_task_cnt)/dt->total_task_cnt,
			      modelled_task_cnt,
			      (100.0f*modelled_task_cnt)/dt->total_task_cnt,
			      modelled_task_cnt==0?" *** Check if performance models are enabled and converging on a per-codelet basis, or use a non-modeling scheduling policy. ***":"");
	}
#endif

	free(dt->queue_array);
	free(dt);
}

/* dmda_pre_exec_hook is called right after the data transfer is done and right
 * before the computation begins, so it is a good place to update more precisely
 * the values of the expected start, end, length, etc... */
static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	const double now = starpu_timing_now();

	/* Once the task is executing, we can update the predicted amount
	 * of work. */
	starpu_worker_lock_self();
	_starpu_fifo_task_started(fifo, task, dt->num_priorities);

	/* Take the opportunity to update start time */
	fifo->exp_start = STARPU_MAX(now + fifo->pipeline_len, fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	starpu_worker_unlock_self();
}
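/* dmda_push_task_notify keeps the expected start/end/length accounting of
 * 'workerid' consistent for a task that was assigned to it without going
 * through this policy's push_task path; the predictions are computed with
 * perf_workerid's architecture. */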
static void dmda_push_task_notify(struct starpu_task *task, int workerid, int perf_workerid, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];

	/* Compute the expected penalty */
	struct starpu_perfmodel_arch *perf_arch = starpu_worker_get_perf_archtype(perf_workerid, sched_ctx_id);
	double predicted = starpu_task_expected_length(task, perf_arch,
						       starpu_task_get_implementation(task));
	double predicted_transfer = starpu_task_expected_data_transfer_time_for(task, workerid);
	double now = starpu_timing_now();

	/* Update the predictions */
	starpu_worker_lock(workerid);

	/* Sometimes workers didn't take the tasks as early as we expected */
	fifo->exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;

	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted_transfer))
	{
		if (now + predicted_transfer < fifo->exp_end)
		{
			/* We may hope that the transfer will be finished by
			 * the start of the task. */
			predicted_transfer = 0;
		}
		else
		{
			/* The transfer will not be finished by then, take the
			 * remainder into account */
			predicted_transfer = (now + predicted_transfer) - fifo->exp_end;
		}
		task->predicted_transfer = predicted_transfer;
		fifo->exp_end += predicted_transfer;
		fifo->exp_len += predicted_transfer;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted_transfer;
		}
	}

	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted))
	{
		task->predicted = predicted;
		fifo->exp_end += predicted;
		fifo->exp_len += predicted;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted;
		}
	}

	if(dt->num_priorities != -1)
	{
		int i;
		int task_prio = _starpu_normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
		for(i = 0; i <= task_prio; i++)
			fifo->ntasks_per_priority[i]++;
	}

	fifo->ntasks++;

	starpu_worker_unlock(workerid);
}
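/* Account for the completion of a task in the worker's expected-length
 * estimates. */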
static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	starpu_worker_lock_self();
	_starpu_fifo_task_finished(fifo, task, dt->num_priorities);
	starpu_worker_unlock_self();
}
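/* Policy descriptors: dm uses the computation performance model only, dmda
 * adds data transfer awareness, dmdap/dmdas/dmdasd are its priority, sorted
 * and sorted-decision variants, and dmdar pops ready tasks first (see the
 * policy_name and policy_description fields below). */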
struct starpu_sched_policy _starpu_sched_dm_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dm_push_task,
	.simulate_push_task = NULL,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dm",
	.policy_description = "performance model",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.simulate_push_task = dmda_simulate_push_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmda",
	.policy_description = "data-aware performance model",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_prio_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_sorted_task,
	.simulate_push_task = dmda_simulate_push_sorted_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdap",
	.policy_description = "data-aware performance model (priority)",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_sorted_task,
	.simulate_push_task = dmda_simulate_push_sorted_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdas",
	.policy_description = "data-aware performance model (sorted)",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_sorted_decision_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_sorted_decision_task,
	.simulate_push_task = dmda_simulate_push_sorted_decision_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdasd",
	.policy_description = "data-aware performance model (sorted decision)",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.simulate_push_task = dmda_simulate_push_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdar",
	.policy_description = "data-aware performance model (ready)",
	.worker_type = STARPU_WORKER_LIST,
};