sched_policy.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2012 Université de Bordeaux 1
  4. * Copyright (C) 2010-2012 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <pthread.h>
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/sched_policy.h>
  22. #include <profiling/profiling.h>
  23. #include <common/barrier.h>
  24. #include <core/debug.h>
  /* The active scheduling policy.  load_sched_policy() copies the selected
   * policy into this variable, and the rest of this file calls through its
   * function pointers (init_sched, push_task, pop_task, hooks, ...). */
  static struct starpu_sched_policy policy;
  /* Non-zero when task input data should be prefetched to the target memory
   * node; assigned from STARPU_PREFETCH by _starpu_init_sched_policy(). */
  static int use_prefetch = 0;

  /* Report whether data prefetching is currently enabled (non-zero) or not. */
  int starpu_get_prefetch_flag(void)
  {
      int enabled = use_prefetch;
      return enabled;
  }
  31. static struct starpu_sched_policy *predefined_policies[] =
  32. {
  33. &_starpu_sched_ws_policy,
  34. &_starpu_sched_prio_policy,
  35. &_starpu_sched_dm_policy,
  36. &_starpu_sched_dmda_policy,
  37. &_starpu_sched_dmda_ready_policy,
  38. &_starpu_sched_dmda_sorted_policy,
  39. &_starpu_sched_random_policy,
  40. &_starpu_sched_eager_policy,
  41. &_starpu_sched_parallel_heft_policy,
  42. &_starpu_sched_pgreedy_policy
  43. };
  44. struct starpu_sched_policy *_starpu_get_sched_policy(void)
  45. {
  46. return &policy;
  47. }
  48. /*
  49. * Methods to initialize the scheduling policy
  50. */
  51. static void load_sched_policy(struct starpu_sched_policy *sched_policy)
  52. {
  53. STARPU_ASSERT(sched_policy);
  54. #ifdef STARPU_VERBOSE
  55. if (sched_policy->policy_name)
  56. {
  57. if (sched_policy->policy_description)
  58. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  59. else
  60. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  61. }
  62. #endif
  63. memcpy(&policy, sched_policy, sizeof(policy));
  64. }
  65. static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
  66. {
  67. if (!policy_name)
  68. return NULL;
  69. if (strncmp(policy_name, "heft", 5) == 0)
  70. {
  71. _STARPU_DISP("Warning: heft is now called \"dmda\".\n");
  72. return &_starpu_sched_dmda_policy;
  73. }
  74. unsigned i;
  75. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  76. {
  77. struct starpu_sched_policy *p;
  78. p = predefined_policies[i];
  79. if (p->policy_name)
  80. {
  81. if (strcmp(policy_name, p->policy_name) == 0)
  82. {
  83. /* we found a policy with the requested name */
  84. return p;
  85. }
  86. }
  87. }
  88. fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);
  89. /* nothing was found */
  90. return NULL;
  91. }
  92. static void display_sched_help_message(void)
  93. {
  94. const char *sched_env = getenv("STARPU_SCHED");
  95. if (sched_env && (strcmp(sched_env, "help") == 0))
  96. {
  97. fprintf(stderr, "STARPU_SCHED can be either of\n");
  98. /* display the description of all predefined policies */
  99. unsigned i;
  100. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  101. {
  102. struct starpu_sched_policy *p;
  103. p = predefined_policies[i];
  104. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  105. }
  106. }
  107. }
  108. static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config)
  109. {
  110. struct starpu_sched_policy *selected_policy = NULL;
  111. /* First, we check whether the application explicitely gave a scheduling policy or not */
  112. if (config->conf->sched_policy)
  113. return config->conf->sched_policy;
  114. /* Otherwise, we look if the application specified the name of a policy to load */
  115. if (config->conf->sched_policy_name)
  116. selected_policy = find_sched_policy_from_name(config->conf->sched_policy_name);
  117. /* Perhaps there was no policy that matched the name */
  118. if (selected_policy)
  119. return selected_policy;
  120. /* If no policy was specified, we use the greedy policy as a default */
  121. return &_starpu_sched_eager_policy;
  122. }
  123. void _starpu_init_sched_policy(struct _starpu_machine_config *config)
  124. {
  125. /* Perhaps we have to display some help */
  126. display_sched_help_message();
  127. /* Prefetch is activated by default */
  128. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  129. if (use_prefetch == -1)
  130. use_prefetch = 1;
  131. /* Set calibrate flag */
  132. _starpu_set_calibrate_flag(config->conf->calibrate);
  133. struct starpu_sched_policy *selected_policy;
  134. selected_policy = select_sched_policy(config);
  135. load_sched_policy(selected_policy);
  136. policy.init_sched(&config->topology, &policy);
  137. }
  138. void _starpu_deinit_sched_policy(struct _starpu_machine_config *config)
  139. {
  140. if (policy.deinit_sched)
  141. policy.deinit_sched(&config->topology, &policy);
  142. }
  /* Enqueue a task into the list of tasks explicitly attached to a worker. In
   * case workerid identifies a combined worker, an alias of the task is
   * enqueued into each worker of the combination.
   *
   * Ids below starpu_worker_get_count() are basic workers; larger ids are
   * handled as combined workers.
   *
   * Returns the result of _starpu_push_local_task() for a basic worker.  For a
   * combined worker, returns the bitwise OR of the results for every alias.
   * NOTE(review): OR-ing return codes only preserves "zero vs non-zero"
   * reliably; confirm callers only test for non-zero. */
  static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  {
      int nbasic_workers = (int)starpu_worker_get_count();
      /* Is this a basic worker or a combined worker ? */
      int is_basic_worker = (workerid < nbasic_workers);
      unsigned memory_node;
      struct _starpu_worker *worker = NULL;
      struct _starpu_combined_worker *combined_worker = NULL;
      if (is_basic_worker)
      {
          worker = _starpu_get_worker_struct(workerid);
          memory_node = worker->memory_node;
      }
      else
      {
          combined_worker = _starpu_get_combined_worker_struct(workerid);
          memory_node = combined_worker->memory_node;
      }
      /* Start moving the task's input data to the target node when
       * prefetching is enabled. */
      if (use_prefetch)
          starpu_prefetch_task_input_on_node(task, memory_node);
      /* Tell the policy that a task was pushed directly to this worker. */
      if (policy.push_task_notify)
          policy.push_task_notify(task, workerid);
      if (is_basic_worker)
      {
          unsigned node = starpu_worker_get_memory_node(workerid);
          /* For multiformat data, submit one conversion task, pinned to this
           * same worker, for each handle that needs converting for `node`. */
          if (_starpu_task_uses_multiformat_handles(task))
          {
              unsigned i;
              for (i = 0; i < task->cl->nbuffers; i++)
              {
                  struct starpu_task *conversion_task;
                  starpu_data_handle_t handle;
                  handle = task->handles[i];
                  if (!_starpu_handle_needs_conversion_task(handle, node))
                      continue;
                  conversion_task = _starpu_create_conversion_task(handle, node);
                  /* mf_skip makes _starpu_pop_task skip conversion generation
                   * for the conversion task itself. */
                  conversion_task->mf_skip = 1;
                  conversion_task->execute_on_a_specific_worker = 1;
                  conversion_task->workerid = workerid;
                  _starpu_task_submit_conversion_task(conversion_task, workerid);
                  //_STARPU_DEBUG("Pushing a conversion task\n");
              }
              /* Record `node` as the location of every handle of the task,
               * including those that needed no conversion.
               * NOTE(review): _starpu_pop_task only updates mf_node for the
               * handles it converts; confirm the difference is intended. */
              for (i = 0; i < task->cl->nbuffers; i++)
                  task->handles[i]->mf_node = node;
          }
          return _starpu_push_local_task(worker, task, 0);
      }
      else
      {
          /* This is a combined worker so we create task aliases */
          int worker_size = combined_worker->worker_size;
          int *combined_workerid = combined_worker->combined_workerid;
          int ret = 0;
          int i;
          struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
          j->task_size = worker_size;
          j->combined_workerid = workerid;
          j->active_task_alias_count = 0;
          /* Both barriers are sized to the number of workers in the
           * combination, one participant per alias. */
          _STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
          _STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
          for (i = 0; i < worker_size; i++)
          {
              struct starpu_task *alias = _starpu_create_task_alias(task);
              worker = _starpu_get_worker_struct(combined_workerid[i]);
              ret |= _starpu_push_local_task(worker, alias, 0);
          }
          return ret;
      }
  }
  215. /* the generic interface that call the proper underlying implementation */
  216. int _starpu_push_task(struct _starpu_job *j)
  217. {
  218. struct starpu_task *task = j->task;
  219. _STARPU_LOG_IN();
  220. _starpu_increment_nready_tasks();
  221. task->status = STARPU_TASK_READY;
  222. #ifdef HAVE_AYUDAME_H
  223. if (AYU_event) {
  224. int id = -1;
  225. AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
  226. }
  227. #endif
  228. _starpu_profiling_set_task_push_start_time(task);
  229. /* in case there is no codelet associated to the task (that's a control
  230. * task), we directly execute its callback and enforce the
  231. * corresponding dependencies */
  232. if (task->cl == NULL)
  233. {
  234. _starpu_handle_job_termination(j);
  235. _STARPU_LOG_OUT_TAG("handle_job_termination");
  236. return 0;
  237. }
  238. int ret;
  239. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  240. {
  241. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  242. }
  243. else
  244. {
  245. STARPU_ASSERT(policy.push_task);
  246. ret = policy.push_task(task);
  247. }
  248. _starpu_profiling_set_task_push_end_time(task);
  249. _STARPU_LOG_OUT();
  250. return ret;
  251. }
  /*
   * Given a handle that needs to be converted in order to be used on the given
   * node, returns a task that takes care of the conversion.
   *
   * The codelet comes from the handle's multiformat ops (get_mf_ops) and is
   * chosen from the kind of `node`:
   *  - CPU RAM: the CUDA->CPU or OpenCL->CPU codelet, depending on the kind of
   *    handle->mf_node (CPU->CPU aborts, other kinds go to _STARPU_ERROR);
   *  - CUDA RAM: the CPU->CUDA codelet;
   *  - OpenCL RAM: the CPU->OpenCL codelet;
   *  - anything else (including SPU local store) aborts.
   * The CUDA / OpenCL cases are compiled only with STARPU_USE_CUDA /
   * STARPU_USE_OPENCL.
   *
   * Side effects: increments handle->refcnt and handle->busy_count under
   * handle->header_lock, and writes modes[0] = STARPU_RW on the chosen codelet.
   * NOTE(review): the codelet is taken from the handle's mf_ops, so this write
   * appears to mutate a codelet shared by every conversion task of that
   * format on each call; confirm this is intended.
   */
  struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
                                                     unsigned int node)
  {
      struct starpu_task *conversion_task;
      struct starpu_multiformat_interface *format_interface;
      enum starpu_node_kind node_kind;
      conversion_task = starpu_task_create();
      conversion_task->synchronous = 0;
      conversion_task->handles[0] = handle;
      /* The node does not really matter here */
      format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, 0);
      node_kind = starpu_node_get_kind(node);
      /* Take a reference on the handle and mark it busy.  Presumably released
       * when the conversion task completes; that is not visible here. */
      _starpu_spin_lock(&handle->header_lock);
      handle->refcnt++;
      handle->busy_count++;
      _starpu_spin_unlock(&handle->header_lock);
      switch(node_kind)
      {
          case STARPU_CPU_RAM:
              /* Converting to CPU: pick the codelet from where the data
               * currently lives (handle->mf_node). */
              switch (starpu_node_get_kind(handle->mf_node))
              {
                  case STARPU_CPU_RAM:
                      /* CPU -> CPU needs no conversion.  STARPU_ABORT() is
                       * presumably non-returning; otherwise control would
                       * fall into the next case. */
                      STARPU_ABORT();
  #ifdef STARPU_USE_CUDA
                  case STARPU_CUDA_RAM:
                  {
                      struct starpu_multiformat_data_interface_ops *mf_ops;
                      mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
                      conversion_task->cl = mf_ops->cuda_to_cpu_cl;
                      break;
                  }
  #endif
  #ifdef STARPU_USE_OPENCL
                  case STARPU_OPENCL_RAM:
                  {
                      struct starpu_multiformat_data_interface_ops *mf_ops;
                      mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
                      conversion_task->cl = mf_ops->opencl_to_cpu_cl;
                      break;
                  }
  #endif
                  default:
                      _STARPU_ERROR("Oops : %u\n", handle->mf_node);
              }
              break;
  #ifdef STARPU_USE_CUDA
          case STARPU_CUDA_RAM:
          {
              struct starpu_multiformat_data_interface_ops *mf_ops;
              mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
              conversion_task->cl = mf_ops->cpu_to_cuda_cl;
              break;
          }
  #endif
  #ifdef STARPU_USE_OPENCL
          case STARPU_OPENCL_RAM:
          {
              struct starpu_multiformat_data_interface_ops *mf_ops;
              mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
              conversion_task->cl = mf_ops->cpu_to_opencl_cl;
              break;
          }
  #endif
          case STARPU_SPU_LS: /* Not supported */
          default:
              STARPU_ABORT();
      }
      /* The conversion reads and writes the single handle in place. */
      conversion_task->cl->modes[0] = STARPU_RW;
      return conversion_task;
  }
  /* Pop the next task for `worker`.
   *
   * The worker's local task list is tried first, then the policy's pop_task
   * hook (if set).  Returns NULL when neither yields a task.
   *
   * A task that uses multiformat handles and is not yet flagged mf_skip first
   * gets a conversion task submitted for each handle that needs one.  It is
   * then flagged mf_skip, pushed back to the front of the worker's local
   * list, and the pick is retried.
   *
   * With profiling enabled, the pop start/end timestamps are stored in the
   * returned task's profiling_info when that structure exists. */
  struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
  {
      struct starpu_task *task;
      int worker_id;
      unsigned node;
      /* We can't tell in advance which task will be picked up, so we measure
       * a timestamp, and will attribute it afterwards to the task. */
      int profiling = starpu_profiling_status_get();
      struct timespec pop_start_time;
      if (profiling)
          _starpu_clock_gettime(&pop_start_time);
  pick:
      /* perhaps there is some local task to be executed first */
      task = _starpu_pop_local_task(worker);
      if (!task && policy.pop_task)
          task = policy.pop_task();
      if (!task)
          return NULL;
      /* Make sure we do not bother with all the multiformat-specific code if
       * it is not necessary. */
      if (!_starpu_task_uses_multiformat_handles(task))
          goto profiling;
      /* This is either a conversion task, or a regular task for which the
       * conversion tasks have already been created and submitted */
      if (task->mf_skip)
          goto profiling;
      /* NOTE(review): this queries the calling thread's worker id rather than
       * using `worker`; presumably identical when called from a driver
       * thread, so confirm.  The early return below also skips recording the
       * profiling timestamps. */
      worker_id = starpu_worker_get_id();
      if (!starpu_worker_can_execute_task(worker_id, task, 0))
          return task;
      node = starpu_worker_get_memory_node(worker_id);
      /*
       * We do have a task that uses multiformat handles. Let's create the
       * required conversion tasks.
       */
      unsigned i;
      for (i = 0; i < task->cl->nbuffers; i++)
      {
          struct starpu_task *conversion_task;
          starpu_data_handle_t handle;
          handle = task->handles[i];
          if (!_starpu_handle_needs_conversion_task(handle, node))
              continue;
          conversion_task = _starpu_create_conversion_task(handle, node);
          conversion_task->mf_skip = 1;
          conversion_task->execute_on_a_specific_worker = 1;
          conversion_task->workerid = worker_id;
          /*
           * Next tasks will need to know where these handles have gone.
           */
          handle->mf_node = node;
          _starpu_task_submit_conversion_task(conversion_task, worker_id);
      }
      /* Conversions are submitted: flag the task so it is not processed
       * again, requeue it at the front of the local list, and pick anew. */
      task->mf_skip = 1;
      starpu_task_list_push_front(&worker->local_tasks, task);
      goto pick;
  profiling:
      if (profiling)
      {
          struct starpu_task_profiling_info *profiling_info;
          profiling_info = task->profiling_info;
          /* The task may have been created before profiling was enabled,
           * so we check if the profiling_info structure is available
           * even though we already tested if profiling is enabled. */
          if (profiling_info)
          {
              memcpy(&profiling_info->pop_start_time,
                     &pop_start_time, sizeof(struct timespec));
              _starpu_clock_gettime(&profiling_info->pop_end_time);
          }
      }
      return task;
  }
  398. struct starpu_task *_starpu_pop_every_task(void)
  399. {
  400. STARPU_ASSERT(policy.pop_every_task);
  401. /* TODO set profiling info */
  402. return policy.pop_every_task();
  403. }
  404. void _starpu_sched_pre_exec_hook(struct starpu_task *task)
  405. {
  406. if (policy.pre_exec_hook)
  407. policy.pre_exec_hook(task);
  408. }
  409. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  410. {
  411. if (policy.post_exec_hook)
  412. policy.post_exec_hook(task);
  413. }
  414. void _starpu_wait_on_sched_event(void)
  415. {
  416. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  417. _STARPU_PTHREAD_MUTEX_LOCK(worker->sched_mutex);
  418. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  419. if (_starpu_machine_is_running())
  420. {
  421. #ifndef STARPU_NON_BLOCKING_DRIVERS
  422. _STARPU_PTHREAD_COND_WAIT(worker->sched_cond,
  423. worker->sched_mutex);
  424. #endif
  425. }
  426. _STARPU_PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
  427. }
  /* Push `task` straight onto the local queue of worker `workerid`.
   *
   * A scheduling policy can use this instead of creating its own queue when
   * the worker's local queue is sufficient.  A non-zero `back` puts the task
   * at the back of the queue, which is the end the worker pops from first;
   * passing 0 therefore gives FIFO ordering.  Returns the result of
   * _starpu_push_local_task(). */
  int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
  {
      return _starpu_push_local_task(_starpu_get_worker_struct(workerid), task, back);
  }