sched_policy.c 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2015 Université de Bordeaux
  4. * Copyright (C) 2010-2014 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/sched_policy.h>
  22. #include <profiling/profiling.h>
  23. #include <common/barrier.h>
  24. #include <core/debug.h>
/* Whether task input data is prefetched onto the target memory node when a
 * task is pushed (toggled via the STARPU_PREFETCH environment variable in
 * _starpu_init_sched_policy). */
static int use_prefetch = 0;
/* Per-worker cumulated idle time and the timestamp at which the current idle
 * period started (0.0 while the worker is busy); updated in _starpu_pop_task. */
double idle[STARPU_NMAXWORKERS];
double idle_start[STARPU_NMAXWORKERS];
  28. int starpu_get_prefetch_flag(void)
  29. {
  30. return use_prefetch;
  31. }
/* NULL-terminated table of the built-in scheduling policies, looked up by
 * name in find_sched_policy_from_name and listed by
 * display_sched_help_message. */
static struct starpu_sched_policy *predefined_policies[] =
{
	/* modular (component-based) schedulers */
	&_starpu_sched_modular_eager_policy,
	&_starpu_sched_modular_eager_prefetching_policy,
	&_starpu_sched_modular_prio_policy,
	&_starpu_sched_modular_prio_prefetching_policy,
	&_starpu_sched_modular_random_policy,
	&_starpu_sched_modular_random_prio_policy,
	&_starpu_sched_modular_random_prefetching_policy,
	&_starpu_sched_modular_random_prio_prefetching_policy,
	//&_starpu_sched_modular_ws_policy,
	&_starpu_sched_modular_heft_policy,
	&_starpu_sched_modular_heft2_policy,
	/* classic schedulers */
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_lws_policy,
	&_starpu_sched_ws_policy,
	/* performance-model based (deque model) schedulers */
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	/* parallel-task schedulers */
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	NULL
};
  58. struct starpu_sched_policy **starpu_sched_get_predefined_policies()
  59. {
  60. return predefined_policies;
  61. }
  62. struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  63. {
  64. return sched_ctx->sched_policy;
  65. }
  66. /*
  67. * Methods to initialize the scheduling policy
  68. */
  69. static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
  70. {
  71. STARPU_ASSERT(sched_policy);
  72. #ifdef STARPU_VERBOSE
  73. if (sched_policy->policy_name)
  74. {
  75. if (sched_policy->policy_description)
  76. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  77. else
  78. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  79. }
  80. #endif
  81. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  82. memcpy(policy, sched_policy, sizeof(*policy));
  83. }
  84. static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
  85. {
  86. if (!policy_name)
  87. return NULL;
  88. if (strncmp(policy_name, "heft", 5) == 0)
  89. {
  90. _STARPU_DISP("Warning: heft is now called \"dmda\".\n");
  91. return &_starpu_sched_dmda_policy;
  92. }
  93. struct starpu_sched_policy **policy;
  94. for(policy=predefined_policies ; *policy!=NULL ; policy++)
  95. {
  96. struct starpu_sched_policy *p = *policy;
  97. if (p->policy_name)
  98. {
  99. if (strcmp(policy_name, p->policy_name) == 0)
  100. {
  101. /* we found a policy with the requested name */
  102. return p;
  103. }
  104. }
  105. }
  106. if (strcmp(policy_name, "help") != 0)
  107. fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);
  108. /* nothing was found */
  109. return NULL;
  110. }
  111. static void display_sched_help_message(void)
  112. {
  113. const char *sched_env = getenv("STARPU_SCHED");
  114. if (sched_env && (strcmp(sched_env, "help") == 0))
  115. {
  116. /* display the description of all predefined policies */
  117. struct starpu_sched_policy **policy;
  118. fprintf(stderr, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
  119. for(policy=predefined_policies ; *policy!=NULL ; policy++)
  120. {
  121. struct starpu_sched_policy *p = *policy;
  122. fprintf(stderr, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
  123. }
  124. fprintf(stderr, "\n");
  125. }
  126. }
  127. struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
  128. {
  129. struct starpu_sched_policy *selected_policy = NULL;
  130. struct starpu_conf *user_conf = config->conf;
  131. if(required_policy)
  132. selected_policy = find_sched_policy_from_name(required_policy);
  133. /* First, we check whether the application explicitely gave a scheduling policy or not */
  134. if (!selected_policy && user_conf && (user_conf->sched_policy))
  135. return user_conf->sched_policy;
  136. /* Otherwise, we look if the application specified the name of a policy to load */
  137. const char *sched_pol_name;
  138. sched_pol_name = getenv("STARPU_SCHED");
  139. if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
  140. sched_pol_name = user_conf->sched_policy_name;
  141. if (!selected_policy && sched_pol_name)
  142. selected_policy = find_sched_policy_from_name(sched_pol_name);
  143. /* Perhaps there was no policy that matched the name */
  144. if (selected_policy)
  145. return selected_policy;
  146. /* If no policy was specified, we use the greedy policy as a default */
  147. return &_starpu_sched_eager_policy;
  148. }
/* Initialize the scheduling policy of a context: display the help message if
 * requested, read the prefetch setting, propagate the calibration flag,
 * install the selected policy into the context, and run its init hook. */
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();
	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;
	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf->calibrate);
	load_sched_policy(selected_policy, sched_ctx);
	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	/* NOTE(review): init_sched is called without a NULL check, unlike
	 * deinit_sched in _starpu_deinit_sched_policy — presumably every
	 * policy provides an init hook; confirm. */
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}
  164. void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  165. {
  166. struct starpu_sched_policy *policy = sched_ctx->sched_policy;
  167. if (policy->deinit_sched)
  168. {
  169. _STARPU_TRACE_WORKER_SCHEDULING_PUSH;
  170. policy->deinit_sched(sched_ctx->id);
  171. _STARPU_TRACE_WORKER_SCHEDULING_POP;
  172. }
  173. }
  174. static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
  175. {
  176. /* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
  177. struct _starpu_sched_ctx *sched_ctx;
  178. struct _starpu_sched_ctx_list *l = NULL;
  179. for (l = worker->sched_ctx_list; l; l = l->next)
  180. {
  181. sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
  182. if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
  183. {
  184. _STARPU_TRACE_WORKER_SCHEDULING_PUSH;
  185. sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
  186. _STARPU_TRACE_WORKER_SCHEDULING_POP;
  187. }
  188. }
  189. }
/* Enqueue a task into the list of tasks explicitely attached to a worker. In
 * case workerid identifies a combined worker, a task will be enqueued into
 * each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();
	/* Is this a basic worker or a combined worker ? */
	/* Combined-worker ids come after the basic-worker ids. */
	int is_basic_worker = (workerid < nbasic_workers);
	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;
	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}
	/* Start fetching the task's input data onto the target node early. */
	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);
	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;
		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			/* perf_workerid is the combined worker's id so schedulers
			 * can account for the parallel execution. */
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}
#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR
	unsigned i;
	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		/* Multiformat handles may need a conversion task before the
		 * worker can access them in its native format. */
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
			for (i = 0; i < nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;
				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;
				/* The conversion task is pinned to the same worker
				 * so it runs before the task itself. */
				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}
			/* Record that the handles' multiformat data now lives on
			 * this node. */
			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		// if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		/* Positive-priority tasks go to the front of the worker's local
		 * queue (third argument of _starpu_push_local_task). */
		if(task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;
		int ret = 0;
		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;
		/* Barriers so the members of the combined worker enter and leave
		 * the parallel section together. */
		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);
		int j;
		for (j = 0; j < worker_size; j++)
		{
			/* Each member worker gets its own alias of the task. */
			struct starpu_task *alias = starpu_task_dup(task);
			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}
		return ret;
	}
}
/* the generic interface that call the proper underlying implementation */
/* Mark the job's task as ready and hand it to the scheduler; tasks pushed to
 * a context without suitable workers are parked in the context's
 * empty_ctx_tasks list instead.  Returns 0 on success. */
