sched_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2014  Université de Bordeaux 1
 * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>

static int use_prefetch = 0;
double idle[STARPU_NMAXWORKERS];
double idle_start[STARPU_NMAXWORKERS];

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}

static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies(void)
{
	return predefined_policies;
}

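/* Usage sketch (illustrative, not part of the original file): the returned
 * array is NULL-terminated, so a caller can list the built-in policies with
 * something like:
 *
 *	struct starpu_sched_policy **p;
 *	for (p = starpu_sched_get_predefined_policies(); *p != NULL; p++)
 *		printf("%s: %s\n", (*p)->policy_name, (*p)->policy_description);
 */
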
struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}

static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	if (strncmp(policy_name, "heft", 5) == 0)
	{
		_STARPU_DISP("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}

static void display_sched_help_message(void)
{
	const char *sched_env = getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;

		fprintf(stderr, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stderr, "\n");
	}
}

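/* From the shell, the same help text can be obtained with, e.g.
 * (illustrative):
 *
 *	$ STARPU_SCHED=help ./my_app
 *	$ STARPU_SCHED=dmda ./my_app	(select the dmda policy instead)
 */

/* Selection precedence implemented below: (1) the policy explicitly required
 * by the caller, (2) the policy structure given in the user configuration,
 * (3) the name found in the STARPU_SCHED environment variable, (4) the policy
 * name given in the user configuration, and finally (5) the eager policy as
 * the default. */
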
struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (!selected_policy && user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (!selected_policy && sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* Perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the eager (greedy) policy by default */
	return &_starpu_sched_eager_policy;
}

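/* Example (a minimal sketch, assuming a standard StarPU initialization):
 * requesting a policy by name through the configuration rather than the
 * environment:
 *
 *	struct starpu_conf conf;
 *	starpu_conf_init(&conf);
 *	conf.sched_policy_name = "dmda";
 *	starpu_init(&conf);
 */
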
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf->calibrate);

	load_sched_policy(selected_policy, sched_ctx);

	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}

void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		policy->deinit_sched(sched_ctx->id);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
{
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	struct _starpu_sched_ctx *sched_ctx;
	struct _starpu_sched_ctx_list *l = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
		if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
}

/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task alias will be enqueued
 * into each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker ? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);

	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR

	unsigned i;
	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			for (i = 0; i < task->cl->nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < task->cl->nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		//	if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);

			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}

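/* Usage sketch (illustrative; "cl" is a hypothetical codelet): applications
 * reach this code path by pinning a task to a worker through the public task
 * fields:
 *
 *	struct starpu_task *task = starpu_task_create();
 *	task->cl = &cl;
 *	task->execute_on_a_specific_worker = 1;
 *	task->workerid = 0;
 *	starpu_task_submit(task);
 */
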
/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	if (j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);

	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;

	_STARPU_LOG_IN();

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
	_starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops);
	task->status = STARPU_TASK_READY;

#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		intptr_t id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif

	/* if the context does not have any workers, save the task in a temp list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if none of the workers of the ctx is able to execute the
		   task, we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}

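/* Example of a control task (a minimal sketch; my_callback is hypothetical):
 * with no codelet, the task->cl == NULL branch above runs the callback and
 * releases the dependencies without ever reaching a worker:
 *
 *	struct starpu_task *sync = starpu_task_create();
 *	sync->cl = NULL;
 *	sync->callback_func = my_callback;
 *	starpu_task_submit(sync);
 */
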
int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	/* if the context still does not have workers, put the task back to
	   its place in the empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if none of the workers of the ctx is able to execute the
		   task, we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	int ret;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		unsigned node = starpu_worker_get_memory_node(task->workerid);
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_on_node(task, node);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag())
		{
			if (task->cl->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->cl->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->cl->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->cl->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->cl->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}

		if (!sched_ctx->sched_policy)
		{
			ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);

			/* check whether there are any workers in the context */
			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
		}

		if (ret == -1)
		{
			fprintf(stderr, "repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}

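/* Note: -EAGAIN above means the task was parked on the empty-ctx list;
 * _starpu_push_task() translates it back to 0 for the submitter, and the
 * task is expected to be re-pushed once workers join the context. */
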
/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}

/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}

struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							     enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops: %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	STARPU_CODELET_SET_MODE(conversion_task->cl, STARPU_RW, 0);
	return conversion_task;
}

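/* Background sketch (illustrative; the element types and codelets are
 * hypothetical): the conversion codelets picked above come from the
 * multiformat registration done by the application, e.g.:
 *
 *	struct starpu_multiformat_data_interface_ops ops =
 *	{
 *		.cpu_elemsize   = sizeof(struct cpu_elem),
 *		.cuda_elemsize  = sizeof(struct cuda_elem),
 *		.cpu_to_cuda_cl = &cpu_to_cuda_cl,
 *		.cuda_to_cpu_cl = &cuda_to_cpu_cl,
 *	};
 *	starpu_multiformat_data_register(&handle, STARPU_MAIN_RAM, ptr, n, &ops);
 */

/* The function below round-robins over the contexts a worker belongs to,
 * one priority level at a time: it first visits every context of the current
 * priority it has not popped from yet, then sweeps them again in a "reverse
 * phase", and finally falls back to the first context of that priority. */
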
static
struct _starpu_sched_ctx *_get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_list *l = NULL;
	unsigned are_2_priorities = 0;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if (l->priority != worker->pop_ctx_priority)
		{
			are_2_priorities = 1;
			break;
		}
	}

	if (!worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* find a context the worker hasn't popped from yet */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if (l->priority == worker->pop_ctx_priority)
			{
				if (!worker->poped_in_ctx[l->sched_ctx])
				{
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if (are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}
	are_2_priorities = 0;
	if (worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* if the worker has already popped from every context, start
		   over from the beginning */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if (l->priority == worker->pop_ctx_priority)
			{
				if (worker->poped_in_ctx[l->sched_ctx])
				{
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if (are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}

	unsigned first_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if (l->priority == worker->pop_ctx_priority)
		{
			first_sched_ctx = l->sched_ctx;
			break;
		}
	}

	//	if(worker->pop_ctx_priority == 0 && first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
	if (first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
		first_sched_ctx = worker->sched_ctx_list->sched_ctx;

	worker->poped_in_ctx[first_sched_ctx] = !worker->poped_in_ctx[first_sched_ctx];
	return _starpu_get_sched_ctx_struct(first_sched_ctx);
}

struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we
	 * measure a timestamp, and will attribute it afterwards to the
	 * task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
#endif
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while (1)
				{
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if (sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
				}
			}

			if (!task)
			{
				/* whether or not the worker shares a tasks
				   list in the scheduler, if it has no task to
				   pop, get it out of this context */
				/* however, if it does share a tasks list, it
				   will be removed as soon as it finishes its
				   current job (in handle_job_termination) */
				if (worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				if (worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
						//	_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
						//	_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR

#ifndef STARPU_NON_BLOCKING_DRIVERS
				if (been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;
#endif
			}
		}
	}

	if (!task)
	{
		idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	if (idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
		//	_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
		//	_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost
	 * immediately. This way, we avoid computing and executing the
	 * conversion tasks. Here, we do not care about what implementation
	 * is used.
	 */
	worker_id = starpu_worker_get_id();
	if (!starpu_worker_can_execute_task(worker_id, task, 0))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	for (i = 0; i < task->cl->nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;
		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	if (task->prologue_callback_pop_func)
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

	return task;
}

struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task *task = NULL;
	if (sched_ctx->sched_policy)
	{
		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

		/* TODO set profiling info */
		if (sched_ctx->sched_policy->pop_every_task)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
	return task;
}

void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->pre_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->post_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

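/* Sketch of a user-defined policy wiring these hooks (illustrative; all the
 * my_* functions are hypothetical):
 *
 *	static struct starpu_sched_policy my_policy =
 *	{
 *		.init_sched = my_init, .deinit_sched = my_deinit,
 *		.push_task = my_push, .pop_task = my_pop,
 *		.pre_exec_hook = my_pre_hook, .post_exec_hook = my_post_hook,
 *		.policy_name = "my-policy",
 *		.policy_description = "example user-defined policy",
 *	};
 *
 * The policy is then selected with conf.sched_policy = &my_policy before
 * calling starpu_init(&conf). */
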
void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}

	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}

/* The scheduling policy may put tasks directly into a worker's local queue
 * so that it is not always necessary to create its own queue when the local
 * queue is sufficient. If "prio" is non-zero, the task is put at the pop end
 * of the queue, so the worker will pick it up first; passing 0 therefore
 * preserves FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, prio);
}

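/* Usage sketch (illustrative; my_push_task is a hypothetical push_task
 * hook): a policy may bypass its own queues and feed a chosen worker
 * directly:
 *
 *	static int my_push_task(struct starpu_task *task)
 *	{
 *		int workerid = 0;  // target picked by the policy
 *		return starpu_push_local_task(workerid, task, task->priority > 0);
 *	}
 */
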
void _starpu_print_idle_time(void)
{
	double all_idle = 0.0;
	int i = 0;
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		all_idle += idle[i];

	FILE *f;
	const char *sched_env = getenv("STARPU_IDLE_FILE");
	if (!sched_env)
		f = fopen("starpu_idle_microsec.log", "a");
	else
		f = fopen(sched_env, "a");
	if (!f)
		return; /* could not open the log file; skip silently */
	fprintf(f, "%lf\n", all_idle);
	fclose(f);
}
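
/* Note: the destination of the idle-time log can be overridden from the
 * shell (illustrative):
 *
 *	$ STARPU_IDLE_FILE=/tmp/idle.log ./my_app
 */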