/* sched_policy.c */

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2015  Université de Bordeaux
 * Copyright (C) 2010-2015  CNRS
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>

static int use_prefetch = 0;
double idle[STARPU_NMAXWORKERS];
double idle_start[STARPU_NMAXWORKERS];

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}

static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_modular_eager_policy,
	&_starpu_sched_modular_eager_prefetching_policy,
	&_starpu_sched_modular_prio_policy,
	&_starpu_sched_modular_prio_prefetching_policy,
	&_starpu_sched_modular_random_policy,
	&_starpu_sched_modular_random_prio_policy,
	&_starpu_sched_modular_random_prefetching_policy,
	&_starpu_sched_modular_random_prio_prefetching_policy,
	//&_starpu_sched_modular_ws_policy,
	&_starpu_sched_modular_heft_policy,
	&_starpu_sched_modular_heft2_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_lws_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_dmda_sorted_decision_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	&_starpu_sched_heteroprio_policy,
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies()
{
	return predefined_policies;
}
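
/* Illustrative sketch (not part of StarPU): an application can enumerate the
 * predefined policies through the public starpu_sched_get_predefined_policies()
 * above and pick one by name, using the public policy_name and
 * policy_description fields. pick_policy() is a hypothetical helper.
 * Compiled out with #if 0. */
#if 0
#include <starpu.h>
#include <stdio.h>
#include <string.h>

static struct starpu_sched_policy *pick_policy(const char *name)
{
	struct starpu_sched_policy **p;
	for (p = starpu_sched_get_predefined_policies(); *p != NULL; p++)
	{
		printf("%s: %s\n", (*p)->policy_name, (*p)->policy_description);
		if (name && (*p)->policy_name && !strcmp(name, (*p)->policy_name))
			return *p;
	}
	return NULL;
}
#endif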

struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}

static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	if (strncmp(policy_name, "heft", 5) == 0)
	{
		_STARPU_DISP("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for(policy=predefined_policies ; *policy!=NULL ; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}

static void display_sched_help_message(void)
{
	const char *sched_env = starpu_getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;
		fprintf(stderr, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for(policy=predefined_policies ; *policy!=NULL ; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stderr, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stderr, "\n");
	}
}

struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = &config->conf;

	if(required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (!selected_policy && user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = starpu_getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (!selected_policy && sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* Perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the greedy (eager) policy as a default */
	return &_starpu_sched_eager_policy;
}
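
/* Illustrative sketch (not part of StarPU): the selection order implemented
 * above, seen from the application side. A starpu_conf.sched_policy structure
 * takes precedence, then the STARPU_SCHED environment variable, then
 * starpu_conf.sched_policy_name, and finally the eager default. Compiled out
 * with #if 0. */
#if 0
#include <starpu.h>

int main(void)
{
	struct starpu_conf conf;
	starpu_conf_init(&conf);
	/* used only when STARPU_SCHED is not set in the environment */
	conf.sched_policy_name = "dmda";
	if (starpu_init(&conf) != 0)
		return 1;
	/* ... submit tasks ... */
	starpu_shutdown();
	return 0;
}
#endif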

void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf.calibrate);

	load_sched_policy(selected_policy, sched_ctx);

	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}

void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		policy->deinit_sched(sched_ctx->id);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}
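
/* Illustrative sketch (not part of StarPU): the minimal shape of a custom
 * struct starpu_sched_policy driven by the init/deinit hooks above. The field
 * names are the public ones from starpu_scheduler.h; the global FIFO is only
 * a placeholder, and a real policy would also need locking and worker
 * wake-ups. Compiled out with #if 0. */
#if 0
#include <starpu.h>

static struct starpu_task_list fifo;	/* hypothetical global queue */

static void example_init(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	starpu_task_list_init(&fifo);
}

static void example_deinit(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
}

static int example_push(struct starpu_task *task)
{
	starpu_task_list_push_back(&fifo, task);
	return 0;
}

static struct starpu_task *example_pop(unsigned sched_ctx_id)
{
	(void) sched_ctx_id;
	return starpu_task_list_pop_front(&fifo);
}

static struct starpu_sched_policy example_policy =
{
	.init_sched = example_init,
	.deinit_sched = example_deinit,
	.push_task = example_push,
	.pop_task = example_pop,
	.policy_name = "example",
	.policy_description = "toy FIFO policy",
};
#endif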

static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
{
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	struct _starpu_sched_ctx *sched_ctx;
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
		if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
}

/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, an alias of the task will be
 * enqueued into each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);

	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR

	unsigned i;
	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
			for (i = 0; i < nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		//	if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if(task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
		job->after_work_busy_barrier = worker_size;

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);
			alias->destroy = 1;

			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
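
/* Illustrative sketch (not part of StarPU): how an application reaches the
 * specific-worker path above, by pinning a task to a worker through the
 * public execute_on_a_specific_worker and workerid task fields. The codelet
 * my_cl and the helper submit_pinned() are hypothetical. Compiled out with
 * #if 0. */
#if 0
#include <starpu.h>

extern struct starpu_codelet my_cl;	/* hypothetical codelet */

static int submit_pinned(starpu_data_handle_t handle, int workerid)
{
	struct starpu_task *task = starpu_task_create();
	task->cl = &my_cl;
	STARPU_TASK_SET_HANDLE(task, handle, 0);
	task->execute_on_a_specific_worker = 1;	/* bypass the scheduling policy */
	task->workerid = workerid;		/* basic or combined worker id */
	return starpu_task_submit(task);
}
#endif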

/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	if(j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);

	return _starpu_repush_task(j);
}
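
/* Illustrative sketch (not part of StarPU): the prologue callbacks invoked
 * above and in _starpu_pop_task() are set by the application on the task
 * itself: prologue_callback_func runs at push time, and
 * prologue_callback_pop_func when a worker picks the task up. on_push() and
 * the "A" label are hypothetical. Compiled out with #if 0. */
#if 0
#include <starpu.h>
#include <stdio.h>

static void on_push(void *arg)
{
	printf("task %s pushed\n", (const char *) arg);
}

static void setup_prologues(struct starpu_task *task)
{
	task->prologue_callback_func = on_push;
	task->prologue_callback_arg = "A";
}
#endif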

int _starpu_repush_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;

	_STARPU_LOG_IN();

	unsigned can_push = _starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops, task);
	task->status = STARPU_TASK_READY;

#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		intptr_t id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif
	/* if the context does not have any workers, save the task in a temp list */
	if(!sched_ctx->is_initial_sched)
	{
		/* if the workers of the ctx are not able to execute this task,
		   we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if(nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			   && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}

	if(!can_push)
		return 0;

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL || task->cl->where == STARPU_NOWHERE)
	{
		if(task->prologue_callback_pop_func)
			task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to an empty context, that's fine */
		ret = 0;
	return ret;
}

int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);

	/* if the context still does not have workers, put the task back to its place in
	   the empty ctx list */
	if(!sched_ctx->is_initial_sched)
	{
		/* if the workers of the ctx are not able to execute this task,
		   we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			   && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	int ret = 0;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		unsigned node = starpu_worker_get_memory_node(task->workerid);
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_on_node(task, node);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag())
		{
			if (task->cl->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->cl->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->cl->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->cl->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->cl->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}

		if(!sched_ctx->sched_policy)
		{
			if(!sched_ctx->awake_workers)
				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
			else
			{
				struct starpu_worker_collection *workers = sched_ctx->workers;

				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
				job->task_size = workers->nworkers;
				job->combined_workerid = -1; // it's a ctx, not a combined worker
				job->active_task_alias_count = 0;

				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
				job->after_work_busy_barrier = workers->nworkers;

				/* Note: we have to call that early, or else the task may have
				 * disappeared already */
				starpu_push_task_end(task);

				unsigned workerid;
				struct starpu_sched_ctx_iterator it;
				if(workers->init_iterator)
					workers->init_iterator(workers, &it);
				while(workers->has_next(workers, &it))
				{
					workerid = workers->get_next(workers, &it);
					struct starpu_task *alias = starpu_task_dup(task);
					alias->destroy = 1;
					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
				}
			}
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);

			/* check whether there are any workers in the context */
			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
		}

		if(ret == -1)
		{
			fprintf(stderr, "repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}

/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_STARPU_TRACE_JOB_POP(task, task->priority > 0);
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}

/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}

struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							     enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch(node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops: %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
	return conversion_task;
}
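
/* Illustrative sketch (not part of StarPU internals): where the conversion
 * codelets selected above come from. An application registers a multiformat
 * handle with a starpu_multiformat_data_interface_ops structure whose
 * cpu_to_cuda_cl / cuda_to_cpu_cl fields supply the codelets that
 * _starpu_create_conversion_task_for_arch() picks. The codelets, the
 * aos_point layout, and register_points() are hypothetical. Compiled out
 * with #if 0. */
#if 0
#include <starpu.h>

extern struct starpu_codelet cpu_to_cuda_cl;	/* hypothetical conversion codelets */
extern struct starpu_codelet cuda_to_cpu_cl;

struct aos_point { float x, y; };	/* hypothetical CPU-side (AoS) layout */

static struct starpu_multiformat_data_interface_ops ops =
{
	.cpu_elemsize = sizeof(struct aos_point),
	.cuda_elemsize = 2 * sizeof(float),	/* hypothetical SoA layout on CUDA */
	.cpu_to_cuda_cl = &cpu_to_cuda_cl,
	.cuda_to_cpu_cl = &cuda_to_cpu_cl,
};

static void register_points(starpu_data_handle_t *handle, struct aos_point *pts, unsigned n)
{
	starpu_multiformat_data_register(handle, STARPU_MAIN_RAM, pts, n, &ops);
}
#endif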

static
struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_elt *e = NULL;
	struct _starpu_sched_ctx_list_iterator list_it;
	int found = 0;

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->task_number > 0)
			return _starpu_get_sched_ctx_struct(e->sched_ctx);
	}

	_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
	while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
	{
		e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
		if (e->last_poped)
		{
			e->last_poped = 0;
			if (_starpu_sched_ctx_list_iterator_has_next(&list_it))
			{
				e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
				found = 1;
			}
			break;
		}
	}
	if (!found)
		e = worker->sched_ctx_list->head;
	e->last_poped = 1;

	return _starpu_get_sched_ctx_struct(e->sched_ctx);
}

struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	/* get tasks from the stacks of the strategy */
	if(!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while(!task)
#endif
		{
			if(worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while(1)
				{
					/** Caution
					 * If you use multiple contexts, your scheduler *needs*
					 * to update the task_number variable of the ctx list
					 * in order to get the best performance.
					 * This is done using the functions:
					 * starpu_sched_ctx_list_task_counters_increment...(...)
					 * starpu_sched_ctx_list_task_counters_decrement...(...)
					 **/
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if(worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
				}
			}

			if(!task)
			{
				/* whether or not the scheduler shares a tasks list does not
				   matter: if it does not have any task to pop, just get out of here */
				/* however, if it shares a task list, it will be removed as soon
				   as it finishes this job (in handle_job_termination) */
				if(worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				if(worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
						//	_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
						//	_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR
#ifndef STARPU_NON_BLOCKING_DRIVERS
				if(been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;
#endif
			}
		}
	}

	if (!task)
	{
		idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	if(idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
		//	_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
		//	_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost immediately.
	 * This way, we avoid computing and executing the conversion tasks.
	 * Here, we do not care about what implementation is used.
	 */
	worker_id = starpu_worker_get_id();
	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;
		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}
	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	if(task->prologue_callback_pop_func)
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

	return task;
}
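
/* Illustrative sketch (not part of StarPU): the pop_start_time/pop_end_time
 * fields filled in above can be read back by the application through the
 * public profiling API. show_pop_delay() is a hypothetical helper; the field
 * and function names are the public ones from starpu_profiling.h. Compiled
 * out with #if 0. */
#if 0
#include <starpu.h>
#include <starpu_profiling.h>
#include <stdio.h>

static void show_pop_delay(struct starpu_task *task)
{
	/* starpu_profiling_status_set(STARPU_PROFILING_ENABLE) must have been
	 * called before the task was created for profiling_info to be filled */
	struct starpu_profiling_task_info *info = task->profiling_info;
	if (info)
		fprintf(stderr, "pop took %f us\n",
			starpu_timing_timespec_delay_us(&info->pop_start_time,
							&info->pop_end_time));
}
#endif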

struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task *task = NULL;
	if(sched_ctx->sched_policy)
	{
		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

		/* TODO set profiling info */
		if(sched_ctx->sched_policy->pop_every_task)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
	return task;
}

void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->pre_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	unsigned sched_ctx_id = starpu_sched_ctx_get_ctx_for_task(task);
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->post_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}

	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}

/* The scheduling policy may put tasks directly into a worker's local queue so
 * that it is not always necessary to create its own queue when the local queue
 * is sufficient. If "prio" is non-zero, the task is put at the back of the
 * queue, where the worker pops tasks first. Setting "prio" to 0 therefore
 * ensures a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, prio);
}
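
/* Illustrative sketch (not part of StarPU): a custom policy's push_task hook
 * could hand tasks straight to a worker's local queue with the public
 * starpu_push_local_task() above instead of maintaining its own queue. The
 * round-robin cursor and example_push_round_robin() are hypothetical.
 * Compiled out with #if 0. */
#if 0
#include <starpu.h>

static int example_push_round_robin(struct starpu_task *task)
{
	static unsigned next = 0;	/* hypothetical round-robin cursor */
	int workerid = (int) (next++ % starpu_worker_get_count());
	/* honor priorities: priority tasks jump ahead of the FIFO order */
	return starpu_push_local_task(workerid, task, task->priority > 0);
}
#endif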

void _starpu_print_idle_time()
{
	double all_idle = 0.0;
	int i = 0;
	for(i = 0; i < STARPU_NMAXWORKERS; i++)
		all_idle += idle[i];

	FILE *f;
	const char *sched_env = starpu_getenv("STARPU_IDLE_FILE");
	if(!sched_env)
		sched_env = "starpu_idle_microsec.log";
	f = fopen(sched_env, "a");
	if (!f)
	{
		fprintf(stderr, "couldn't open %s: %s\n", sched_env, strerror(errno));
	}
	else
	{
		fprintf(f, "%lf\n", all_idle);
		fclose(f);
	}
}