deque_modeling_policy_data_aware.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2017 Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2015, 2016, 2017 CNRS
 * Copyright (C) 2011 Télécom-SudParis
 * Copyright (C) 2011-2012, 2016-2017 INRIA
 * Copyright (C) 2016 Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/* Distributed queues using performance modeling to assign tasks */

#include <starpu_config.h>
#include <starpu_scheduler.h>
#include <common/fxt.h>
#include <core/task.h>
#include <core/workers.h>
#include <core/sched_policy.h>
#include <core/debug.h>
#include <sched_policies/fifo_queues.h>
#include <limits.h>

#ifndef DBL_MIN
#define DBL_MIN __DBL_MIN__
#endif

#ifndef DBL_MAX
#define DBL_MAX __DBL_MAX__
#endif

struct _starpu_dmda_data
{
	double alpha;
	double beta;
	double _gamma;
	double idle_power;
	struct _starpu_fifo_taskq **queue_array;
	long int total_task_cnt;
	long int ready_task_cnt;
	long int eager_task_cnt; /* number of tasks scheduled without model */
	int num_priorities;
};

/* The dmda scheduling policy uses
 *
 * alpha * T_computation + beta * T_communication + gamma * Consumption
 *
 * Here are the default values of alpha, beta, gamma
 */
#define _STARPU_SCHED_ALPHA_DEFAULT 1.0
#define _STARPU_SCHED_BETA_DEFAULT 1.0
#define _STARPU_SCHED_GAMMA_DEFAULT 1000.0
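/* These defaults can be overridden at run time through the environment
 * variables read in initialize_dmda_policy() below, e.g. (hypothetical
 * values, for illustration only):
 *   STARPU_SCHED_ALPHA=2.0 STARPU_SCHED_BETA=4.0 STARPU_SCHED_GAMMA=500 STARPU_IDLE_POWER=10
 */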
#ifdef STARPU_USE_TOP
static double alpha = _STARPU_SCHED_ALPHA_DEFAULT;
static double beta = _STARPU_SCHED_BETA_DEFAULT;
static double _gamma = _STARPU_SCHED_GAMMA_DEFAULT;
static double idle_power = 0.0;
static const float alpha_minimum=0;
static const float alpha_maximum=10.0;
static const float beta_minimum=0;
static const float beta_maximum=10.0;
static const float gamma_minimum=0;
static const float gamma_maximum=10000.0;
static const float idle_power_minimum=0;
static const float idle_power_maximum=10000.0;
#endif /* !STARPU_USE_TOP */

static int count_non_ready_buffers(struct starpu_task *task, unsigned node)
{
	int cnt = 0;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	unsigned index;
	for (index = 0; index < nbuffers; index++)
	{
		starpu_data_handle_t handle;
		handle = STARPU_TASK_GET_HANDLE(task, index);
		int is_valid;
		starpu_data_query_status(handle, node, NULL, &is_valid, NULL);
		if (!is_valid)
			cnt++;
	}
	return cnt;
}
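/* Note: count_non_ready_buffers() is what makes the "ready" variants
 * (dmdar/dmdas/dmdasd) data-aware on the pop side:
 * _starpu_fifo_pop_first_ready_task() below uses it to prefer tasks whose
 * buffers are already valid on the worker's memory node. */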
#ifdef STARPU_USE_TOP
static void param_modified(struct starpu_top_param* d)
{
#ifdef STARPU_DEVEL
#warning FIXME: get sched ctx to get alpha/beta/gamma/idle values
#endif
	/* Just to show parameter modification. */
	_STARPU_MSG("%s has been modified: "
		    "alpha=%f|beta=%f|gamma=%f|idle_power=%f!\n",
		    d->name, alpha, beta, _gamma, idle_power);
}
#endif /* !STARPU_USE_TOP */
static int _normalize_prio(int priority, int num_priorities, unsigned sched_ctx_id)
{
	int min = starpu_sched_ctx_get_min_priority(sched_ctx_id);
	int max = starpu_sched_ctx_get_max_priority(sched_ctx_id);
	return ((num_priorities-1)/(max-min)) * (priority - min);
}
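/* For instance, with min = -5, max = 5 and num_priorities = 11, the scaling
 * factor is (11-1)/(5-(-5)) = 1 (integer division), so priority -5 maps to
 * bucket 0 and priority 5 maps to bucket 10. */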
/* This is called when a transfer request is actually pushed to the worker */
static void _starpu_fifo_task_transfer_started(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities)
{
	double transfer_model = task->predicted_transfer;
	if (isnan(transfer_model))
		return;
	/* We now start the transfer, move it from predicted to pipelined */
	fifo->exp_len -= transfer_model;
	fifo->pipeline_len += transfer_model;
	fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	if(num_priorities != -1)
	{
		int i;
		int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
		for(i = 0; i <= task_prio; i++)
			fifo->exp_len_per_priority[i] -= transfer_model;
	}
}
/* This is called when a task is actually pushed to the worker (i.e. the transfer has finished) */
static void _starpu_fifo_task_started(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities)
{
	double model = task->predicted;
	double transfer_model = task->predicted_transfer;
	if(!isnan(transfer_model))
		/* The transfer is over, remove it from pipelined */
		fifo->pipeline_len -= transfer_model;
	if(!isnan(model))
	{
		/* We now start the computation, move it from predicted to pipelined */
		fifo->exp_len -= model;
		fifo->pipeline_len += model;
		fifo->exp_start = starpu_timing_now() + fifo->pipeline_len;
		fifo->exp_end = fifo->exp_start + fifo->exp_len;
		if(num_priorities != -1)
		{
			int i;
			int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] -= model;
		}
	}
}

/* This is called when a task is actually finished */
static void _starpu_fifo_task_finished(struct _starpu_fifo_taskq *fifo, struct starpu_task *task, int num_priorities STARPU_ATTRIBUTE_UNUSED)
{
	if(!isnan(task->predicted))
		/* The execution is over, remove it from pipelined */
		fifo->pipeline_len -= task->predicted;
	fifo->exp_start = STARPU_MAX(starpu_timing_now() + fifo->pipeline_len, fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
}
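/* Taken together, the three helpers above implement the accounting pipeline of
 * a task's predicted cost: _starpu_fifo_task_transfer_started() moves the
 * transfer estimate from exp_len to pipeline_len, _starpu_fifo_task_started()
 * then does the same for the computation estimate once the transfer is over,
 * and _starpu_fifo_task_finished() finally removes it from pipeline_len when
 * the execution completes. */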
static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo_taskq *fifo_queue, unsigned node, int num_priorities)
{
	struct starpu_task *task = NULL, *current;
	if (fifo_queue->ntasks == 0)
		return NULL;
	if (fifo_queue->ntasks > 0)
	{
		fifo_queue->ntasks--;
		task = starpu_task_list_front(&fifo_queue->taskq);
		if (STARPU_UNLIKELY(!task))
			return NULL;
		int first_task_priority = task->priority;
		current = task;
		int non_ready_best = INT_MAX;
		while (current)
		{
			int priority = current->priority;
			if (priority >= first_task_priority)
			{
				int non_ready = count_non_ready_buffers(current, node);
				if (non_ready < non_ready_best)
				{
					non_ready_best = non_ready;
					task = current;
					if (non_ready == 0)
						break;
				}
			}
			current = current->next;
		}
		if(num_priorities != -1)
		{
			int i;
			int task_prio = _normalize_prio(task->priority, num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo_queue->ntasks_per_priority[i]--;
		}
		starpu_task_list_erase(&fifo_queue->taskq, task);
	}
	return task;
}

static struct starpu_task *dmda_pop_ready_task(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *task;
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	unsigned node = starpu_worker_get_memory_node(workerid);
	/* Take the opportunity to update start time */
	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	task = _starpu_fifo_pop_first_ready_task(fifo, node, dt->num_priorities);
	if (task)
	{
		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, node);
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}
		dt->total_task_cnt++;
#endif
	}
	return task;
}

static struct starpu_task *dmda_pop_task(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *task;
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	STARPU_ASSERT_MSG(fifo, "worker %u does not belong to ctx %u anymore.\n", workerid, sched_ctx_id);
	/* Take the opportunity to update start time */
	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	task = _starpu_fifo_pop_local_task(fifo);
	if (task)
	{
		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
		starpu_sched_ctx_list_task_counters_decrement(sched_ctx_id, workerid);
#ifdef STARPU_VERBOSE
		if (task->cl)
		{
			int non_ready = count_non_ready_buffers(task, starpu_worker_get_memory_node(workerid));
			if (non_ready == 0)
				dt->ready_task_cnt++;
		}
		dt->total_task_cnt++;
#endif
	}
	return task;
}

static struct starpu_task *dmda_pop_every_task(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_task *new_list, *task;
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	/* Take the opportunity to update start time */
	fifo->exp_start = STARPU_MAX(starpu_timing_now(), fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	_starpu_worker_lock_self();
	new_list = _starpu_fifo_pop_every_task(fifo, workerid);
	_starpu_worker_unlock_self();
	starpu_sched_ctx_list_task_counters_reset(sched_ctx_id, workerid);
	for (task = new_list; task; task = task->next)
		_starpu_fifo_task_transfer_started(fifo, task, dt->num_priorities);
	return new_list;
}

static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
				    double predicted, double predicted_transfer,
				    int prio, unsigned sched_ctx_id)
{
	_starpu_worker_relax_on();
	_starpu_sched_ctx_lock_write(sched_ctx_id);
	_starpu_worker_relax_off();
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	/* make sure someone could execute that task! */
	STARPU_ASSERT(best_workerid != -1);
	unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(best_workerid, sched_ctx_id);
	if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
	{
		starpu_sched_ctx_move_task_to_ctx_locked(task, child_sched_ctx, 1);
		starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, task->flops);
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
		return 0;
	}
	_starpu_sched_ctx_unlock_write(sched_ctx_id);
	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
	double now = starpu_timing_now();
#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(best_workerid, sched_ctx_id);
#endif //STARPU_USE_SC_HYPERVISOR
	_starpu_worker_lock(best_workerid);
	/* Sometimes workers didn't take the tasks as early as we expected */
	fifo->exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	if ((now + predicted_transfer) < fifo->exp_end)
	{
		/* We may hope that the transfer will be finished by
		 * the start of the task. */
		predicted_transfer = 0.0;
	}
	else
	{
		/* The transfer will not be finished by then, take the
		 * remainder into account */
		predicted_transfer = (now + predicted_transfer) - fifo->exp_end;
	}
	if(!isnan(predicted_transfer))
	{
		fifo->exp_len += predicted_transfer;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted_transfer;
		}
	}
	if(!isnan(predicted))
	{
		fifo->exp_len += predicted;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted;
		}
	}
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	_starpu_worker_unlock(best_workerid);
	task->predicted = predicted;
	task->predicted_transfer = predicted_transfer;
#ifdef STARPU_USE_TOP
	starpu_top_task_prevision(task, best_workerid,
				  (unsigned long long)(fifo->exp_end-predicted)/1000,
				  (unsigned long long)fifo->exp_end/1000);
#endif /* !STARPU_USE_TOP */
	if (starpu_get_prefetch_flag())
	{
		unsigned memory_node = starpu_worker_get_memory_node(best_workerid);
		starpu_prefetch_task_input_on_node(task, memory_node);
	}
	STARPU_AYU_ADDTOTASKQUEUE(starpu_task_get_job_id(task), best_workerid);
	unsigned stream_ctx_id = starpu_worker_get_sched_ctx_id_stream(best_workerid);
	if(stream_ctx_id != STARPU_NMAX_SCHED_CTXS)
	{
		_starpu_worker_relax_on();
		_starpu_sched_ctx_lock_write(sched_ctx_id);
		_starpu_worker_relax_off();
		starpu_sched_ctx_move_task_to_ctx_locked(task, stream_ctx_id, 0);
		starpu_sched_ctx_revert_task_counters_ctx_locked(sched_ctx_id, task->flops);
		_starpu_sched_ctx_unlock_write(sched_ctx_id);
	}
	int ret = 0;
	if (prio)
	{
		_starpu_worker_lock(best_workerid);
		ret = _starpu_fifo_push_sorted_task(dt->queue_array[best_workerid], task);
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				dt->queue_array[best_workerid]->ntasks_per_priority[i]++;
		}
#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
		starpu_wake_worker_locked(best_workerid);
#endif
		starpu_push_task_end(task);
		_starpu_worker_unlock(best_workerid);
	}
	else
	{
		_starpu_worker_lock(best_workerid);
		starpu_task_list_push_back(&dt->queue_array[best_workerid]->taskq, task);
		dt->queue_array[best_workerid]->ntasks++;
		dt->queue_array[best_workerid]->nprocessed++;
#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
		starpu_wake_worker_locked(best_workerid);
#endif
		starpu_push_task_end(task);
		_starpu_worker_unlock(best_workerid);
	}
	starpu_sched_ctx_list_task_counters_increment(sched_ctx_id, best_workerid);
	return ret;
}
/* TODO: factorize with dmda!! */
static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	int best = -1;
	unsigned worker_ctx = 0;
	double best_exp_end = 0.0;
	double model_best = 0.0;
	double transfer_model_best = 0.0;
	int ntasks_best = -1;
	double ntasks_best_end = 0.0;
	int calibrating = 0;
	/* A priori, we know all estimations */
	int unknown = 0;
	unsigned best_impl = 0;
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	struct starpu_sched_ctx_iterator it;
	double now = starpu_timing_now();
	workers->init_iterator_for_parallel_tasks(workers, &it, task);
	while(workers->has_next(workers, &it))
	{
		unsigned nimpl;
		unsigned impl_mask;
		unsigned worker = workers->get_next(workers, &it);
		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
		unsigned memory_node = starpu_worker_get_memory_node(worker);
		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
		/* Sometimes workers didn't take the tasks as early as we expected */
		double exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
			continue;
		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			if (!(impl_mask & (1U << nimpl)))
			{
				/* no one on that queue may execute this task */
				// worker_ctx++;
				continue;
			}
			double exp_end;
			double local_length = starpu_task_expected_length(task, perf_arch, nimpl);
			double local_penalty = starpu_task_expected_data_transfer_time(memory_node, task);
			double ntasks_end = fifo->ntasks / starpu_worker_get_relative_speedup(perf_arch);
			//_STARPU_DEBUG("Scheduler dm: task length (%lf) worker (%u) kernel (%u) \n", local_length,worker,nimpl);
			/*
			 * This implements a default greedy scheduler for the
			 * case of tasks which have no performance model, or
			 * whose performance model is not calibrated yet.
			 *
			 * It simply uses the number of tasks already pushed to
			 * the workers, divided by the relative performance of
			 * a CPU and of a GPU.
			 *
			 * This is always computed, but the ntasks_best
			 * selection is only really used if the task indeed has
			 * no performance model, or is not calibrated yet.
			 */
			if (ntasks_best == -1
			    /* Always compute the greedy decision, at least for
			     * the tasks with no performance model. */
			    || (!calibrating && ntasks_end < ntasks_best_end)
			    /* The performance model of this task is not
			     * calibrated on this worker, try to run it there
			     * to calibrate it there. */
			    || (!calibrating && isnan(local_length))
			    /* the performance model of this task is not
			     * calibrated on this worker either, rather run it
			     * there if this one is low on scheduled tasks. */
			    || (calibrating && isnan(local_length) && ntasks_end < ntasks_best_end)
			    )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = worker;
				best_impl = nimpl;
			}
			if (isnan(local_length))
			{
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				static int warned;
				if (!warned)
				{
					warned = 1;
					_STARPU_DISP("Warning: performance model for %s not finished calibrating on worker %u, using a dumb scheduling heuristic for now\n", starpu_task_get_name(task), worker);
				}
				calibrating = 1;
			}
			if (isnan(local_length) || _STARPU_IS_ZERO(local_length))
				/* there is no prediction available for that task
				 * with that arch yet, so switch to a greedy strategy */
				unknown = 1;
			if (unknown)
				continue;
			exp_end = exp_start + fifo->exp_len + local_length;
			if (best == -1 || exp_end < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end;
				best = worker;
				model_best = local_length;
				transfer_model_best = local_penalty;
				best_impl = nimpl;
			}
		}
		worker_ctx++;
	}
	if (unknown)
	{
		best = ntasks_best;
		model_best = 0.0;
		transfer_model_best = 0.0;
#ifdef STARPU_VERBOSE
		dt->eager_task_cnt++;
#endif
	}
	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
	starpu_task_set_implementation(task, best_impl);
	_STARPU_TASK_BREAK_ON(task, sched);
	/* we should now have the best worker in variable "best" */
	return push_task_on_best_worker(task, best,
					model_best, transfer_model_best, prio, sched_ctx_id);
}
/* TODO: factorise CPU computations, expensive with a lot of cores */
static void compute_all_performance_predictions(struct starpu_task *task,
						unsigned nworkers,
						double local_task_length[nworkers][STARPU_MAXIMPLEMENTATIONS],
						double exp_end[nworkers][STARPU_MAXIMPLEMENTATIONS],
						double *max_exp_endp,
						double *best_exp_endp,
						double local_data_penalty[nworkers][STARPU_MAXIMPLEMENTATIONS],
						double local_energy[nworkers][STARPU_MAXIMPLEMENTATIONS],
						int *forced_worker, int *forced_impl, unsigned sched_ctx_id, unsigned sorted_decision)
{
	int calibrating = 0;
	double max_exp_end = DBL_MIN;
	double best_exp_end = DBL_MAX;
	int ntasks_best = -1;
	int nimpl_best = 0;
	double ntasks_best_end = 0.0;
	/* A priori, we know all estimations */
	int unknown = 0;
	unsigned worker_ctx = 0;
	int task_prio = 0;
	starpu_task_bundle_t bundle = task->bundle;
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	if(sorted_decision && dt->num_priorities != -1)
		task_prio = _normalize_prio(task->priority, dt->num_priorities, sched_ctx_id);
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	double now = starpu_timing_now();
	struct starpu_sched_ctx_iterator it;
	workers->init_iterator_for_parallel_tasks(workers, &it, task);
	while(worker_ctx < nworkers && workers->has_next(workers, &it))
	{
		unsigned nimpl;
		unsigned impl_mask;
		unsigned workerid = workers->get_next(workers, &it);
		struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(workerid, sched_ctx_id);
		unsigned memory_node = starpu_worker_get_memory_node(workerid);
		STARPU_ASSERT_MSG(fifo != NULL, "workerid %u ctx %u\n", workerid, sched_ctx_id);
		/* Sometimes workers didn't take the tasks as early as we expected */
		double exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
		if (!starpu_worker_can_execute_task_impl(workerid, task, &impl_mask))
			continue;
		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
		{
			if (!(impl_mask & (1U << nimpl)))
			{
				/* no one on that queue may execute this task */
				continue;
			}
			int fifo_ntasks = fifo->ntasks;
			double prev_exp_len = fifo->exp_len;
			/* consider the priority of the task when deciding on which workerid to schedule,
			   compute the expected_end of the task if it is inserted before other tasks already scheduled */
			if(sorted_decision)
			{
				if(dt->num_priorities != -1)
				{
					prev_exp_len = fifo->exp_len_per_priority[task_prio];
					fifo_ntasks = fifo->ntasks_per_priority[task_prio];
				}
				else
				{
					_starpu_worker_lock(workerid);
					prev_exp_len = _starpu_fifo_get_exp_len_prev_task_list(fifo, task, workerid, nimpl, &fifo_ntasks);
					_starpu_worker_unlock(workerid);
				}
			}
			exp_end[worker_ctx][nimpl] = exp_start + prev_exp_len;
			if (exp_end[worker_ctx][nimpl] > max_exp_end)
				max_exp_end = exp_end[worker_ctx][nimpl];
			//_STARPU_DEBUG("Scheduler dmda: task length (%lf) workerid (%u) kernel (%u) \n", local_task_length[workerid][nimpl],workerid,nimpl);
			if (bundle)
			{
				/* TODO : conversion time */
				local_task_length[worker_ctx][nimpl] = starpu_task_bundle_expected_length(bundle, perf_arch, nimpl);
				local_data_penalty[worker_ctx][nimpl] = starpu_task_bundle_expected_data_transfer_time(bundle, memory_node);
				local_energy[worker_ctx][nimpl] = starpu_task_bundle_expected_energy(bundle, perf_arch, nimpl);
			}
			else
			{
				local_task_length[worker_ctx][nimpl] = starpu_task_expected_length(task, perf_arch, nimpl);
				local_data_penalty[worker_ctx][nimpl] = starpu_task_expected_data_transfer_time(memory_node, task);
				local_energy[worker_ctx][nimpl] = starpu_task_expected_energy(task, perf_arch, nimpl);
				double conversion_time = starpu_task_expected_conversion_time(task, perf_arch, nimpl);
				if (conversion_time > 0.0)
					local_task_length[worker_ctx][nimpl] += conversion_time;
			}
			double ntasks_end = fifo_ntasks / starpu_worker_get_relative_speedup(perf_arch);
			/*
			 * This implements a default greedy scheduler for the
			 * case of tasks which have no performance model, or
			 * whose performance model is not calibrated yet.
			 *
			 * It simply uses the number of tasks already pushed to
			 * the workers, divided by the relative performance of
			 * a CPU and of a GPU.
			 *
			 * This is always computed, but the ntasks_best
			 * selection is only really used if the task indeed has
			 * no performance model, or is not calibrated yet.
			 */
			if (ntasks_best == -1
			    /* Always compute the greedy decision, at least for
			     * the tasks with no performance model. */
			    || (!calibrating && ntasks_end < ntasks_best_end)
			    /* The performance model of this task is not
			     * calibrated on this workerid, try to run it there
			     * to calibrate it there. */
			    || (!calibrating && isnan(local_task_length[worker_ctx][nimpl]))
			    /* the performance model of this task is not
			     * calibrated on this workerid either, rather run it
			     * there if this one is low on scheduled tasks. */
			    || (calibrating && isnan(local_task_length[worker_ctx][nimpl]) && ntasks_end < ntasks_best_end)
			    )
			{
				ntasks_best_end = ntasks_end;
				ntasks_best = workerid;
				nimpl_best = nimpl;
			}
			if (isnan(local_task_length[worker_ctx][nimpl]))
				/* we are calibrating, we want to speed-up calibration time
				 * so we privilege non-calibrated tasks (but still
				 * greedily distribute them to avoid dumb schedules) */
				calibrating = 1;
			if (isnan(local_task_length[worker_ctx][nimpl])
			    || _STARPU_IS_ZERO(local_task_length[worker_ctx][nimpl]))
				/* there is no prediction available for that task
				 * with that arch (yet or at all), so switch to a greedy strategy */
				unknown = 1;
			if (unknown)
				continue;
			double task_starting_time = STARPU_MAX(exp_start + prev_exp_len, now + local_data_penalty[worker_ctx][nimpl]);
			exp_end[worker_ctx][nimpl] = task_starting_time + local_task_length[worker_ctx][nimpl];
			if (exp_end[worker_ctx][nimpl] < best_exp_end)
			{
				/* a better solution was found */
				best_exp_end = exp_end[worker_ctx][nimpl];
				nimpl_best = nimpl;
			}
			if (isnan(local_energy[worker_ctx][nimpl]))
				local_energy[worker_ctx][nimpl] = 0.;
		}
		worker_ctx++;
	}
	*forced_worker = unknown?ntasks_best:-1;
	*forced_impl = unknown?nimpl_best:-1;
#ifdef STARPU_VERBOSE
	if (unknown)
	{
		dt->eager_task_cnt++;
	}
#endif
	*best_exp_endp = best_exp_end;
	*max_exp_endp = max_exp_end;
}
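/* _dmda_push_task() below combines these per-(worker, implementation)
 * predictions into the fitness that is actually minimised:
 *   fitness = alpha * (exp_end - best_exp_end)
 *           + beta  * local_data_penalty
 *           + gamma * local_energy
 * plus an idle-power penalty when the placement would lengthen the overall
 * makespan, which matches the formula given at the top of this file. */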
static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sched_ctx_id, unsigned simulate, unsigned sorted_decision)
{
	/* find the queue */
	int best = -1, best_in_ctx = -1;
	int selected_impl = 0;
	double model_best = 0.0;
	double transfer_model_best = 0.0;
	/* this flag is set if the corresponding worker is selected because
	   there is no performance prediction available yet */
	int forced_best = -1;
	int forced_impl = -1;
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
	unsigned nworkers_ctx = workers->nworkers;
	double local_task_length[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double local_data_penalty[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	double local_energy[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	/* Expected end of this task on the workers */
	double exp_end[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	/* This is the minimum among the exp_end[] matrix */
	double best_exp_end;
	/* This is the maximum termination time of already-scheduled tasks over all workers */
	double max_exp_end = 0.0;
	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
	compute_all_performance_predictions(task,
					    nworkers_ctx,
					    local_task_length,
					    exp_end,
					    &max_exp_end,
					    &best_exp_end,
					    local_data_penalty,
					    local_energy,
					    &forced_best,
					    &forced_impl, sched_ctx_id, sorted_decision);
	if (forced_best == -1)
	{
		double best_fitness = -1;
		unsigned worker_ctx = 0;
		struct starpu_sched_ctx_iterator it;
		workers->init_iterator_for_parallel_tasks(workers, &it, task);
		while(worker_ctx < nworkers_ctx && workers->has_next(workers, &it))
		{
			unsigned worker = workers->get_next(workers, &it);
			unsigned nimpl;
			unsigned impl_mask;
			if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
				continue;
			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
			{
				if (!(impl_mask & (1U << nimpl)))
				{
					/* no one on that queue may execute this task */
					continue;
				}
				fitness[worker_ctx][nimpl] = dt->alpha*(exp_end[worker_ctx][nimpl] - best_exp_end)
					+ dt->beta*(local_data_penalty[worker_ctx][nimpl])
					+ dt->_gamma*(local_energy[worker_ctx][nimpl]);
				if (exp_end[worker_ctx][nimpl] > max_exp_end)
				{
					/* This placement will make the computation
					 * longer, take into account the idle
					 * consumption of other cpus */
					fitness[worker_ctx][nimpl] += dt->_gamma * dt->idle_power * (exp_end[worker_ctx][nimpl] - max_exp_end) / 1000000.0;
				}
				if (best == -1 || fitness[worker_ctx][nimpl] < best_fitness)
				{
					/* we found a better solution */
					best_fitness = fitness[worker_ctx][nimpl];
					best = worker;
					best_in_ctx = worker_ctx;
					selected_impl = nimpl;
					//_STARPU_DEBUG("best fitness (worker %d) %e = alpha*(%e) + beta(%e) +gamma(%e)\n", worker, best_fitness, exp_end[worker][nimpl] - best_exp_end, local_data_penalty[worker][nimpl], local_energy[worker][nimpl]);
				}
			}
			worker_ctx++;
		}
	}
	STARPU_ASSERT(forced_best != -1 || best != -1);
	if (forced_best != -1)
	{
		/* there is no prediction available for that task
		 * with that arch we want to speed-up calibration time
		 * so we force this measurement */
		best = forced_best;
		selected_impl = forced_impl;
		model_best = 0.0;
		transfer_model_best = 0.0;
	}
	else if (task->bundle)
	{
		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(best_in_ctx, sched_ctx_id);
		unsigned memory_node = starpu_worker_get_memory_node(best);
		model_best = starpu_task_expected_length(task, perf_arch, selected_impl);
		transfer_model_best = starpu_task_expected_data_transfer_time(memory_node, task);
	}
	else
	{
		model_best = local_task_length[best_in_ctx][selected_impl];
		transfer_model_best = local_data_penalty[best_in_ctx][selected_impl];
	}
	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
	starpu_task_set_implementation(task, selected_impl);
	_STARPU_TASK_BREAK_ON(task, sched);
	if(!simulate)
	{
		/* we should now have the best worker in variable "best" */
		return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio, sched_ctx_id);
	}
	else
	{
		return exp_end[best_in_ctx][selected_impl];
	}
}
static int dmda_push_sorted_decision_task(struct starpu_task *task)
{
	return _dmda_push_task(task, 1, task->sched_ctx, 0, 1);
}

static int dmda_push_sorted_task(struct starpu_task *task)
{
#ifdef STARPU_DEVEL
#warning TODO: after defining a scheduling window, use that instead of empty_ctx_tasks
#endif
	return _dmda_push_task(task, 1, task->sched_ctx, 0, 0);
}

static int dm_push_task(struct starpu_task *task)
{
	return _dm_push_task(task, 0, task->sched_ctx);
}

static int dmda_push_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 0, task->sched_ctx, 0, 0);
}

static double dmda_simulate_push_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 0, task->sched_ctx, 1, 0);
}

static double dmda_simulate_push_sorted_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 1, task->sched_ctx, 1, 0);
}

static double dmda_simulate_push_sorted_decision_task(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	return _dmda_push_task(task, 1, task->sched_ctx, 1, 1);
}
static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		struct _starpu_fifo_taskq *q;
		int workerid = workerids[i];
		/* if the worker already belonged to this context,
		   the queue and the synchronization variables have already been initialized */
		q = dt->queue_array[workerid];
		if(q == NULL)
		{
			q = dt->queue_array[workerid] = _starpu_create_fifo();
			/* These are only stats, they can be read with races */
			STARPU_HG_DISABLE_CHECKING(q->exp_start);
			STARPU_HG_DISABLE_CHECKING(q->exp_len);
			STARPU_HG_DISABLE_CHECKING(q->exp_end);
		}
		if(dt->num_priorities != -1)
		{
			_STARPU_MALLOC(q->exp_len_per_priority, dt->num_priorities*sizeof(double));
			_STARPU_MALLOC(q->ntasks_per_priority, dt->num_priorities*sizeof(unsigned));
			int j;
			for(j = 0; j < dt->num_priorities; j++)
			{
				q->exp_len_per_priority[j] = 0.0;
				q->ntasks_per_priority[j] = 0;
			}
		}
	}
}

static void dmda_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned i;
	for (i = 0; i < nworkers; i++)
	{
		int workerid = workerids[i];
		if(dt->queue_array[workerid] != NULL)
		{
			if(dt->num_priorities != -1)
			{
				free(dt->queue_array[workerid]->exp_len_per_priority);
				free(dt->queue_array[workerid]->ntasks_per_priority);
			}
			_starpu_destroy_fifo(dt->queue_array[workerid]);
			dt->queue_array[workerid] = NULL;
		}
	}
}
static void initialize_dmda_policy(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt;
	_STARPU_CALLOC(dt, 1, sizeof(struct _starpu_dmda_data));
	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)dt);
	_STARPU_MALLOC(dt->queue_array, STARPU_NMAXWORKERS*sizeof(struct _starpu_fifo_taskq*));
	int i;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		dt->queue_array[i] = NULL;
	dt->alpha = starpu_get_env_float_default("STARPU_SCHED_ALPHA", _STARPU_SCHED_ALPHA_DEFAULT);
	dt->beta = starpu_get_env_float_default("STARPU_SCHED_BETA", _STARPU_SCHED_BETA_DEFAULT);
	dt->_gamma = starpu_get_env_float_default("STARPU_SCHED_GAMMA", _STARPU_SCHED_GAMMA_DEFAULT);
	dt->idle_power = starpu_get_env_float_default("STARPU_IDLE_POWER", 0.0);
	if(starpu_sched_ctx_min_priority_is_set(sched_ctx_id) != 0 && starpu_sched_ctx_max_priority_is_set(sched_ctx_id) != 0)
		dt->num_priorities = starpu_sched_ctx_get_max_priority(sched_ctx_id) - starpu_sched_ctx_get_min_priority(sched_ctx_id) + 1;
	else
		dt->num_priorities = -1;
#ifdef STARPU_USE_TOP
	/* FIXME: broken, needs to access context variable */
	starpu_top_register_parameter_float("DMDA_ALPHA", &alpha,
					    alpha_minimum, alpha_maximum, param_modified);
	starpu_top_register_parameter_float("DMDA_BETA", &beta,
					    beta_minimum, beta_maximum, param_modified);
	starpu_top_register_parameter_float("DMDA_GAMMA", &_gamma,
					    gamma_minimum, gamma_maximum, param_modified);
	starpu_top_register_parameter_float("DMDA_IDLE_POWER", &idle_power,
					    idle_power_minimum, idle_power_maximum, param_modified);
#endif /* !STARPU_USE_TOP */
}

static void initialize_dmda_sorted_policy(unsigned sched_ctx_id)
{
	initialize_dmda_policy(sched_ctx_id);
	/* The application may use any integer */
	if (starpu_sched_ctx_min_priority_is_set(sched_ctx_id) == 0)
		starpu_sched_ctx_set_min_priority(sched_ctx_id, INT_MIN);
	if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
		starpu_sched_ctx_set_max_priority(sched_ctx_id, INT_MAX);
}
static void deinitialize_dmda_policy(unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
#ifdef STARPU_VERBOSE
	{
		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
		long int modelled_task_cnt = dt->total_task_cnt - dt->eager_task_cnt;
		_STARPU_DEBUG("%s sched policy (sched_ctx %u): total_task_cnt %ld ready_task_cnt %ld (%.1f%%), modelled_task_cnt = %ld (%.1f%%)%s\n",
			      sched_ctx->sched_policy?sched_ctx->sched_policy->policy_name:"<none>",
			      sched_ctx_id,
			      dt->total_task_cnt,
			      dt->ready_task_cnt,
			      (100.0f*dt->ready_task_cnt)/dt->total_task_cnt,
			      modelled_task_cnt,
			      (100.0f*modelled_task_cnt)/dt->total_task_cnt,
			      modelled_task_cnt==0?" *** Check if performance models are enabled and converging on a per-codelet basis, or use a non-modeling scheduling policy. ***":"");
	}
#endif
	free(dt->queue_array);
	free(dt);
}
/* dmda_pre_exec_hook is called right after the data transfer is done and right
 * before the computation begins; it is useful for updating more precisely the
 * values of the expected start, end, length, etc. */
static void dmda_pre_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
{
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	const double now = starpu_timing_now();
	/* Once the task is executing, we can update the predicted amount
	 * of work. */
	_starpu_worker_lock_self();
	_starpu_fifo_task_started(fifo, task, dt->num_priorities);
	/* Take the opportunity to update start time */
	fifo->exp_start = STARPU_MAX(now + fifo->pipeline_len, fifo->exp_start);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	_starpu_worker_unlock_self();
}
static void dmda_push_task_notify(struct starpu_task *task, int workerid, int perf_workerid, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	/* Compute the expected penalty */
	struct starpu_perfmodel_arch *perf_arch = starpu_worker_get_perf_archtype(perf_workerid, sched_ctx_id);
	unsigned memory_node = starpu_worker_get_memory_node(workerid);
	double predicted = starpu_task_expected_length(task, perf_arch,
						       starpu_task_get_implementation(task));
	double predicted_transfer = starpu_task_expected_data_transfer_time(memory_node, task);
	double now = starpu_timing_now();
	/* Update the predictions */
	_starpu_worker_lock(workerid);
	/* Sometimes workers didn't take the tasks as early as we expected */
	fifo->exp_start = isnan(fifo->exp_start) ? now + fifo->pipeline_len : STARPU_MAX(fifo->exp_start, now);
	fifo->exp_end = fifo->exp_start + fifo->exp_len;
	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted_transfer))
	{
		if (now + predicted_transfer < fifo->exp_end)
		{
			/* We may hope that the transfer will be finished by
			 * the start of the task. */
			predicted_transfer = 0;
		}
		else
		{
			/* The transfer will not be finished by then, take the
			 * remainder into account */
			predicted_transfer = (now + predicted_transfer) - fifo->exp_end;
		}
		task->predicted_transfer = predicted_transfer;
		fifo->exp_end += predicted_transfer;
		fifo->exp_len += predicted_transfer;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted_transfer;
		}
	}
	/* If there is no prediction available, we consider the task has a null length */
	if (!isnan(predicted))
	{
		task->predicted = predicted;
		fifo->exp_end += predicted;
		fifo->exp_len += predicted;
		if(dt->num_priorities != -1)
		{
			int i;
			int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
			for(i = 0; i <= task_prio; i++)
				fifo->exp_len_per_priority[i] += predicted;
		}
	}
	if(dt->num_priorities != -1)
	{
		int i;
		int task_prio = _normalize_prio(task->priority, dt->num_priorities, task->sched_ctx);
		for(i = 0; i <= task_prio; i++)
			fifo->ntasks_per_priority[i]++;
	}
	fifo->ntasks++;
	_starpu_worker_unlock(workerid);
}
static void dmda_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id)
{
	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
	unsigned workerid = starpu_worker_get_id_check();
	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
	_starpu_worker_lock_self();
	_starpu_fifo_task_finished(fifo, task, dt->num_priorities);
	_starpu_worker_unlock_self();
}
struct starpu_sched_policy _starpu_sched_dm_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dm_push_task,
	.simulate_push_task = NULL,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dm",
	.policy_description = "performance model",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.simulate_push_task = dmda_simulate_push_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmda",
	.policy_description = "data-aware performance model",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_sorted_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_sorted_task,
	.simulate_push_task = dmda_simulate_push_sorted_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdas",
	.policy_description = "data-aware performance model (sorted)",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_sorted_decision_policy =
{
	.init_sched = initialize_dmda_sorted_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_sorted_decision_task,
	.simulate_push_task = dmda_simulate_push_sorted_decision_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdasd",
	.policy_description = "data-aware performance model (sorted decision)",
	.worker_type = STARPU_WORKER_LIST,
};

struct starpu_sched_policy _starpu_sched_dmda_ready_policy =
{
	.init_sched = initialize_dmda_policy,
	.deinit_sched = deinitialize_dmda_policy,
	.add_workers = dmda_add_workers,
	.remove_workers = dmda_remove_workers,
	.push_task = dmda_push_task,
	.simulate_push_task = dmda_simulate_push_task,
	.push_task_notify = dmda_push_task_notify,
	.pop_task = dmda_pop_ready_task,
	.pre_exec_hook = dmda_pre_exec_hook,
	.post_exec_hook = dmda_post_exec_hook,
	.pop_every_task = dmda_pop_every_task,
	.policy_name = "dmdar",
	.policy_description = "data-aware performance model (ready)",
	.worker_type = STARPU_WORKER_LIST,
};