sched_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2015  Université de Bordeaux
 * Copyright (C) 2010-2015  CNRS
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>

static int use_prefetch = 0;
double idle[STARPU_NMAXWORKERS];
double idle_start[STARPU_NMAXWORKERS];

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}
static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_modular_eager_policy,
	&_starpu_sched_modular_eager_prefetching_policy,
	&_starpu_sched_modular_prio_policy,
	&_starpu_sched_modular_prio_prefetching_policy,
	&_starpu_sched_modular_random_policy,
	&_starpu_sched_modular_random_prio_policy,
	&_starpu_sched_modular_random_prefetching_policy,
	&_starpu_sched_modular_random_prio_prefetching_policy,
	//&_starpu_sched_modular_ws_policy,
	&_starpu_sched_modular_heft_policy,
	&_starpu_sched_modular_heft2_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_lws_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_dmda_sorted_decision_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	NULL
};
struct starpu_sched_policy **starpu_sched_get_predefined_policies(void)
{
	return predefined_policies;
}
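
/* Usage sketch (illustrative only, not part of this file): an application can
 * enumerate the predefined policies through the public API above, e.g. to
 * build its own help message. Assumes a program linked against StarPU; the
 * loop relies only on the NULL terminator of the predefined_policies array. */
#if 0
#include <stdio.h>
#include <starpu.h>

static void list_policies(void)
{
	struct starpu_sched_policy **policy;
	for (policy = starpu_sched_get_predefined_policies(); *policy != NULL; policy++)
		printf("%s: %s\n", (*policy)->policy_name, (*policy)->policy_description);
}
#endif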
struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}
/*
 *	Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}
static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	if (strncmp(policy_name, "heft", 5) == 0)
	{
		_STARPU_DISP("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}
static void display_sched_help_message(void)
{
	const char *sched_env = getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;
		fprintf(stderr, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stderr, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stderr, "\n");
	}
}
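
/* Usage sketch (illustrative only): the help message above is triggered from
 * the shell, without modifying the application, e.g.:
 *
 *	$ STARPU_SCHED=help ./my_app
 *	$ STARPU_SCHED=dmda ./my_app
 *
 * "my_app" is a hypothetical StarPU program name. */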
struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (!selected_policy && user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (!selected_policy && sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* Perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the eager (greedy) policy as a default */
	return &_starpu_sched_eager_policy;
}
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf->calibrate);

	load_sched_policy(selected_policy, sched_ctx);

	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}
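
/* Usage sketch (illustrative only): since use_prefetch above takes whatever
 * non-negative value starpu_get_env_number() reads, prefetching can be
 * disabled from the environment:
 *
 *	$ STARPU_PREFETCH=0 ./my_app
 *
 * "my_app" is a hypothetical program name. */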
void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		policy->deinit_sched(sched_ctx->id);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}
static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
{
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	struct _starpu_sched_ctx *sched_ctx;
	struct _starpu_sched_ctx_list *l = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
		if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
}
/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task will be enqueued into
 * each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker ? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);

	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR

	unsigned i;
	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
			for (i = 0; i < nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		//	if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);

			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
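
/* Usage sketch (illustrative only, application side): the "specific worker"
 * path above is reached when a task is pinned through the public task fields.
 * "my_cl" is a hypothetical codelet with one buffer; error handling omitted. */
#if 0
#include <starpu.h>

static void submit_on_worker0(starpu_data_handle_t handle)
{
	struct starpu_task *task = starpu_task_create();
	task->cl = &my_cl;
	task->handles[0] = handle;
	/* Bypass the scheduling decision: run on worker 0 specifically */
	task->execute_on_a_specific_worker = 1;
	task->workerid = 0;
	starpu_task_submit(task);
}
#endif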
/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	if (j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);

	return _starpu_repush_task(j);
}
int _starpu_repush_task(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;

	_STARPU_LOG_IN();

	unsigned can_push = _starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops, task);
	task->status = STARPU_TASK_READY;

#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		intptr_t id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif
	/* if the context does not have any workers, save the task in a temp list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if the workers present in the ctx are not able to execute tasks,
		   we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}

	if (!can_push)
		return 0;

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		if (task->prologue_callback_pop_func)
			task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to an empty context, that's fine */
		ret = 0;
	return ret;
}
int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);

	/* if the context still does not have workers, put the task back in its
	   place in the empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if the workers present in the ctx are not able to execute tasks,
		   we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	int ret = 0;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		unsigned node = starpu_worker_get_memory_node(task->workerid);
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_on_node(task, node);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag())
		{
			if (task->cl->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->cl->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->cl->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->cl->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->cl->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}

		if (!sched_ctx->sched_policy)
		{
			if (!sched_ctx->awake_workers)
				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
			else
			{
				struct starpu_worker_collection *workers = sched_ctx->workers;

				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
				job->task_size = workers->nworkers;
				job->combined_workerid = -1; /* it's a ctx, not a combined worker */
				job->active_task_alias_count = 0;

				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);

				/* Note: we have to call that early, or else the task may have
				 * disappeared already */
				starpu_push_task_end(task);

				unsigned workerid;
				struct starpu_sched_ctx_iterator it;
				if (workers->init_iterator)
					workers->init_iterator(workers, &it);

				while (workers->has_next(workers, &it))
				{
					workerid = workers->get_next(workers, &it);
					struct starpu_task *alias = starpu_task_dup(task);
					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
				}
			}
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);

			/* check whether there are any workers in the context */
			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
		}

		if (ret == -1)
		{
			fprintf(stderr, "repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}
/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}
/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}
struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							     enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops : %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
	return conversion_task;
}
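
/* Usage sketch (illustrative only, application side): the conversion codelets
 * selected above come from the starpu_multiformat_data_interface_ops structure
 * that the application supplies when registering the data. A minimal CUDA-only
 * variant might look as follows; the element structs and the
 * "cpu_to_cuda_cl"/"cuda_to_cpu_cl" codelets are hypothetical. */
#if 0
#include <starpu.h>

struct my_cpu_elem { float x; float y; };
struct my_cuda_elem { float xy[2]; };

extern struct starpu_codelet cpu_to_cuda_cl, cuda_to_cpu_cl;

static struct starpu_multiformat_data_interface_ops my_mf_ops =
{
	.cpu_elemsize	= sizeof(struct my_cpu_elem),
	.cuda_elemsize	= sizeof(struct my_cuda_elem),
	.cpu_to_cuda_cl	= &cpu_to_cuda_cl,
	.cuda_to_cpu_cl	= &cuda_to_cpu_cl,
};

static void register_my_data(void *ptr, uint32_t nobjects)
{
	starpu_data_handle_t handle;
	/* nobjects elements at "ptr", with their home on the main RAM node */
	starpu_multiformat_data_register(&handle, STARPU_MAIN_RAM, ptr, nobjects, &my_mf_ops);
}
#endif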
static
struct _starpu_sched_ctx *_get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_list *l = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if (worker->removed_from_ctx[l->sched_ctx] == 1)
		{
			return _starpu_get_sched_ctx_struct(l->sched_ctx);
		}
	}

	unsigned are_2_priorities = 0;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if (l->priority != worker->pop_ctx_priority)
		{
			are_2_priorities = 1;
			break;
		}
	}

	if (!worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* find a context in which the worker hasn't popped yet */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if (l->priority == worker->pop_ctx_priority)
			{
				if (!worker->poped_in_ctx[l->sched_ctx])
				{
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if (are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}
	are_2_priorities = 0;
	if (worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* if the worker has already popped from every context, start over from the beginning */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if (l->priority == worker->pop_ctx_priority)
			{
				if (worker->poped_in_ctx[l->sched_ctx])
				{
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if (are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}

	unsigned first_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if (l->priority == worker->pop_ctx_priority)
		{
			first_sched_ctx = l->sched_ctx;
			break;
		}
	}

	//	if(worker->pop_ctx_priority == 0 && first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
	if (first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
		first_sched_ctx = worker->sched_ctx_list->sched_ctx;

	worker->poped_in_ctx[first_sched_ctx] = !worker->poped_in_ctx[first_sched_ctx];
	return _starpu_get_sched_ctx_struct(first_sched_ctx);
}
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
#endif
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while (1)
				{
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if (sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
				}
			}

			if (!task)
			{
				/* whether or not the worker shares a task list in the
				   scheduler, if it does not have any task to pop it just
				   gets out of here */
				/* however, if it does share a task list, it will be removed
				   from the ctx as soon as it finishes this job (in
				   handle_job_termination) */
				if (worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				if (worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
						//	_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
						//	_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR
#ifndef STARPU_NON_BLOCKING_DRIVERS
				if (been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;
#endif
			}
		}
	}

	if (!task)
	{
		idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	if (idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
		//	_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
		//	_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost immediately.
	 * This way, we avoid computing and executing the conversion tasks.
	 * Here, we do not care about what implementation is used.
	 */
	worker_id = starpu_worker_get_id();
	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;

		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	if (task->prologue_callback_pop_func)
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

	return task;
}
struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task *task = NULL;
	if (sched_ctx->sched_policy)
	{
		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

		/* TODO set profiling info */
		if (sched_ctx->sched_policy->pop_every_task)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
	return task;
}
void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->pre_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->post_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}
void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}

	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}
/* The scheduling policy may put tasks directly into a worker's local queue so
 * that it is not always necessary to create its own queue when the local queue
 * is sufficient. If "prio" is non-zero, the task is put at the end of the queue
 * from which the worker pops first. Passing 0 therefore ensures a FIFO
 * ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, prio);
}
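
/* Usage sketch (illustrative only): a custom scheduling policy can rely on
 * starpu_push_local_task() instead of maintaining its own queues. The
 * round-robin push below is a minimal example, not an actual StarPU policy;
 * "rr_push_task" and its static counter are hypothetical, and the counter is
 * left unsynchronized for brevity. */
#if 0
#include <starpu.h>

static int rr_push_task(struct starpu_task *task)
{
	static unsigned next_worker = 0;
	int workerid = next_worker++ % starpu_worker_get_count();

	/* prio = 0 keeps FIFO ordering on the worker's local queue */
	return starpu_push_local_task(workerid, task, 0);
}
#endif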
void _starpu_print_idle_time(void)
{
	double all_idle = 0.0;
	int i = 0;
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		all_idle += idle[i];

	FILE *f;
	const char *sched_env = getenv("STARPU_IDLE_FILE");
	if (!sched_env)
		sched_env = "starpu_idle_microsec.log";
	f = fopen(sched_env, "a");
	if (!f)
	{
		fprintf(stderr, "couldn't open %s: %s\n", sched_env, strerror(errno));
	}
	else
	{
		fprintf(f, "%lf\n", all_idle);
		fclose(f);
	}
}