int _starpu_push_task(struct _starpu_job *j)
{
	/* Run the application's prologue callback first, before the task is
	 * made ready. */
	if(j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);
	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;
	_STARPU_LOG_IN();
	/* Account the task as ready in its context; can_push tells whether it
	 * may be forwarded to the workers right now. */
	unsigned can_push = _starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops, task);
	task->status = STARPU_TASK_READY;
#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		intptr_t id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif
	/* if the context does not have any workers save the tasks in a temp list */
	if(!sched_ctx->is_initial_sched)
	{
		/*if there are workers in the ctx that are not able to execute tasks
		  we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
		if(nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			/* Let the hypervisor know this context has tasks but no
			 * workers able to run them. */
			if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			   && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}
	/* NOTE(review): when can_push is false the task has been accounted as
	 * ready but is not forwarded here — presumably it is resubmitted later
	 * by the context machinery; confirm against
	 * _starpu_increment_nready_tasks_of_sched_ctx. */
	if(!can_push)
		return 0;
	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}
	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}
/* Dispatch a ready task to the workers of its scheduling context: either to
 * the explicitly requested worker, to the context's master/all workers when
 * the context has no policy, or through the policy's push_task hook.
 * Returns -EAGAIN when the context currently has no suitable worker (the
 * task is parked in the empty-context list). */
