sched_policy.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2011 Université de Bordeaux 1
  4. * Copyright (C) 2010-2011 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <pthread.h>
  18. #include <starpu.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/sched_policy.h>
  22. #include <profiling/profiling.h>
  23. #include <common/barrier.h>
  24. static struct starpu_sched_policy policy;
  25. static int use_prefetch = 0;
  26. int starpu_get_prefetch_flag(void)
  27. {
  28. return use_prefetch;
  29. }
  /*
   * Predefined policies
   *
   * Every entry of predefined_policies can be selected by name through
   * find_sched_policy_from_name(), and is listed, in table order, by the
   * STARPU_SCHED=help message.
   */
  extern struct starpu_sched_policy _starpu_sched_ws_policy;
  extern struct starpu_sched_policy _starpu_sched_prio_policy;
  extern struct starpu_sched_policy _starpu_sched_random_policy;
  extern struct starpu_sched_policy _starpu_sched_dm_policy;
  extern struct starpu_sched_policy _starpu_sched_dmda_policy;
  extern struct starpu_sched_policy _starpu_sched_dmda_ready_policy;
  extern struct starpu_sched_policy _starpu_sched_dmda_sorted_policy;
  extern struct starpu_sched_policy _starpu_sched_eager_policy;
  extern struct starpu_sched_policy _starpu_sched_parallel_heft_policy;
  extern struct starpu_sched_policy _starpu_sched_pgreedy_policy;
  /* Unlike the other policies, this symbol carries no _starpu_ prefix. */
  extern struct starpu_sched_policy heft_policy;

  /* The table is only ever read, so the array itself is const-qualified. The
   * pointed-to policies stay non-const because lookups hand them out as plain
   * struct starpu_sched_policy pointers. */
  static struct starpu_sched_policy *const predefined_policies[] =
  {
      &_starpu_sched_ws_policy,
      &_starpu_sched_prio_policy,
      &_starpu_sched_dm_policy,
      &_starpu_sched_dmda_policy,
      &heft_policy,
      &_starpu_sched_dmda_ready_policy,
      &_starpu_sched_dmda_sorted_policy,
      &_starpu_sched_random_policy,
      &_starpu_sched_eager_policy,
      &_starpu_sched_parallel_heft_policy,
      &_starpu_sched_pgreedy_policy
  };
  58. struct starpu_sched_policy *_starpu_get_sched_policy(void)
  59. {
  60. return &policy;
  61. }
  62. /*
  63. * Methods to initialize the scheduling policy
  64. */
  65. static void load_sched_policy(struct starpu_sched_policy *sched_policy)
  66. {
  67. STARPU_ASSERT(sched_policy);
  68. #ifdef STARPU_VERBOSE
  69. if (sched_policy->policy_name)
  70. {
  71. if (sched_policy->policy_description)
  72. _STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
  73. else
  74. _STARPU_DEBUG("Use %s scheduler \n", sched_policy->policy_name);
  75. }
  76. #endif
  77. policy.init_sched = sched_policy->init_sched;
  78. policy.deinit_sched = sched_policy->deinit_sched;
  79. policy.push_task = sched_policy->push_task;
  80. policy.push_task_notify = sched_policy->push_task_notify;
  81. policy.pop_task = sched_policy->pop_task;
  82. policy.post_exec_hook = sched_policy->post_exec_hook;
  83. policy.pop_every_task = sched_policy->pop_every_task;
  84. }
  85. static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
  86. {
  87. if (!policy_name)
  88. return NULL;
  89. unsigned i;
  90. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  91. {
  92. struct starpu_sched_policy *p;
  93. p = predefined_policies[i];
  94. if (p->policy_name)
  95. {
  96. if (strcmp(policy_name, p->policy_name) == 0)
  97. {
  98. /* we found a policy with the requested name */
  99. return p;
  100. }
  101. }
  102. }
  103. fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);
  104. /* nothing was found */
  105. return NULL;
  106. }
  107. static void display_sched_help_message(void)
  108. {
  109. const char *sched_env = getenv("STARPU_SCHED");
  110. if (sched_env && (strcmp(sched_env, "help") == 0))
  111. {
  112. fprintf(stderr, "STARPU_SCHED can be either of\n");
  113. /* display the description of all predefined policies */
  114. unsigned i;
  115. for (i = 0; i < sizeof(predefined_policies)/sizeof(predefined_policies[0]); i++)
  116. {
  117. struct starpu_sched_policy *p;
  118. p = predefined_policies[i];
  119. fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
  120. }
  121. }
  122. }
  123. static struct starpu_sched_policy *select_sched_policy(struct _starpu_machine_config *config)
  124. {
  125. struct starpu_sched_policy *selected_policy = NULL;
  126. struct starpu_conf *user_conf = config->user_conf;
  127. /* First, we check whether the application explicitely gave a scheduling policy or not */
  128. if (user_conf && (user_conf->sched_policy))
  129. return user_conf->sched_policy;
  130. /* Otherwise, we look if the application specified the name of a policy to load */
  131. const char *sched_pol_name;
  132. if (user_conf && (user_conf->sched_policy_name))
  133. {
  134. sched_pol_name = user_conf->sched_policy_name;
  135. }
  136. else
  137. {
  138. sched_pol_name = getenv("STARPU_SCHED");
  139. }
  140. if (sched_pol_name)
  141. selected_policy = find_sched_policy_from_name(sched_pol_name);
  142. /* Perhaps there was no policy that matched the name */
  143. if (selected_policy)
  144. return selected_policy;
  145. /* If no policy was specified, we use the greedy policy as a default */
  146. return &_starpu_sched_eager_policy;
  147. }
  148. void _starpu_init_sched_policy(struct _starpu_machine_config *config)
  149. {
  150. /* Perhaps we have to display some help */
  151. display_sched_help_message();
  152. /* Prefetch is activated by default */
  153. use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
  154. if (use_prefetch == -1)
  155. use_prefetch = 1;
  156. /* By default, we don't calibrate */
  157. unsigned do_calibrate = 0;
  158. if (config->user_conf && (config->user_conf->calibrate != -1))
  159. {
  160. do_calibrate = config->user_conf->calibrate;
  161. }
  162. else
  163. {
  164. int res = starpu_get_env_number("STARPU_CALIBRATE");
  165. do_calibrate = (res < 0)?0:(unsigned)res;
  166. }
  167. _starpu_set_calibrate_flag(do_calibrate);
  168. struct starpu_sched_policy *selected_policy;
  169. selected_policy = select_sched_policy(config);
  170. load_sched_policy(selected_policy);
  171. policy.init_sched(&config->topology, &policy);
  172. }
  173. void _starpu_deinit_sched_policy(struct _starpu_machine_config *config)
  174. {
  175. if (policy.deinit_sched)
  176. policy.deinit_sched(&config->topology, &policy);
  177. }
  178. /* Enqueue a task into the list of tasks explicitely attached to a worker. In
  179. * case workerid identifies a combined worker, a task will be enqueued into
  180. * each worker of the combination. */
  181. static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
  182. {
  183. int nbasic_workers = (int)starpu_worker_get_count();
  184. /* Is this a basic worker or a combined worker ? */
  185. int is_basic_worker = (workerid < nbasic_workers);
  186. unsigned memory_node;
  187. struct _starpu_worker *worker = NULL;
  188. struct _starpu_combined_worker *combined_worker = NULL;
  189. if (is_basic_worker)
  190. {
  191. worker = _starpu_get_worker_struct(workerid);
  192. memory_node = worker->memory_node;
  193. }
  194. else
  195. {
  196. combined_worker = _starpu_get_combined_worker_struct(workerid);
  197. memory_node = combined_worker->memory_node;
  198. }
  199. if (use_prefetch)
  200. starpu_prefetch_task_input_on_node(task, memory_node);
  201. if (policy.push_task_notify)
  202. policy.push_task_notify(task, workerid);
  203. if (is_basic_worker)
  204. {
  205. unsigned node = starpu_worker_get_memory_node(workerid);
  206. if (_starpu_task_uses_multiformat_handles(task))
  207. {
  208. unsigned i;
  209. for (i = 0; i < task->cl->nbuffers; i++)
  210. {
  211. struct starpu_task *conversion_task;
  212. starpu_data_handle_t handle;
  213. handle = task->handles[i];
  214. if (!_starpu_handle_needs_conversion_task(handle, node))
  215. continue;
  216. conversion_task = _starpu_create_conversion_task(handle, node);
  217. conversion_task->mf_skip = 1;
  218. conversion_task->execute_on_a_specific_worker = 1;
  219. conversion_task->workerid = workerid;
  220. _starpu_task_submit_conversion_task(conversion_task, workerid);
  221. //_STARPU_DEBUG("Pushing a conversion task\n");
  222. }
  223. for (i = 0; i < task->cl->nbuffers; i++)
  224. task->handles[i]->mf_node = node;
  225. }
  226. return _starpu_push_local_task(worker, task, 0);
  227. }
  228. else
  229. {
  230. /* This is a combined worker so we create task aliases */
  231. int worker_size = combined_worker->worker_size;
  232. int *combined_workerid = combined_worker->combined_workerid;
  233. int ret = 0;
  234. int i;
  235. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  236. j->task_size = worker_size;
  237. j->combined_workerid = workerid;
  238. j->active_task_alias_count = 0;
  239. _STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
  240. _STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
  241. for (i = 0; i < worker_size; i++)
  242. {
  243. struct starpu_task *alias = _starpu_create_task_alias(task);
  244. worker = _starpu_get_worker_struct(combined_workerid[i]);
  245. ret |= _starpu_push_local_task(worker, alias, 0);
  246. }
  247. return ret;
  248. }
  249. }
  250. /* the generic interface that call the proper underlying implementation */
  251. int _starpu_push_task(struct _starpu_job *j, unsigned job_is_already_locked)
  252. {
  253. struct starpu_task *task = j->task;
  254. _STARPU_LOG_IN();
  255. _starpu_increment_nready_tasks();
  256. task->status = STARPU_TASK_READY;
  257. _starpu_profiling_set_task_push_start_time(task);
  258. /* in case there is no codelet associated to the task (that's a control
  259. * task), we directly execute its callback and enforce the
  260. * corresponding dependencies */
  261. if (task->cl == NULL)
  262. {
  263. _starpu_handle_job_termination(j, job_is_already_locked);
  264. _STARPU_LOG_OUT_TAG("handle_job_termination");
  265. return 0;
  266. }
  267. int ret;
  268. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
  269. {
  270. ret = _starpu_push_task_on_specific_worker(task, task->workerid);
  271. }
  272. else
  273. {
  274. STARPU_ASSERT(policy.push_task);
  275. ret = policy.push_task(task);
  276. }
  277. _starpu_profiling_set_task_push_end_time(task);
  278. _STARPU_LOG_OUT();
  279. return ret;
  280. }
  281. /*
  282. * Given a handle that needs to be converted in order to be used on the given
  283. * node, returns a task that takes care of the conversion.
  284. */
  285. struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
  286. unsigned int node)
  287. {
  288. struct starpu_task *conversion_task;
  289. struct starpu_multiformat_interface *format_interface;
  290. enum _starpu_node_kind node_kind;
  291. conversion_task = starpu_task_create();
  292. conversion_task->synchronous = 0;
  293. conversion_task->buffers[0].handle = handle;
  294. conversion_task->buffers[0].mode = STARPU_RW;
  295. /* The node does not really matter here */
  296. format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, 0);
  297. node_kind = _starpu_get_node_kind(node);
  298. handle->refcnt++;
  299. handle->busy_count++;
  300. switch(node_kind)
  301. {
  302. case STARPU_CPU_RAM:
  303. switch (_starpu_get_node_kind(handle->mf_node))
  304. {
  305. case STARPU_CPU_RAM:
  306. STARPU_ASSERT(0);
  307. #ifdef STARPU_USE_CUDA
  308. case STARPU_CUDA_RAM:
  309. conversion_task->cl = format_interface->ops->cuda_to_cpu_cl;
  310. break;
  311. #endif
  312. #ifdef STARPU_USE_OPENCL
  313. case STARPU_OPENCL_RAM:
  314. conversion_task->cl = format_interface->ops->opencl_to_cpu_cl;
  315. break;
  316. #endif
  317. default:
  318. fprintf(stderr, "Oops : %d\n", handle->mf_node);
  319. STARPU_ASSERT(0);
  320. }
  321. break;
  322. #ifdef STARPU_USE_CUDA
  323. case STARPU_CUDA_RAM:
  324. conversion_task->cl = format_interface->ops->cpu_to_cuda_cl;
  325. break;
  326. #endif
  327. #ifdef STARPU_USE_OPENCL
  328. case STARPU_OPENCL_RAM:
  329. conversion_task->cl = format_interface->ops->cpu_to_opencl_cl;
  330. break;
  331. #endif
  332. case STARPU_SPU_LS: /* Not supported */
  333. default:
  334. STARPU_ASSERT(0);
  335. }
  336. return conversion_task;
  337. }
  338. struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
  339. {
  340. struct starpu_task *task;
  341. /* We can't tell in advance which task will be picked up, so we measure
  342. * a timestamp, and will attribute it afterwards to the task. */
  343. int profiling = starpu_profiling_status_get();
  344. struct timespec pop_start_time;
  345. if (profiling)
  346. _starpu_clock_gettime(&pop_start_time);
  347. pick:
  348. /* perhaps there is some local task to be executed first */
  349. task = _starpu_pop_local_task(worker);
  350. if (!task && policy.pop_task)
  351. task = policy.pop_task();
  352. if (!task)
  353. return NULL;
  354. /* Make sure we do not bother with all the multiformat-specific code if
  355. * it is not necessary. */
  356. if (!_starpu_task_uses_multiformat_handles(task))
  357. goto profiling;
  358. /* This is either a conversion task, or a regular task for which the
  359. * conversion tasks have already been created and submitted */
  360. if (task->mf_skip)
  361. goto profiling;
  362. int worker_id = starpu_worker_get_id();
  363. if (!starpu_worker_can_execute_task(worker_id, task, 0))
  364. return task;
  365. unsigned node = starpu_worker_get_memory_node(worker_id);
  366. /*
  367. * We do have a task that uses multiformat handles. Let's create the
  368. * required conversion tasks.
  369. */
  370. unsigned i;
  371. for (i = 0; i < task->cl->nbuffers; i++)
  372. {
  373. struct starpu_task *conversion_task;
  374. starpu_data_handle_t handle;
  375. handle = task->handles[i];
  376. if (!_starpu_handle_needs_conversion_task(handle, node))
  377. continue;
  378. conversion_task = _starpu_create_conversion_task(handle, node);
  379. conversion_task->mf_skip = 1;
  380. conversion_task->execute_on_a_specific_worker = 1;
  381. conversion_task->workerid = worker_id;
  382. /*
  383. * Next tasks will need to know where these handles have gone.
  384. */
  385. handle->mf_node = node;
  386. _starpu_task_submit_conversion_task(conversion_task, worker_id);
  387. }
  388. task->mf_skip = 1;
  389. starpu_task_list_push_front(&worker->local_tasks, task);
  390. goto pick;
  391. profiling:
  392. /* Note that we may get a NULL task in case the scheduler was unlocked
  393. * for some reason. */
  394. if (profiling)
  395. {
  396. struct starpu_task_profiling_info *profiling_info;
  397. profiling_info = task->profiling_info;
  398. /* The task may have been created before profiling was enabled,
  399. * so we check if the profiling_info structure is available
  400. * even though we already tested if profiling is enabled. */
  401. if (profiling_info)
  402. {
  403. memcpy(&profiling_info->pop_start_time,
  404. &pop_start_time, sizeof(struct timespec));
  405. _starpu_clock_gettime(&profiling_info->pop_end_time);
  406. }
  407. }
  408. return task;
  409. }
  410. struct starpu_task *_starpu_pop_every_task(void)
  411. {
  412. STARPU_ASSERT(policy.pop_every_task);
  413. /* TODO set profiling info */
  414. return policy.pop_every_task();
  415. }
  416. void _starpu_sched_post_exec_hook(struct starpu_task *task)
  417. {
  418. if (policy.post_exec_hook)
  419. policy.post_exec_hook(task);
  420. }
  421. void _starpu_wait_on_sched_event(void)
  422. {
  423. struct _starpu_worker *worker = _starpu_get_local_worker_key();
  424. _STARPU_PTHREAD_MUTEX_LOCK(worker->sched_mutex);
  425. _starpu_handle_all_pending_node_data_requests(worker->memory_node);
  426. if (_starpu_machine_is_running())
  427. {
  428. #ifndef STARPU_NON_BLOCKING_DRIVERS
  429. _STARPU_PTHREAD_COND_WAIT(worker->sched_cond,
  430. worker->sched_mutex);
  431. #endif
  432. }
  433. _STARPU_PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
  434. }
  /* The scheduling policy may put tasks directly into a worker's local queue,
   * so that it does not always need its own queue when the local queue is
   * sufficient. If "back" is non-zero, the task is put at the back of the
   * queue, where the worker pops tasks first; setting "back" to 0 therefore
   * ensures a FIFO ordering.
   *
   * Returns the result of _starpu_push_local_task() for worker workerid. */
  int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
  {
      return _starpu_push_local_task(_starpu_get_worker_struct(workerid), task, back);
  }