sched_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2013  Université de Bordeaux 1
 * Copyright (C) 2010-2013  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>

static int use_prefetch = 0;

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}
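
/* The scheduling policies shipped with StarPU. The array is NULL-terminated
 * so that callers can iterate over it without knowing its length. */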
static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies()
{
	return predefined_policies;
}
struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}
static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	if (strncmp(policy_name, "heft", 5) == 0)
	{
		_STARPU_DISP("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}
static void display_sched_help_message(void)
{
	const char *sched_env = getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;
		fprintf(stderr, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stderr, "%s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stderr, "\n");
	}
}
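
/* Pick the scheduling policy for a context. The selection order implemented
 * below is: the explicitly required policy, then the policy structure given
 * in the user's starpu_conf, then the name set in the STARPU_SCHED
 * environment variable, then the policy name given in starpu_conf, and
 * finally the eager policy as a default. */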
struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* First, we check whether the application explicitly gave a scheduling policy or not */
	if (!selected_policy && user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (!selected_policy && sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* Perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the eager (greedy) policy by default */
	return &_starpu_sched_eager_policy;
}
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf->calibrate);

	load_sched_policy(selected_policy, sched_ctx);
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
}

void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
		policy->deinit_sched(sched_ctx->id);
}
/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task alias will be enqueued
 * into each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker ? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);

	/* If we push a task on a specific worker, notify all the sched_ctxs the
	 * worker belongs to. Only basic workers carry a sched_ctx list; for a
	 * combined worker, worker is still NULL here, so we must skip the
	 * traversal to avoid a NULL dereference. */
	if (worker)
	{
		struct _starpu_sched_ctx *sched_ctx;
		struct _starpu_sched_ctx_list *l = NULL;
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
			if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
				sched_ctx->sched_policy->push_task_notify(task, workerid, sched_ctx->id);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif /* STARPU_USE_SC_HYPERVISOR */

	unsigned i;
	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			for (i = 0; i < task->cl->nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < task->cl->nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}

		// if (task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker, so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);

		/* Note: we have to call that early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);
			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
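
/* The generic push interface, which calls the proper underlying
 * implementation once a job's dependencies are satisfied: run the prologue
 * callback if any, mark the task as ready, park the task in the context's
 * empty_ctx_tasks list when the context has no worker able to execute it,
 * directly terminate codelet-less control tasks, and otherwise hand the task
 * over to _starpu_push_task_to_workers(). */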
int _starpu_push_task(struct _starpu_job *j)
{
	if (j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);

	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;

	_STARPU_LOG_IN();

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
	_starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops);
	task->status = STARPU_TASK_READY;

#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		intptr_t id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif

	/* if the context does not have any workers, save the task in a temporary list */
	if (!sched_ctx->is_initial_sched)
	{
		/* If no worker in the ctx is able to execute this task, we
		 * consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
#endif
			return 0;
		}
	}

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}
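
/* Hand a ready task over to the scheduling policy of its context. Returns
 * -EAGAIN when the context currently has no worker able to execute the task;
 * the task is then parked in the empty_ctx_tasks list until workers join the
 * context. */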
int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	/* if the context still does not have workers, put the task back in its
	 * place in the empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* If no worker in the ctx is able to execute this task, we
		 * consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);

		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
#endif
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	int ret;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		unsigned node = starpu_worker_get_memory_node(task->workerid);
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_on_node(task, node);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag())
		{
			if (task->cl->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->cl->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->cl->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->cl->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->cl->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}

		STARPU_ASSERT(sched_ctx->sched_policy->push_task);

		/* check whether there are any workers in the context */
		starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
		STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
		nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
		ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task);
		STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);

		if (ret == -1)
		{
			fprintf(stderr, "repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}

	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}
/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}
/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}
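
/* Build the actual conversion task: take a reference on the handle (refcnt
 * and busy_count) so it cannot vanish while the conversion is pending, then
 * pick the multiformat conversion codelet matching the source node kind
 * (handle->mf_node) and the destination node kind. */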
struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							    enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			switch (starpu_node_get_kind(handle->mf_node))
			{
				case STARPU_CPU_RAM:
				case STARPU_SCC_RAM:
				case STARPU_SCC_SHM:
					STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
				case STARPU_CUDA_RAM:
				{
					struct starpu_multiformat_data_interface_ops *mf_ops;
					mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
					conversion_task->cl = mf_ops->cuda_to_cpu_cl;
					break;
				}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
				case STARPU_OPENCL_RAM:
				{
					struct starpu_multiformat_data_interface_ops *mf_ops;
					mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
					conversion_task->cl = mf_ops->opencl_to_cpu_cl;
					break;
				}
#endif
#ifdef STARPU_USE_MIC
				case STARPU_MIC_RAM:
				{
					struct starpu_multiformat_data_interface_ops *mf_ops;
					mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
					conversion_task->cl = mf_ops->mic_to_cpu_cl;
					break;
				}
#endif
				default:
					_STARPU_ERROR("Oops : %u\n", handle->mf_node);
			}
			break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cpu_to_cuda_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cpu_to_opencl_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cpu_to_mic_cl;
			break;
		}
#endif
		default:
			STARPU_ABORT();
	}

	STARPU_CODELET_SET_MODE(conversion_task->cl, STARPU_RW, 0);
	return conversion_task;
}
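
/* Choose which of the worker's scheduling contexts to pop from next. The
 * per-worker pop_counter keeps the choice fair: the context popped from the
 * least recently (smallest counter) wins, and once every context has been
 * served the counters are reset and the first context is returned. */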
static
struct _starpu_sched_ctx *_get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx *sched_ctx, *good_sched_ctx = NULL;
	unsigned smallest_counter = worker->nsched_ctxs;
	struct _starpu_sched_ctx_list *l = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
		/* if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1) */
		/*	return sched_ctx; */
		if (sched_ctx->pop_counter[worker->workerid] < worker->nsched_ctxs &&
		    smallest_counter > sched_ctx->pop_counter[worker->workerid])
		{
			good_sched_ctx = sched_ctx;
			smallest_counter = sched_ctx->pop_counter[worker->workerid];
		}
	}

	if (good_sched_ctx == NULL)
	{
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
			sched_ctx->pop_counter[worker->workerid] = 0;
		}
		return _starpu_get_sched_ctx_struct(worker->sched_ctx_list->sched_ctx);
	}
	return good_sched_ctx;
}
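
/* Pick the next task for a worker: first look at the worker's own local
 * queue, then ask the scheduling policies of the worker's contexts. If the
 * selected task uses multiformat handles, the required conversion tasks are
 * created and submitted first, and the original task is requeued locally
 * until the conversions are done. */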
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
#endif
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while (1)
				{
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if (sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
				}
			}

			if (!task)
			{
				/* Whether or not the scheduler shares the task lists,
				 * if it does not have any task to pop, just get out of here.
				 * However, if it shares a task list, the worker will be removed
				 * as soon as it finishes this job (in handle_job_termination). */
				if (worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
				if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle)
					perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
#endif /* STARPU_USE_SC_HYPERVISOR */
#ifndef STARPU_NON_BLOCKING_DRIVERS
				if ((sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
					break;

				been_here[sched_ctx->id] = 1;
#endif
			}
			sched_ctx->pop_counter[worker->workerid]++;
		}
	}

	if (!task)
		return NULL;

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
	if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task)
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
#endif /* STARPU_USE_SC_HYPERVISOR */

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	worker_id = starpu_worker_get_id();
	if (!starpu_worker_can_execute_task(worker_id, task, 0))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	unsigned i;
	for (i = 0; i < task->cl->nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;

		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}

	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}
	return task;
}
struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

	/* TODO set profiling info */
	if (sched_ctx->sched_policy->pop_every_task)
		return sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
	return NULL;
}
void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy->pre_exec_hook)
		sched_ctx->sched_policy->pre_exec_hook(task);
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy->post_exec_hook)
		sched_ctx->sched_policy->post_exec_hook(task);
}
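
/* Called by an idle worker: flush the pending data requests of its memory
 * node, then (unless the drivers are non-blocking) sleep on the worker's
 * condition variable until a scheduling event wakes it up. */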
void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();

	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);

	_starpu_handle_all_pending_node_data_requests(worker->memory_node);

	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}

	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}
/* The scheduling policy may put tasks directly into a worker's local queue,
 * so that the policy does not always need to create its own queues when the
 * local ones are sufficient. If "prio" is non-zero, the task is put at the
 * end of the queue from which the worker picks up tasks first; passing 0
 * therefore preserves a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);

	return _starpu_push_local_task(worker, task, prio);
}