int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
	/* if the contexts still does not have workers put the task back to its place in
	   the empty ctx list */
	if(!sched_ctx->is_initial_sched)
	{
		/*if there are workers in the ctx that are not able to execute tasks
		  we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			/* push_back (vs push_front in _starpu_push_task) keeps
			 * the original submission order on requeue. */
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if(sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			   && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}
	_starpu_profiling_set_task_push_start_time(task);
	int ret = 0;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		/* The application pinned the task to task->workerid: bypass the
		 * scheduling policy entirely. */
		unsigned node = starpu_worker_get_memory_node(task->workerid);
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_on_node(task, node);
		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();
		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag()) {
			if (task->cl->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->cl->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->cl->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->cl->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->cl->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}
		if(!sched_ctx->sched_policy)
		{
			/* Policy-less context: give the task to the master
			 * worker, or to every worker when they are awake. */
			if(!sched_ctx->awake_workers)
				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
			else
			{
				struct starpu_worker_collection *workers = sched_ctx->workers;
				/* Set up the job as a parallel task spanning all the
				 * context's workers (mirrors the combined-worker path
				 * in _starpu_push_task_on_specific_worker). */
				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
				job->task_size = workers->nworkers;
				job->combined_workerid = -1; // workerid; its a ctx not combined worker
				job->active_task_alias_count = 0;
				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
				/* Note: we have to call that early, or else the task may have
				 * disappeared already */
				starpu_push_task_end(task);
				unsigned workerid;
				struct starpu_sched_ctx_iterator it;
				if(workers->init_iterator)
					workers->init_iterator(workers, &it);
				while(workers->has_next(workers, &it))
				{
					workerid = workers->get_next(workers, &it);
					struct starpu_task *alias = starpu_task_dup(task);
					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
				}
			}
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);
			/* check out if there are any workers in the context */
			/* Hold the read lock so the context's worker set cannot
			 * change while the policy pushes the task. */
			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
		}
		if(ret == -1)
		{
			/* NOTE(review): this stderr message looks like leftover
			 * debugging output — consider _STARPU_DEBUG instead. */
			fprintf(stderr, "repush task \n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			/* Retry the whole dispatch (recursive). */
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}
/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
/* Records the end of the push phase for profiling and flags the task as
 * scheduled.  Always returns 0. */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}
  468. /*
  469. * Given a handle that needs to be converted in order to be used on the given
  470. * node, returns a task that takes care of the conversion.
  471. */
  472. struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
  473. unsigned int node)
  474. {
  475. return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
  476. }
