sched_policy.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2010-2015  Université de Bordeaux
 * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  INRIA
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <common/config.h>
#include <common/utils.h>
#include <core/sched_policy.h>
#include <profiling/profiling.h>
#include <common/barrier.h>
#include <core/debug.h>

static int use_prefetch = 0;
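
/* Per-worker idle-time accounting, in microseconds: idle_start[] records when
 * a worker last failed to pop a task, and idle[] accumulates the resulting
 * idle periods (see _starpu_pop_task() and _starpu_print_idle_time() below). */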
double idle[STARPU_NMAXWORKERS];
double idle_start[STARPU_NMAXWORKERS];

int starpu_get_prefetch_flag(void)
{
	return use_prefetch;
}
static struct starpu_sched_policy *predefined_policies[] =
{
	&_starpu_sched_modular_eager_policy,
	&_starpu_sched_modular_eager_prefetching_policy,
	&_starpu_sched_modular_prio_policy,
	&_starpu_sched_modular_prio_prefetching_policy,
	&_starpu_sched_modular_random_policy,
	&_starpu_sched_modular_random_prio_policy,
	&_starpu_sched_modular_random_prefetching_policy,
	&_starpu_sched_modular_random_prio_prefetching_policy,
	//&_starpu_sched_modular_ws_policy,
	&_starpu_sched_modular_heft_policy,
	&_starpu_sched_modular_heft2_policy,
	&_starpu_sched_eager_policy,
	&_starpu_sched_prio_policy,
	&_starpu_sched_random_policy,
	&_starpu_sched_lws_policy,
	&_starpu_sched_ws_policy,
	&_starpu_sched_dm_policy,
	&_starpu_sched_dmda_policy,
	&_starpu_sched_dmda_ready_policy,
	&_starpu_sched_dmda_sorted_policy,
	&_starpu_sched_parallel_heft_policy,
	&_starpu_sched_peager_policy,
	NULL
};

struct starpu_sched_policy **starpu_sched_get_predefined_policies()
{
	return predefined_policies;
}
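
/* Example (sketch): since the array above is NULL-terminated, an application
 * can enumerate the built-in policies through this public accessor, e.g.:
 *
 *   struct starpu_sched_policy **p;
 *   for (p = starpu_sched_get_predefined_policies(); *p; p++)
 *       printf("%s: %s\n", (*p)->policy_name, (*p)->policy_description);
 */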
struct starpu_sched_policy *_starpu_get_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	return sched_ctx->sched_policy;
}

/*
 * Methods to initialize the scheduling policy
 */

static void load_sched_policy(struct starpu_sched_policy *sched_policy, struct _starpu_sched_ctx *sched_ctx)
{
	STARPU_ASSERT(sched_policy);

#ifdef STARPU_VERBOSE
	if (sched_policy->policy_name)
	{
		if (sched_policy->policy_description)
			_STARPU_DEBUG("Use %s scheduler (%s)\n", sched_policy->policy_name, sched_policy->policy_description);
		else
			_STARPU_DEBUG("Use %s scheduler\n", sched_policy->policy_name);
	}
#endif

	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	memcpy(policy, sched_policy, sizeof(*policy));
}
static struct starpu_sched_policy *find_sched_policy_from_name(const char *policy_name)
{
	if (!policy_name)
		return NULL;

	if (strncmp(policy_name, "heft", 5) == 0)
	{
		_STARPU_DISP("Warning: heft is now called \"dmda\".\n");
		return &_starpu_sched_dmda_policy;
	}

	struct starpu_sched_policy **policy;
	for (policy = predefined_policies; *policy != NULL; policy++)
	{
		struct starpu_sched_policy *p = *policy;
		if (p->policy_name)
		{
			if (strcmp(policy_name, p->policy_name) == 0)
			{
				/* we found a policy with the requested name */
				return p;
			}
		}
	}
	if (strcmp(policy_name, "help") != 0)
		fprintf(stderr, "Warning: scheduling policy \"%s\" was not found, try \"help\" to get a list\n", policy_name);

	/* nothing was found */
	return NULL;
}
static void display_sched_help_message(void)
{
	const char *sched_env = getenv("STARPU_SCHED");
	if (sched_env && (strcmp(sched_env, "help") == 0))
	{
		/* display the description of all predefined policies */
		struct starpu_sched_policy **policy;
		fprintf(stderr, "\nThe variable STARPU_SCHED can be set to one of the following strings:\n");
		for (policy = predefined_policies; *policy != NULL; policy++)
		{
			struct starpu_sched_policy *p = *policy;
			fprintf(stderr, "%-30s\t-> %s\n", p->policy_name, p->policy_description);
		}
		fprintf(stderr, "\n");
	}
}
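
/* Example: the list above can be obtained from the shell with
 *
 *   $ STARPU_SCHED=help ./application
 */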
struct starpu_sched_policy *_starpu_select_sched_policy(struct _starpu_machine_config *config, const char *required_policy)
{
	struct starpu_sched_policy *selected_policy = NULL;
	struct starpu_conf *user_conf = config->conf;

	if (required_policy)
		selected_policy = find_sched_policy_from_name(required_policy);

	/* First, we check whether the application explicitly provided a scheduling policy structure */
	if (!selected_policy && user_conf && (user_conf->sched_policy))
		return user_conf->sched_policy;

	/* Otherwise, we check whether the application specified the name of a policy to load */
	const char *sched_pol_name;
	sched_pol_name = getenv("STARPU_SCHED");
	if (sched_pol_name == NULL && user_conf && user_conf->sched_policy_name)
		sched_pol_name = user_conf->sched_policy_name;
	if (!selected_policy && sched_pol_name)
		selected_policy = find_sched_policy_from_name(sched_pol_name);

	/* Perhaps there was no policy that matched the name */
	if (selected_policy)
		return selected_policy;

	/* If no policy was specified, we use the eager (greedy) policy as a default */
	return &_starpu_sched_eager_policy;
}
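
/* Example (sketch): the selection channels checked above, from highest to
 * lowest precedence. "my_policy" is a hypothetical application-defined
 * policy structure:
 *
 *   struct starpu_conf conf;
 *   starpu_conf_init(&conf);
 *   conf.sched_policy = &my_policy;   // explicit policy structure, or
 *   conf.sched_policy_name = "dmda";  // a policy name (overridden by the
 *                                     // STARPU_SCHED environment variable)
 *   starpu_init(&conf);
 */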
void _starpu_init_sched_policy(struct _starpu_machine_config *config, struct _starpu_sched_ctx *sched_ctx, struct starpu_sched_policy *selected_policy)
{
	/* Perhaps we have to display some help */
	display_sched_help_message();

	/* Prefetch is activated by default */
	use_prefetch = starpu_get_env_number("STARPU_PREFETCH");
	if (use_prefetch == -1)
		use_prefetch = 1;

	/* Set calibrate flag */
	_starpu_set_calibrate_flag(config->conf->calibrate);

	load_sched_policy(selected_policy, sched_ctx);

	_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
	sched_ctx->sched_policy->init_sched(sched_ctx->id);
	_STARPU_TRACE_WORKER_SCHEDULING_POP;
}
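
/* Example: prefetching can be disabled from the environment:
 *
 *   $ STARPU_PREFETCH=0 ./application
 */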
void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_sched_policy *policy = sched_ctx->sched_policy;
	if (policy->deinit_sched)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		policy->deinit_sched(sched_ctx->id);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}
static void _starpu_push_task_on_specific_worker_notify_sched(struct starpu_task *task, struct _starpu_worker *worker, int workerid, int perf_workerid)
{
	/* if we push a task on a specific worker, notify all the sched_ctxs the worker belongs to */
	struct _starpu_sched_ctx *sched_ctx;
	struct _starpu_sched_ctx_list *l = NULL;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
		if (sched_ctx->sched_policy != NULL && sched_ctx->sched_policy->push_task_notify)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			sched_ctx->sched_policy->push_task_notify(task, workerid, perf_workerid, sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
}

/* Enqueue a task into the list of tasks explicitly attached to a worker. In
 * case workerid identifies a combined worker, a task will be enqueued into
 * each worker of the combination. */
static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
{
	int nbasic_workers = (int)starpu_worker_get_count();

	/* Is this a basic worker or a combined worker? */
	int is_basic_worker = (workerid < nbasic_workers);

	unsigned memory_node;
	struct _starpu_worker *worker = NULL;
	struct _starpu_combined_worker *combined_worker = NULL;

	if (is_basic_worker)
	{
		worker = _starpu_get_worker_struct(workerid);
		memory_node = worker->memory_node;
	}
	else
	{
		combined_worker = _starpu_get_combined_worker_struct(workerid);
		memory_node = combined_worker->memory_node;
	}

	if (use_prefetch)
		starpu_prefetch_task_input_on_node(task, memory_node);

	if (is_basic_worker)
		_starpu_push_task_on_specific_worker_notify_sched(task, worker, workerid, workerid);
	else
	{
		/* Notify all workers of the combined worker */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int j;
		for (j = 0; j < worker_size; j++)
		{
			int subworkerid = combined_workerid[j];
			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
		}
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	starpu_sched_ctx_call_pushed_task_cb(workerid, task->sched_ctx);
#endif //STARPU_USE_SC_HYPERVISOR

	unsigned i;
	if (is_basic_worker)
	{
		unsigned node = starpu_worker_get_memory_node(workerid);
		if (_starpu_task_uses_multiformat_handles(task))
		{
			unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
			for (i = 0; i < nbuffers; i++)
			{
				struct starpu_task *conversion_task;
				starpu_data_handle_t handle;

				handle = STARPU_TASK_GET_HANDLE(task, i);
				if (!_starpu_handle_needs_conversion_task(handle, node))
					continue;

				conversion_task = _starpu_create_conversion_task(handle, node);
				conversion_task->mf_skip = 1;
				conversion_task->execute_on_a_specific_worker = 1;
				conversion_task->workerid = workerid;
				_starpu_task_submit_conversion_task(conversion_task, workerid);
				//_STARPU_DEBUG("Pushing a conversion task\n");
			}

			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				handle->mf_node = node;
			}
		}
		//if (task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
		if (task->priority > 0)
			return _starpu_push_local_task(worker, task, 1);
		else
			return _starpu_push_local_task(worker, task, 0);
	}
	else
	{
		/* This is a combined worker so we create task aliases */
		int worker_size = combined_worker->worker_size;
		int *combined_workerid = combined_worker->combined_workerid;

		int ret = 0;

		struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
		job->task_size = worker_size;
		job->combined_workerid = workerid;
		job->active_task_alias_count = 0;

		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);

		/* Note: we have to call this early, or else the task may have
		 * disappeared already */
		starpu_push_task_end(task);

		int j;
		for (j = 0; j < worker_size; j++)
		{
			struct starpu_task *alias = starpu_task_dup(task);

			worker = _starpu_get_worker_struct(combined_workerid[j]);
			ret |= _starpu_push_local_task(worker, alias, 0);
		}

		return ret;
	}
}
/* the generic interface that calls the proper underlying implementation */
int _starpu_push_task(struct _starpu_job *j)
{
	if (j->task->prologue_callback_func)
		j->task->prologue_callback_func(j->task->prologue_callback_arg);

	struct starpu_task *task = j->task;
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;
	int ret;

	_STARPU_LOG_IN();

	_starpu_increment_nready_tasks_of_sched_ctx(task->sched_ctx, task->flops);
	task->status = STARPU_TASK_READY;

#ifdef HAVE_AYUDAME_H
	if (AYU_event)
	{
		intptr_t id = -1;
		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
	}
#endif

	/* if the context does not have any workers, save the task in a temp list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if none of the workers in the ctx is able to execute the task,
		 * we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return 0;
		}
	}

	/* in case there is no codelet associated to the task (that's a control
	 * task), we directly execute its callback and enforce the
	 * corresponding dependencies */
	if (task->cl == NULL)
	{
		_starpu_handle_job_termination(j);
		_STARPU_LOG_OUT_TAG("handle_job_termination");
		return 0;
	}

	ret = _starpu_push_task_to_workers(task);
	if (ret == -EAGAIN)
		/* pushed to empty context, that's fine */
		ret = 0;
	return ret;
}
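
/* Push a ready task to the workers of its scheduling context. Returns
 * -EAGAIN if the context currently has no worker able to execute the task,
 * in which case the task is parked in the empty_ctx_tasks list. */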
int _starpu_push_task_to_workers(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	unsigned nworkers = 0;

	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);

	/* if the context still does not have workers, put the task back to its place in
	 * the empty ctx list */
	if (!sched_ctx->is_initial_sched)
	{
		/* if none of the workers in the ctx is able to execute the task,
		 * we consider the ctx empty */
		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
		if (nworkers == 0)
		{
			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
#ifdef STARPU_USE_SC_HYPERVISOR
			if (sched_ctx != NULL && sched_ctx->id != 0 && sched_ctx->perf_counters != NULL
			    && sched_ctx->perf_counters->notify_empty_ctx)
			{
				_STARPU_TRACE_HYPERVISOR_BEGIN();
				sched_ctx->perf_counters->notify_empty_ctx(sched_ctx->id, task);
				_STARPU_TRACE_HYPERVISOR_END();
			}
#endif
			return -EAGAIN;
		}
	}

	_starpu_profiling_set_task_push_start_time(task);

	int ret = 0;
	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
	{
		unsigned node = starpu_worker_get_memory_node(task->workerid);
		if (starpu_get_prefetch_flag())
			starpu_prefetch_task_input_on_node(task, node);

		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
	}
	else
	{
		struct _starpu_machine_config *config = _starpu_get_machine_config();

		/* When a task can only be executed on a given arch and we have
		 * only one memory node for that arch, we can systematically
		 * prefetch before the scheduling decision. */
		if (starpu_get_prefetch_flag())
		{
			if (task->cl->where == STARPU_CPU && config->cpus_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cpus_nodeid);
			else if (task->cl->where == STARPU_CUDA && config->cuda_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->cuda_nodeid);
			else if (task->cl->where == STARPU_OPENCL && config->opencl_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->opencl_nodeid);
			else if (task->cl->where == STARPU_MIC && config->mic_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->mic_nodeid);
			else if (task->cl->where == STARPU_SCC && config->scc_nodeid >= 0)
				starpu_prefetch_task_input_on_node(task, config->scc_nodeid);
		}

		if (!sched_ctx->sched_policy)
		{
			if (!sched_ctx->awake_workers)
				ret = _starpu_push_task_on_specific_worker(task, sched_ctx->main_master);
			else
			{
				struct starpu_worker_collection *workers = sched_ctx->workers;

				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
				job->task_size = workers->nworkers;
				job->combined_workerid = -1; /* it's a ctx, not a combined worker */
				job->active_task_alias_count = 0;

				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);

				/* Note: we have to call this early, or else the task may have
				 * disappeared already */
				starpu_push_task_end(task);

				unsigned workerid;
				struct starpu_sched_ctx_iterator it;
				if (workers->init_iterator)
					workers->init_iterator(workers, &it);
				while (workers->has_next(workers, &it))
				{
					workerid = workers->get_next(workers, &it);
					struct starpu_task *alias = starpu_task_dup(task);
					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
				}
			}
		}
		else
		{
			STARPU_ASSERT(sched_ctx->sched_policy->push_task);

			/* check whether there are any workers in the context */
			starpu_pthread_rwlock_t *changing_ctx_mutex = _starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
			STARPU_PTHREAD_RWLOCK_RDLOCK(changing_ctx_mutex);
			nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
			if (nworkers == 0)
				ret = -1;
			else
			{
				_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
				ret = sched_ctx->sched_policy->push_task(task);
				_STARPU_TRACE_WORKER_SCHEDULING_POP;
			}
			STARPU_PTHREAD_RWLOCK_UNLOCK(changing_ctx_mutex);
		}

		if (ret == -1)
		{
			fprintf(stderr, "repush task\n");
			_STARPU_TRACE_JOB_POP(task, task->priority > 0);
			ret = _starpu_push_task_to_workers(task);
		}
	}
	/* Note: from here, the task might have been destroyed already! */
	_STARPU_LOG_OUT();
	return ret;
}
/* This is called right after the scheduler has pushed a task to a queue
 * but just before releasing mutexes: we need the task to still be alive!
 */
int starpu_push_task_end(struct starpu_task *task)
{
	_starpu_profiling_set_task_push_end_time(task);
	task->scheduled = 1;
	return 0;
}
/*
 * Given a handle that needs to be converted in order to be used on the given
 * node, returns a task that takes care of the conversion.
 */
struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
						   unsigned int node)
{
	return _starpu_create_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}

struct starpu_task *_starpu_create_conversion_task_for_arch(starpu_data_handle_t handle,
							     enum starpu_node_kind node_kind)
{
	struct starpu_task *conversion_task;

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	struct starpu_multiformat_interface *format_interface;
#endif

	conversion_task = starpu_task_create();
	conversion_task->name = "conversion_task";
	conversion_task->synchronous = 0;
	STARPU_TASK_SET_HANDLE(conversion_task, handle, 0);

#if defined(STARPU_USE_OPENCL) || defined(STARPU_USE_CUDA) || defined(STARPU_USE_MIC) || defined(STARPU_USE_SCC) || defined(STARPU_SIMGRID)
	/* The node does not really matter here */
	format_interface = (struct starpu_multiformat_interface *) starpu_data_get_interface_on_node(handle, STARPU_MAIN_RAM);
#endif

	_starpu_spin_lock(&handle->header_lock);
	handle->refcnt++;
	handle->busy_count++;
	_starpu_spin_unlock(&handle->header_lock);

	switch (node_kind)
	{
	case STARPU_CPU_RAM:
	case STARPU_SCC_RAM:
	case STARPU_SCC_SHM:
		switch (starpu_node_get_kind(handle->mf_node))
		{
		case STARPU_CPU_RAM:
		case STARPU_SCC_RAM:
		case STARPU_SCC_SHM:
			STARPU_ABORT();
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
		case STARPU_CUDA_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->cuda_to_cpu_cl;
			break;
		}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
		case STARPU_OPENCL_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->opencl_to_cpu_cl;
			break;
		}
#endif
#ifdef STARPU_USE_MIC
		case STARPU_MIC_RAM:
		{
			struct starpu_multiformat_data_interface_ops *mf_ops;
			mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
			conversion_task->cl = mf_ops->mic_to_cpu_cl;
			break;
		}
#endif
		default:
			_STARPU_ERROR("Oops: %u\n", handle->mf_node);
		}
		break;
#if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
	case STARPU_CUDA_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_cuda_cl;
		break;
	}
#endif
#if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
	case STARPU_OPENCL_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_opencl_cl;
		break;
	}
#endif
#ifdef STARPU_USE_MIC
	case STARPU_MIC_RAM:
	{
		struct starpu_multiformat_data_interface_ops *mf_ops;
		mf_ops = (struct starpu_multiformat_data_interface_ops *) handle->ops->get_mf_ops(format_interface);
		conversion_task->cl = mf_ops->cpu_to_mic_cl;
		break;
	}
#endif
	default:
		STARPU_ABORT();
	}

	STARPU_TASK_SET_MODE(conversion_task, STARPU_RW, 0);
	return conversion_task;
}
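
/* Example (sketch): the conversion codelets picked above come from the
 * multiformat registration done by the application; the types and codelet
 * names below (cpu_elem, cuda_elem, the two codelets) are hypothetical:
 *
 *   struct starpu_multiformat_data_interface_ops format_ops =
 *   {
 *       .cpu_elemsize   = sizeof(struct cpu_elem),
 *       .cuda_elemsize  = sizeof(struct cuda_elem),
 *       .cpu_to_cuda_cl = &cpu_to_cuda_cl,
 *       .cuda_to_cpu_cl = &cuda_to_cpu_cl,
 *   };
 *   starpu_multiformat_data_register(&handle, STARPU_MAIN_RAM, data, n, &format_ops);
 */

/* Pick the next scheduling context a worker should pop from: the contexts of
 * the current pop priority are visited round-robin (tracked through
 * poped_in_ctx), and once the worker has popped from each of them the phase
 * is reversed and, if two priority levels exist, the other priority is
 * served. */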
static
struct _starpu_sched_ctx *_get_next_sched_ctx_to_pop_into(struct _starpu_worker *worker)
{
	struct _starpu_sched_ctx_list *l = NULL;
	unsigned are_2_priorities = 0;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if (l->priority != worker->pop_ctx_priority)
		{
			are_2_priorities = 1;
			break;
		}
	}

	if (!worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* find a context the worker hasn't popped from yet */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if (l->priority == worker->pop_ctx_priority)
			{
				if (!worker->poped_in_ctx[l->sched_ctx])
				{
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if (are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}
	are_2_priorities = 0;
	if (worker->reverse_phase[worker->pop_ctx_priority])
	{
		/* if the worker has already popped from every context, start from the beginning */
		for (l = worker->sched_ctx_list; l; l = l->next)
		{
			if (l->priority == worker->pop_ctx_priority)
			{
				if (worker->poped_in_ctx[l->sched_ctx])
				{
					worker->poped_in_ctx[l->sched_ctx] = !worker->poped_in_ctx[l->sched_ctx];
					return _starpu_get_sched_ctx_struct(l->sched_ctx);
				}
			}
		}
		worker->reverse_phase[worker->pop_ctx_priority] = !worker->reverse_phase[worker->pop_ctx_priority];
		if (are_2_priorities)
			worker->pop_ctx_priority = !worker->pop_ctx_priority;
	}

	unsigned first_sched_ctx = STARPU_NMAX_SCHED_CTXS;
	for (l = worker->sched_ctx_list; l; l = l->next)
	{
		if (l->priority == worker->pop_ctx_priority)
		{
			first_sched_ctx = l->sched_ctx;
			break;
		}
	}

	//if (worker->pop_ctx_priority == 0 && first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
	if (first_sched_ctx == STARPU_NMAX_SCHED_CTXS)
		first_sched_ctx = worker->sched_ctx_list->sched_ctx;

	worker->poped_in_ctx[first_sched_ctx] = !worker->poped_in_ctx[first_sched_ctx];
	return _starpu_get_sched_ctx_struct(first_sched_ctx);
}
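
/* Pop a task for the given worker: the worker's local queue is tried first,
 * then the pop_task method of each context the worker belongs to. Idle time
 * is accounted here, and the conversion tasks required by multiformat
 * handles are created and submitted before the task is actually returned. */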
struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
{
	struct starpu_task *task;
	int worker_id;
	unsigned node;

	/* We can't tell in advance which task will be picked up, so we measure
	 * a timestamp, and will attribute it afterwards to the task. */
	int profiling = starpu_profiling_status_get();
	struct timespec pop_start_time;
	if (profiling)
		_starpu_clock_gettime(&pop_start_time);

pick:
	/* perhaps there is some local task to be executed first */
	task = _starpu_pop_local_task(worker);

	/* get tasks from the stacks of the strategy */
	if (!task)
	{
		struct _starpu_sched_ctx *sched_ctx;
#ifndef STARPU_NON_BLOCKING_DRIVERS
		int been_here[STARPU_NMAX_SCHED_CTXS];
		int i;
		for (i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
			been_here[i] = 0;

		while (!task)
#endif
		{
			if (worker->nsched_ctxs == 1)
				sched_ctx = _starpu_get_initial_sched_ctx();
			else
			{
				while (1)
				{
					sched_ctx = _get_next_sched_ctx_to_pop_into(worker);

					if (worker->removed_from_ctx[sched_ctx->id] == 1 && worker->shares_tasks_lists[sched_ctx->id] == 1)
					{
						_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
						worker->removed_from_ctx[sched_ctx->id] = 0;
						sched_ctx = NULL;
					}
					else
						break;
				}
			}

			if (sched_ctx && sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
			{
				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
				{
					task = sched_ctx->sched_policy->pop_task(sched_ctx->id);
				}
			}

			if (!task)
			{
				/* it doesn't matter whether the worker shares the tasks list or not
				 * in the scheduler: if it does not have any task to pop, just get
				 * it out of here */
				/* however, if it shares a tasks list, it will be removed as soon as
				 * it finishes its current job (in handle_job_termination) */
				if (worker->removed_from_ctx[sched_ctx->id])
				{
					_starpu_worker_gets_out_of_ctx(sched_ctx->id, worker);
					worker->removed_from_ctx[sched_ctx->id] = 0;
				}
#ifdef STARPU_USE_SC_HYPERVISOR
				if (worker->pop_ctx_priority)
				{
					struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;
					if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_idle_cycle && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
					{
						//_STARPU_TRACE_HYPERVISOR_BEGIN();
						perf_counters->notify_idle_cycle(sched_ctx->id, worker->workerid, 1.0);
						//_STARPU_TRACE_HYPERVISOR_END();
					}
				}
#endif //STARPU_USE_SC_HYPERVISOR
#ifndef STARPU_NON_BLOCKING_DRIVERS
				if (been_here[sched_ctx->id] || worker->nsched_ctxs == 1)
					break;
				been_here[sched_ctx->id] = 1;
#endif
			}
		}
	}

	if (!task)
	{
		idle_start[worker->workerid] = starpu_timing_now();
		return NULL;
	}

	if (idle_start[worker->workerid] != 0.0)
	{
		double idle_end = starpu_timing_now();
		idle[worker->workerid] += (idle_end - idle_start[worker->workerid]);
		idle_start[worker->workerid] = 0.0;
	}

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	struct starpu_sched_ctx_performance_counters *perf_counters = sched_ctx->perf_counters;

	if (sched_ctx->id != 0 && perf_counters != NULL && perf_counters->notify_poped_task && _starpu_sched_ctx_allow_hypervisor(sched_ctx->id))
	{
		//_STARPU_TRACE_HYPERVISOR_BEGIN();
		perf_counters->notify_poped_task(task->sched_ctx, worker->workerid);
		//_STARPU_TRACE_HYPERVISOR_END();
	}
#endif //STARPU_USE_SC_HYPERVISOR

	/* Make sure we do not bother with all the multiformat-specific code if
	 * it is not necessary. */
	if (!_starpu_task_uses_multiformat_handles(task))
		goto profiling;

	/* This is either a conversion task, or a regular task for which the
	 * conversion tasks have already been created and submitted */
	if (task->mf_skip)
		goto profiling;

	/*
	 * This worker may not be able to execute this task. In this case, we
	 * should return the task anyway. It will be pushed back almost immediately.
	 * This way, we avoid computing and executing the conversion tasks.
	 * Here, we do not care about what implementation is used.
	 */
	worker_id = starpu_worker_get_id();
	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
		return task;

	node = starpu_worker_get_memory_node(worker_id);

	/*
	 * We do have a task that uses multiformat handles. Let's create the
	 * required conversion tasks.
	 */
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		struct starpu_task *conversion_task;
		starpu_data_handle_t handle;

		handle = STARPU_TASK_GET_HANDLE(task, i);
		if (!_starpu_handle_needs_conversion_task(handle, node))
			continue;
		conversion_task = _starpu_create_conversion_task(handle, node);
		conversion_task->mf_skip = 1;
		conversion_task->execute_on_a_specific_worker = 1;
		conversion_task->workerid = worker_id;
		/*
		 * Next tasks will need to know where these handles have gone.
		 */
		handle->mf_node = node;
		_starpu_task_submit_conversion_task(conversion_task, worker_id);
	}
	task->mf_skip = 1;
	starpu_task_list_push_back(&worker->local_tasks, task);
	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
	goto pick;

profiling:
	if (profiling)
	{
		struct starpu_profiling_task_info *profiling_info;
		profiling_info = task->profiling_info;

		/* The task may have been created before profiling was enabled,
		 * so we check if the profiling_info structure is available
		 * even though we already tested if profiling is enabled. */
		if (profiling_info)
		{
			memcpy(&profiling_info->pop_start_time,
			       &pop_start_time, sizeof(struct timespec));
			_starpu_clock_gettime(&profiling_info->pop_end_time);
		}
	}

	if (task->prologue_callback_pop_func)
		task->prologue_callback_pop_func(task->prologue_callback_pop_arg);

	return task;
}
struct starpu_task *_starpu_pop_every_task(struct _starpu_sched_ctx *sched_ctx)
{
	struct starpu_task *task = NULL;
	if (sched_ctx->sched_policy)
	{
		STARPU_ASSERT(sched_ctx->sched_policy->pop_every_task);

		/* TODO: set profiling info */
		if (sched_ctx->sched_policy->pop_every_task)
		{
			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
			task = sched_ctx->sched_policy->pop_every_task(sched_ctx->id);
			_STARPU_TRACE_WORKER_SCHEDULING_POP;
		}
	}
	return task;
}
void _starpu_sched_pre_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->pre_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->pre_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

void _starpu_sched_post_exec_hook(struct starpu_task *task)
{
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
	if (sched_ctx->sched_policy && sched_ctx->sched_policy->post_exec_hook)
	{
		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
		sched_ctx->sched_policy->post_exec_hook(task);
		_STARPU_TRACE_WORKER_SCHEDULING_POP;
	}
}

void _starpu_wait_on_sched_event(void)
{
	struct _starpu_worker *worker = _starpu_get_local_worker_key();
	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
	_starpu_handle_all_pending_node_data_requests(worker->memory_node);
	if (_starpu_machine_is_running())
	{
#ifndef STARPU_NON_BLOCKING_DRIVERS
		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond,
					 &worker->sched_mutex);
#endif
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
}
/* The scheduling policy may put tasks directly into a worker's local queue so
 * that it is not always necessary to create its own queue when the local queue
 * is sufficient. If "prio" is non-zero, the task is put at the point of the
 * queue from which the worker pops tasks first. Setting "prio" to 0 therefore
 * preserves a FIFO ordering. */
int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
{
	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
	return _starpu_push_local_task(worker, task, prio);
}
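
/* Example (sketch): a custom policy's push_task hook could bypass its own
 * queues and rely on the workers' local queues; "my_push_task" and the
 * worker-selection logic are hypothetical:
 *
 *   static int my_push_task(struct starpu_task *task)
 *   {
 *       int workerid = 0;  // worker chosen by the policy
 *       return starpu_push_local_task(workerid, task, 0);  // 0: FIFO order
 *   }
 */

/* Sum the idle time of all workers and append it (in microseconds) to the
 * file named by STARPU_IDLE_FILE, or starpu_idle_microsec.log by default. */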
void _starpu_print_idle_time()
{
	double all_idle = 0.0;
	int i = 0;
	for (i = 0; i < STARPU_NMAXWORKERS; i++)
		all_idle += idle[i];

	FILE *f;
	const char *sched_env = getenv("STARPU_IDLE_FILE");
	if (!sched_env)
		sched_env = "starpu_idle_microsec.log";
	f = fopen(sched_env, "a");
	if (!f)
	{
		fprintf(stderr, "couldn't open %s: %s\n", sched_env, strerror(errno));
	}
	else
	{
		fprintf(f, "%lf\n", all_idle);
		fclose(f);
	}
}