/* Build a task converting `handle` between its current multiformat
 * representation (recorded in handle->mf_node) and the representation used
 * by nodes of kind `node_kind`.  The task takes a reference on the handle
 * (refcnt/busy_count) while it is in flight. */
struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							    enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;
#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif
	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);
#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif
	/* Keep the handle alive until the conversion task has run. */
	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);
	/* Select the conversion codelet: the outer switch is the destination
	 * kind, the inner switch (CPU-like destinations) is the source kind. */
	switch(node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			/* CPU-to-CPU conversion must never be requested. */
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops : %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}
	/* The conversion rewrites the data in place. */
	STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
	return conversion_task;
}
/* Pick the next scheduling context this worker should try to pop a task
 * from.  Contexts the worker was removed from are served first; otherwise
 * contexts of the worker's current priority level are each visited once
 * (tracked by poped_in_ctx flags), alternating between a forward and a
 * reverse phase, and switching priority level when two levels exist. */
static
struct _starpu_sched_ctx* _get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_list *l = NULL;
	/* Serve contexts the worker was removed from first, so the removal
	 * can be finalized by the caller. */
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if(worker->removed_from_ctx[l->sched_ctx] == 1)
		{
			return _starpu_get_sched_ctx_struct(l->sched_ctx);
		}
	}
	/* Detect whether the worker's contexts span two priority levels. */
	unsigned are_2_priorities = 0;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if(l->priority != worker->pop_ctx_priority)
		{
			are_2_priorities = 1;
			break;
		}
	}
	if(!worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* find a context in which the worker hasn't poped yet */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if(l->priority == worker->pop_ctx_priority)
			{
				if(!worker->poped_in_ctx[l->sched_ctx])
				{
					/* Mark it visited and return it. */
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		/* Every context of this priority was visited: flip to the
		 * reverse phase and possibly switch priority level. */
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if(are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}
	/* NOTE(review): resetting are_2_priorities here means the reverse
	 * phase below can never toggle pop_ctx_priority — confirm whether
	 * this asymmetry is intentional. */
	are_2_priorities = 0;
	if(worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* if the context has already poped in every one start from the begining */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if(l->priority == worker->pop_ctx_priority)
			{
				if(worker->poped_in_ctx[l->sched_ctx])
				{
					/* Clear the visited flag and return it. */
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if(are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}
	/* Fallback: the first context of the current priority, or the head of
	 * the list if none matches. */
	unsigned first_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if(l->priority == worker->pop_ctx_priority)
		{
			first_sched_ctx = l->sched_ctx;
			break;
		}
	}
	// if(worker->pop_ctx_priority == 0 && first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
	if(first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
		first_sched_ctx = worker->sched_ctx_list->sched_ctx;
	worker->poped_in_ctx[first_sched_ctx] = !worker->poped_in_ctx[first_sched_ctx];
	return _starpu_get_sched_ctx_struct(first_sched_ctx);
}
  643. struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
  644. {
  645. struct starpu_task *task;
  646. int worker_id;
  647. unsigned node;
  648. /* We can't tell in advance which task will be picked up, so we measure
  649. * a timestamp, and will attribute it afterwards to the task. */
  650. int profiling = starpu_profiling_status_get();
  651. struct timespec pop_start_time;
  652. if (profiling)
  653. _starpu_clock_gettime(&pop_start_time);
  654. pick:
  655. /* perhaps there is some local task to be executed first */
  656. task = _starpu_pop_local_task(worker);
  657. /* get tasks from the stacks of the strategy */
  658. if(!task)
  659. {
  660. struct _starpu_sched_ctx *sched_ctx ;
  661. #ifndef STARPU_NON_BLOCKING_DRIVERS
  662. int been_here[STARPU_NMAX_SCHED_CTXS];
  663. int i;
  664. for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
  665. been_here[i] = 0;
  666. while(!task)
  667. #endif
  668. {
  669. if(worker->nsched_ctxs == 1)
  670. sched_ctx = _starpu_get_initial_sched_ctx();
  671. else
  672. {
  673. while(1)
  674. {
  675. sched_ctx = _get_next_sched_ctx_to_pop_into(worker);
  676. if(worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
  677. {
  678. _starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
  679. worker->removed_from_ctx[sched_ctx->id] = 0;
  680. sched_ctx = NULL;
  681. }
  682. else
  683. break;
  684. }
  685. }
  686. if(sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
  687. {
  688. if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
  689. {
  690. task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
  691. }
  692. }
  693. if(!task)
  694. {
  695. /* it doesn't matter if it shares tasks list or not in the scheduler,
  696. if it does not have any task to pop just get it out of here */
  697. /* however if it shares a task list it will be removed as soon as he
  698. finishes this job (in handle_job_termination) */
  699. if(worker->removed_from_ctx[sched_ctx->id])
  700. {
  701. _starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
  702. worker->removed_from_ctx[sched_ctx->id] = 0;
  703. }
  704. #ifdef STARPU_USE_SC_HYPERVISOR
  705. if(worker->pop_ctx_priority)
  706. {
  707. struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
  708. if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
  709. {
  710. // _STARPU_TRACE_HYPERVISOR_BEGIN();
  711. perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
  712. // _STARPU_TRACE_HYPERVISOR_END();
  713. }
  714. }
  715. #endif //STARPU_USE_SC_HYPERVISOR
  716. #ifndef STARPU_NON_BLOCKING_DRIVERS
  717. if(been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
  718. break;
  719. been_here[sched_ctx->id] = 1;
  720. #endif
  721. }
  722. }
  723. }
  724. if (!task)
  725. {
  726. idle_start[worker->workerid] = starpu_timing_now();
  727. return NULL;
  728. }
  729. if(idle_start[worker->workerid] != 0.0)
  730. {
  731. double idle_end = starpu_timing_now();
  732. idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
  733. idle_start[worker->workerid] = 0.0;
  734. }
  735. #ifdef STARPU_USE_SC_HYPERVISOR
  736. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  737. struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
  738. if(sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
  739. {
  740. // _STARPU_TRACE_HYPERVISOR_BEGIN();
  741. perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
  742. // _STARPU_TRACE_HYPERVISOR_END();
  743. }
  744. #endif //STARPU_USE_SC_HYPERVISOR
  745. /* Make sure we do not bother with all the multiformat-specific code if
  746. * it is not necessary. */
  747. if (!_starpu_task_uses_multiformat_handles(task))
  748. goto profiling;
  749. /* This is either a conversion task, or a regular task for which the
  750. * conversion tasks have already been created and submitted */
  751. if (task->mf_skip)
  752. goto profiling;
  753. /*
  754. * This worker may not be able to execute this task. In this case, we
  755. * should return the task anyway. It will be pushed back almost immediatly.
  756. * This way, we avoid computing and executing the conversions tasks.
  757. * Here, we do not care about what implementation is used.
  758. */
  759. worker_id = starpu_worker_get_id();
  760. if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
  761. return task;
  762. node = starpu_worker_get_memory_node(worker_id);
  763. /*
  764. * We do have a task that uses multiformat handles. Let's create the
  765. * required conversion tasks.
  766. */
  767. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  768. unsigned i;
  769. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  770. for (i = 0; i < nbuffers; i++)
  771. {
  772. struct starpu_task *conversion_task;
  773. starpu_data_handle_t handle;
  774. handle = STARPU_TASK_GET_HANDLE(task, i);
  775. if (!_starpu_handle_needs_conversion_task(handle, node))
  776. continue;
  777. conversion_task = _starpu_create_conversion_task(handle, node);
  778. conversion_task->mf_skip = 1;
  779. conversion_task->execute_on_a_specific_worker = 1;
  780. conversion_task->workerid = worker_id;
  781. /*
  782. * Next tasks will need to know where these handles have gone.
  783. */
  784. handle->mf_node = node;
  785. _starpu_task_submit_conversion_task(conversion_task, worker_id);
  786. }
  787. task->mf_skip = 1;
  788. starpu_task_list_push_back(&worker->local_tasks, task);
  789. STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  790. goto pick;
  791. profiling:
  792. if (profiling)
  793. {
  794. struct starpu_profiling_task_info *profiling_info;
  795. profiling_info = task->profiling_info;
  796. /* The task may have been created before profiling was enabled,
  797. * so we check if the profiling_info structure is available
  798. * even though we already tested if profiling is enabled. */
  799. if (profiling_info)
  800. {
  801. memcpy(&profiling_info->pop_start_time,
  802. &pop_start_time, sizeof(struct timespec));
  803. _starpu_clock_gettime(&profiling_info->pop_end_time);
  804. }
  805. }
  806. if(task->prologue_callback_pop_func)
  807. task->prologue_callback_pop_func(task->prologue_callback_pop_arg);
  808. return task;
  809. }
  810. struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
  811. {
  812. struct starpu_task *task = NULL;
  813. if(sched_ctx->sched_policy)
  814. {
  815. STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);
  816. /* TODO set profiling info */
  817. if(sched_ctx->sched_policy->pop_every_task)
  818. {
  819. _STARPU_TRACE_WORKER_SCHEDULING_PUSH;
  820. task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
  821. _STARPU_TRACE_WORKER_SCHEDULING_POP;
  822. }
  823. }
  824. return task;
  825. }
  826. void _starpu_sched_pre_exec_hook(struct starpu_task *task)
  827. {
  828. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  829. if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
  830. {
  831. _STARPU_TRACE_WORKER_SCHEDULING_PUSH;
  832. sched_ctx->sched_policy->pre_exec_hook(task);
  833. _STARPU_TRACE_WORKER_SCHEDULING_POP;
  834. }
  835. }
  836. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  837. {
  838. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  839. if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
  840. {
  841. _STARPU_TRACE_WORKER_SCHEDULING_PUSH;
  842. sched_ctx->sched_policy->post_exec_hook(task);
  843. _STARPU_TRACE_WORKER_SCHEDULING_POP;
  844. }
  845. }
  846. void _starpu_wait_on_sched_event(void)
  847. {
  848. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  849. STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
  850. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  851. if (_starpu_machine_is_running())
  852. {
  853. #ifndef STARPU_NON_BLOCKING_DRIVERS
  854. STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
  855. &worker->sched_mutex);
  856. #endif
  857. }
  858. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
  859. }
  860. /* The scheduling policy may put tasks directly into a worker's local queue so
  861. * that it is not always necessary to create its own queue when the local queue
  862. * is sufficient. If "back" not null, the task is put at the back of the queue
  863. * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
  864. * a FIFO ordering. */
  865. int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
  866. {
  867. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  868. return _starpu_push_local_task(worker, task, prio);
  869. }
  870. void _starpu_print_idle_time()
  871. {
  872. double all_idle = 0.0;
  873. int i = 0;
  874. for(i = 0; i < STARPU_NMAXWORKERS; i++)
  875. all_idle += idle[i];
  876. FILE *f;
  877. const char *sched_env = getenv("STARPU_IDLE_FILE");
  878. if(!sched_env)
  879. sched_env = "starpu_idle_microsec.log";
  880. f = fopen(sched_env, "a");
  881. if (!f)
  882. {
  883. fprintf(stderr, "couldn't open %s: %s\n", sched_env, strerror(errno));
  884. }
  885. else
  886. {
  887. fprintf(f, "%lf \n", all_idle);
  888. fclose(f);
  889. }
  890. }