task.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009-2016 Université de Bordeaux
 * Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 CNRS
 * Copyright (C) 2011 Télécom-SudParis
 * Copyright (C) 2011, 2014, 2016 INRIA
 * Copyright (C) 2016 Uppsala University
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

#include <starpu.h>
#include <starpu_profiling.h>
#include <core/workers.h>
#include <core/sched_ctx.h>
#include <core/jobs.h>
#include <core/task.h>
#include <core/task_bundle.h>
#include <core/dependencies/data_concurrency.h>
#include <common/config.h>
#include <common/utils.h>
#include <common/fxt.h>
#include <profiling/profiling.h>
#include <profiling/bound.h>
#include <math.h>
#include <string.h>
#include <core/debug.h>
#include <time.h>
#include <signal.h>
#include <core/simgrid.h>
#ifdef STARPU_HAVE_WINDOWS
#include <windows.h>
#endif

/* XXX this should be reinitialized when StarPU is shut down (or we should
 * make sure that no task remains!) */
/* TODO: we could make this hierarchical to avoid contention? */
//static starpu_pthread_cond_t submitted_cond = STARPU_PTHREAD_COND_INITIALIZER;

/* This key stores the task currently handled by the thread; note that we
 * cannot use the worker structure to store that information because it is
 * possible that we have a task with a NULL codelet, which means its callback
 * could be executed by a user thread as well. */
static starpu_pthread_key_t current_task_key;

static int limit_min_submitted_tasks;
static int limit_max_submitted_tasks;
static int watchdog_crash;

/* Called once at starpu_init */
void _starpu_task_init(void)
{
	STARPU_PTHREAD_KEY_CREATE(&current_task_key, NULL);
	limit_min_submitted_tasks = starpu_get_env_number("STARPU_LIMIT_MIN_SUBMITTED_TASKS");
	limit_max_submitted_tasks = starpu_get_env_number("STARPU_LIMIT_MAX_SUBMITTED_TASKS");
	watchdog_crash = starpu_get_env_number("STARPU_WATCHDOG_CRASH");
}
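
/* Usage sketch (illustrative, not part of this file): the environment
 * variables read above are meant to be set before starpu_init(), e.g. to
 * throttle task submission or to make the watchdog abort on stalls. The
 * values below are arbitrary.
 *
 *   #include <stdlib.h>
 *   #include <starpu.h>
 *
 *   int main(void)
 *   {
 *       setenv("STARPU_LIMIT_MAX_SUBMITTED_TASKS", "1000", 1);  // high-water mark
 *       setenv("STARPU_LIMIT_MIN_SUBMITTED_TASKS", "100", 1);   // low-water mark
 *       setenv("STARPU_WATCHDOG_CRASH", "1", 1);                // abort instead of warning
 *       if (starpu_init(NULL) != 0)
 *           return 1;
 *       // ... submit tasks ...
 *       starpu_shutdown();
 *       return 0;
 *   }
 */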

void _starpu_task_deinit(void)
{
	STARPU_PTHREAD_KEY_DELETE(current_task_key);
}

void starpu_task_init(struct starpu_task *task)
{
	/* TODO: memcpy from a template instead? benchmark it */
	STARPU_ASSERT(task);

	/* As most of the fields must be initialised to NULL or zero, let's
	 * put 0 everywhere */
	memset(task, 0, sizeof(struct starpu_task));

	task->sequential_consistency = 1;

	/* Now we can initialise the fields which require a custom value */
#if STARPU_DEFAULT_PRIO != 0
	task->priority = STARPU_DEFAULT_PRIO;
#endif
	task->detach = 1;
#if STARPU_TASK_INVALID != 0
	task->status = STARPU_TASK_INVALID;
#endif
	task->predicted = NAN;
	task->predicted_transfer = NAN;
	task->predicted_start = NAN;
	task->magic = 42;
	task->sched_ctx = STARPU_NMAX_SCHED_CTXS;
	task->flops = 0.0;
}

/* Free all the resources allocated for a task, without deallocating the task
 * structure itself (this is required for statically allocated tasks).
 * All values previously set by the user, like codelet and handles, remain
 * unchanged. */
void starpu_task_clean(struct starpu_task *task)
{
	STARPU_ASSERT(task);

	/* If a buffer was allocated to store the profiling info, we free it. */
	if (task->profiling_info)
	{
		free(task->profiling_info);
		task->profiling_info = NULL;
	}

	/* In case the task is (still) part of a bundle */
	starpu_task_bundle_t bundle = task->bundle;
	if (bundle)
		starpu_task_bundle_remove(bundle, task);

	if (task->dyn_handles)
	{
		free(task->dyn_handles);
		task->dyn_handles = NULL;
		free(task->dyn_interfaces);
		task->dyn_interfaces = NULL;
	}

	if (task->dyn_modes)
	{
		free(task->dyn_modes);
		task->dyn_modes = NULL;
	}

	struct _starpu_job *j = (struct _starpu_job *) task->starpu_private;
	if (j)
	{
		_starpu_job_destroy(j);
		task->starpu_private = NULL;
	}
}
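
/* Usage sketch (illustrative, not part of this file): the typical lifecycle
 * of a statically allocated task, using starpu_task_init() and
 * starpu_task_clean() above; the codelet "my_cl" is a hypothetical
 * placeholder.
 *
 *   struct starpu_task task;
 *   starpu_task_init(&task);
 *   task.cl = &my_cl;
 *   task.synchronous = 1;
 *   int ret = starpu_task_submit(&task);
 *   starpu_task_clean(&task);   // frees the resources, not the structure
 */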

struct starpu_task * STARPU_ATTRIBUTE_MALLOC starpu_task_create(void)
{
	struct starpu_task *task;
	_STARPU_MALLOC(task, sizeof(struct starpu_task));
	starpu_task_init(task);

	/* Dynamically allocated tasks are destroyed by default */
	task->destroy = 1;

	return task;
}
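
/* Usage sketch (illustrative, not part of this file): a dynamically
 * allocated task; since destroy defaults to 1, StarPU frees it after
 * execution, so the caller must not access it once submitted. "my_cl" and
 * "my_handle" are hypothetical.
 *
 *   struct starpu_task *task = starpu_task_create();
 *   task->cl = &my_cl;
 *   task->handles[0] = my_handle;
 *   int ret = starpu_task_submit(task);
 *   // no starpu_task_destroy() needed: destroy == 1 and detach == 1
 */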

/* Free the resources allocated during starpu_task_create. This function can
 * be called automatically after the execution of a task by setting the
 * "destroy" flag of the starpu_task structure (default behaviour). Calling
 * this function on a statically allocated task results in undefined
 * behaviour. */
void _starpu_task_destroy(struct starpu_task *task)
{
	/* If starpu_task_destroy is called in a callback, we just set the
	 * destroy flag. The task will be destroyed after the callback
	 * returns. */
	if (task == starpu_task_get_current()
	    && _starpu_get_local_worker_status() == STATUS_CALLBACK)
	{
		task->destroy = 1;
	}
	else
	{
		starpu_task_clean(task);
		/* TODO: handle the case of tasks with detach = 1 and destroy = 1 */
		/* TODO: handle the case of non-terminated tasks -> return -EINVAL */

		/* Does the user want StarPU to release cl_arg? */
		if (task->cl_arg_free)
			free(task->cl_arg);

		/* Does the user want StarPU to release callback_arg? */
		if (task->callback_arg_free)
			free(task->callback_arg);

		/* Does the user want StarPU to release prologue_callback_arg? */
		if (task->prologue_callback_arg_free)
			free(task->prologue_callback_arg);

		/* Does the user want StarPU to release prologue_pop_arg? */
		if (task->prologue_callback_pop_arg_free)
			free(task->prologue_callback_pop_arg);

		free(task);
	}
}

void starpu_task_destroy(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	STARPU_ASSERT_MSG(!task->destroy || !task->detach, "starpu_task_destroy must not be called for a task with destroy = 1 and detach = 1");
	_starpu_task_destroy(task);
}

int starpu_task_finished(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	STARPU_ASSERT_MSG(!task->detach, "starpu_task_finished can only be called on tasks with detach = 0");
	return _starpu_job_finished(_starpu_get_job_associated_to_task(task));
}
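
/* Usage sketch (illustrative, not part of this file): polling a non-detached
 * task with starpu_task_finished() instead of blocking; "task" is assumed to
 * have been submitted with detach = 0, and do_something_else() is a
 * hypothetical application function.
 *
 *   while (!starpu_task_finished(task))
 *       do_something_else();
 *   starpu_task_wait(task);   // returns immediately and reclaims the task
 */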

int starpu_task_wait(struct starpu_task *task)
{
	_STARPU_LOG_IN();
	STARPU_ASSERT(task);

	STARPU_ASSERT_MSG(!task->detach, "starpu_task_wait can only be called on tasks with detach = 0");

	if (task->detach || task->synchronous)
	{
		_STARPU_DEBUG("Task is detached or synchronous. Waiting returns immediately\n");
		_STARPU_LOG_OUT_TAG("einval");
		return -EINVAL;
	}

	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait must not be called from a task or callback");

	struct _starpu_job *j = (struct _starpu_job *) task->starpu_private;

	_STARPU_TRACE_TASK_WAIT_START(j);

	starpu_do_schedule();
	_starpu_wait_job(j);

	/* as this is a synchronous wait, freeing the job structure was
	 * deferred */
	if (task->destroy)
		_starpu_task_destroy(task);

	_STARPU_TRACE_TASK_WAIT_END();
	_STARPU_LOG_OUT();
	return 0;
}
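
/* Usage sketch (illustrative, not part of this file): waiting for one
 * specific task requires detach = 0, as asserted above; "my_cl" is
 * hypothetical.
 *
 *   struct starpu_task *task = starpu_task_create();
 *   task->cl = &my_cl;
 *   task->detach = 0;
 *   starpu_task_submit(task);
 *   // ... do other work ...
 *   starpu_task_wait(task);   // also destroys the task, since destroy == 1
 */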

int starpu_task_wait_array(struct starpu_task **tasks, unsigned nb_tasks)
{
	unsigned i;

	for (i = 0; i < nb_tasks; i++)
	{
		int ret = starpu_task_wait(tasks[i]);
		if (ret)
			return ret;
	}
	return 0;
}

#ifdef STARPU_OPENMP
int _starpu_task_test_termination(struct starpu_task *task)
{
	STARPU_ASSERT(task);
	STARPU_ASSERT_MSG(!task->detach, "_starpu_task_test_termination can only be called on tasks with detach = 0");
	if (task->detach || task->synchronous)
	{
		_STARPU_DEBUG("Task is detached or synchronous\n");
		_STARPU_LOG_OUT_TAG("einval");
		return -EINVAL;
	}

	struct _starpu_job *j = (struct _starpu_job *) task->starpu_private;

	int ret = _starpu_test_job_termination(j);

	if (ret)
	{
		if (task->destroy)
			_starpu_task_destroy(task);
	}

	return ret;
}
#endif

/* NB: in case we have a regenerable task, it is possible that the job was
 * already counted. */
int _starpu_submit_job(struct _starpu_job *j)
{
	struct starpu_task *task = j->task;
	int ret;
#ifdef STARPU_OPENMP
	const unsigned continuation = j->continuation;
#else
	const unsigned continuation = 0;
#endif

	_STARPU_LOG_IN();
	/* notify bound computation of a new task */
	_starpu_bound_record(j);

	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
	_starpu_sched_task_submit(task);

#ifdef STARPU_USE_SC_HYPERVISOR
	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
	if (sched_ctx != NULL && j->task->sched_ctx != _starpu_get_initial_sched_ctx()->id && j->task->sched_ctx != STARPU_NMAX_SCHED_CTXS
	    && sched_ctx->perf_counters != NULL)
	{
		struct starpu_perfmodel_arch arch;
		_STARPU_MALLOC(arch.devices, sizeof(struct starpu_perfmodel_device));
		arch.ndevices = 1;
		arch.devices[0].type = STARPU_CPU_WORKER;
		arch.devices[0].devid = 0;
		arch.devices[0].ncores = 1;
		_starpu_compute_buffers_footprint(j->task->cl->model, &arch, 0, j);
		free(arch.devices);
		size_t data_size = 0;
		if (j->task->cl)
		{
			unsigned i, nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
			for (i = 0; i < nbuffers; i++)
			{
				starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
				if (handle != NULL)
					data_size += _starpu_data_get_size(handle);
			}
		}
		_STARPU_TRACE_HYPERVISOR_BEGIN();
		sched_ctx->perf_counters->notify_submitted_job(j->task, j->footprint, data_size);
		_STARPU_TRACE_HYPERVISOR_END();
	}
#endif /* STARPU_USE_SC_HYPERVISOR */

	/* We retain handle reference count */
	if (task->cl && !continuation)
	{
		unsigned i;
		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
		for (i = 0; i < nbuffers; i++)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
			_starpu_spin_lock(&handle->header_lock);
			handle->busy_count++;
			_starpu_spin_unlock(&handle->header_lock);
		}
	}

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);

	_starpu_handle_job_submission(j);

#ifdef STARPU_OPENMP
	if (continuation)
	{
		j->discontinuous = 1;
		j->continuation = 0;
	}
#endif

#ifdef STARPU_OPENMP
	if (continuation)
	{
		ret = _starpu_reenforce_task_deps_and_schedule(j);
	}
	else
#endif
	{
		ret = _starpu_enforce_deps_and_schedule(j);
	}

	_STARPU_LOG_OUT();
	return ret;
}

/* Note: this is racy, so valgrind would complain. But since we'll always put
 * the same values, this is not a problem. */
void _starpu_codelet_check_deprecated_fields(struct starpu_codelet *cl)
{
	if (!cl)
		return;

	int is_where_unset = cl->where == 0;
	unsigned i, some_impl;

	/* Check deprecated and unset fields (where, <device>_func,
	 * <device>_funcs) */

	/* CPU */
	if (cl->cpu_func && cl->cpu_func != STARPU_MULTIPLE_CPU_IMPLEMENTATIONS && cl->cpu_funcs[0])
	{
		_STARPU_DISP("[warning] [struct starpu_codelet] both cpu_func and cpu_funcs are set. Ignoring cpu_func.\n");
		cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
	}
	if (cl->cpu_func && cl->cpu_func != STARPU_MULTIPLE_CPU_IMPLEMENTATIONS)
	{
		cl->cpu_funcs[0] = cl->cpu_func;
		cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
	}
	some_impl = 0;
	for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
		if (cl->cpu_funcs[i])
		{
			some_impl = 1;
			break;
		}
	if (some_impl && cl->cpu_func == 0)
	{
		cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
	}
	if (some_impl && is_where_unset)
	{
		cl->where |= STARPU_CPU;
	}

	/* CUDA */
	if (cl->cuda_func && cl->cuda_func != STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS && cl->cuda_funcs[0])
	{
		_STARPU_DISP("[warning] [struct starpu_codelet] both cuda_func and cuda_funcs are set. Ignoring cuda_func.\n");
		cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
	}
	if (cl->cuda_func && cl->cuda_func != STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS)
	{
		cl->cuda_funcs[0] = cl->cuda_func;
		cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
	}
	some_impl = 0;
	for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
		if (cl->cuda_funcs[i])
		{
			some_impl = 1;
			break;
		}
	if (some_impl && cl->cuda_func == 0)
	{
		cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
	}
	if (some_impl && is_where_unset)
	{
		cl->where |= STARPU_CUDA;
	}

	/* OpenCL */
	if (cl->opencl_func && cl->opencl_func != STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS && cl->opencl_funcs[0])
	{
		_STARPU_DISP("[warning] [struct starpu_codelet] both opencl_func and opencl_funcs are set. Ignoring opencl_func.\n");
		cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
	}
	if (cl->opencl_func && cl->opencl_func != STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS)
	{
		cl->opencl_funcs[0] = cl->opencl_func;
		cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
	}
	some_impl = 0;
	for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
		if (cl->opencl_funcs[i])
		{
			some_impl = 1;
			break;
		}
	if (some_impl && cl->opencl_func == 0)
	{
		cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
	}
	if (some_impl && is_where_unset)
	{
		cl->where |= STARPU_OPENCL;
	}

	some_impl = 0;
	for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
		if (cl->mic_funcs[i])
		{
			some_impl = 1;
			break;
		}
	if (some_impl && is_where_unset)
	{
		cl->where |= STARPU_MIC;
	}

	some_impl = 0;
	for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
		if (cl->mpi_ms_funcs[i])
		{
			some_impl = 1;
			break;
		}
	if (some_impl && is_where_unset)
	{
		cl->where |= STARPU_MPI_MS;
	}

	some_impl = 0;
	for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
		if (cl->scc_funcs[i])
		{
			some_impl = 1;
			break;
		}
	if (some_impl && is_where_unset)
	{
		cl->where |= STARPU_SCC;
	}

	some_impl = 0;
	for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
		if (cl->cpu_funcs_name[i])
		{
			some_impl = 1;
			break;
		}
	if (some_impl && is_where_unset)
	{
		cl->where |= STARPU_MIC|STARPU_SCC|STARPU_MPI_MS;
	}
}
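
/* Illustrative sketch (not part of this file): the two codelet styles that
 * the function above reconciles; both declarations are equivalent once the
 * deprecated fields have been rewritten. The kernel "f" is hypothetical.
 *
 *   // deprecated style: single function pointer, explicit "where"
 *   struct starpu_codelet cl_old = { .where = STARPU_CPU, .cpu_func = f, .nbuffers = 0 };
 *
 *   // current style: implementation array, "where" inferred
 *   struct starpu_codelet cl_new = { .cpu_funcs = { f }, .nbuffers = 0 };
 */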

void _starpu_task_check_deprecated_fields(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED)
{
	/* None any more */
}

static int _starpu_task_submit_head(struct starpu_task *task)
{
	unsigned is_sync = task->synchronous;
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);

	if (j->internal)
	{
		/* Internal tasks are submitted to the initial context */
		task->sched_ctx = _starpu_get_initial_sched_ctx()->id;
	}
	else if (task->sched_ctx == STARPU_NMAX_SCHED_CTXS)
	{
		/* If the task has not specified a context, we use the current context */
		task->sched_ctx = _starpu_sched_ctx_get_current_context();
	}

	if (is_sync)
	{
		/* It may not be possible to submit a synchronous
		 * (blocking) task from the current context */
		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "submitting a synchronous task must not be done from a task or a callback");
		task->detach = 0;
	}

	_starpu_task_check_deprecated_fields(task);
	_starpu_codelet_check_deprecated_fields(task->cl);

	if (task->cl)
	{
		unsigned i;
		unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
		_STARPU_TRACE_UPDATE_TASK_CNT(0);

		/* Check buffers */
		if (task->dyn_handles == NULL)
			STARPU_ASSERT_MSG(STARPU_TASK_GET_NBUFFERS(task) <= STARPU_NMAXBUFS, "Codelet %p has too many buffers (%d vs max %d). Either use the --enable-maxbuffers configure option to increase the max, or use dyn_handles instead of handles.", task->cl, STARPU_TASK_GET_NBUFFERS(task), STARPU_NMAXBUFS);

		if (task->dyn_handles)
		{
			_STARPU_MALLOC(task->dyn_interfaces, nbuffers * sizeof(void *));
		}

		for (i = 0; i < nbuffers; i++)
		{
			starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);

			/* Make sure handles are not partitioned */
			STARPU_ASSERT_MSG(handle->nchildren == 0, "only unpartitioned data (or the pieces of a partitioned data) can be used in a task");

			/* Provide the home interface for now if any,
			 * for can_execute hooks */
			if (handle->home_node != -1)
				_STARPU_TASK_SET_INTERFACE(task, starpu_data_get_interface_on_node(handle, handle->home_node), i);
		}

		/* Check that the type of worker(s) required by the task exists */
		if (!_starpu_worker_exists(task))
		{
			_STARPU_LOG_OUT_TAG("ENODEV");
			return -ENODEV;
		}

		/* In case we require that a task should be explicitly
		 * executed on a specific worker, we make sure that the worker
		 * is able to execute this task. */
		if (task->execute_on_a_specific_worker && !starpu_combined_worker_can_execute_task(task->workerid, task, 0))
		{
			_STARPU_LOG_OUT_TAG("ENODEV");
			return -ENODEV;
		}

		if (task->cl->model)
			_starpu_init_and_load_perfmodel(task->cl->model);

		if (task->cl->energy_model)
			_starpu_init_and_load_perfmodel(task->cl->energy_model);
	}

	return 0;
}

/* applications should submit new tasks to StarPU through this function */
int starpu_task_submit(struct starpu_task *task)
{
	_STARPU_LOG_IN();
	STARPU_ASSERT(task);
	STARPU_ASSERT_MSG(task->magic == 42, "Tasks must be created with starpu_task_create, or initialized with starpu_task_init.");

	int ret;
	unsigned is_sync = task->synchronous;
	starpu_task_bundle_t bundle = task->bundle;

	/* Internally, StarPU manipulates a struct _starpu_job *, which is a
	 * wrapper around the task structure; it is possible that this job
	 * structure was already allocated. */
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	const unsigned continuation =
#ifdef STARPU_OPENMP
		j->continuation
#else
		0
#endif
		;

	_STARPU_TRACE_TASK_SUBMIT_START();

	if (!j->internal)
	{
		int nsubmitted_tasks = starpu_task_nsubmitted();
		if (limit_max_submitted_tasks >= 0 && limit_max_submitted_tasks < nsubmitted_tasks
		    && limit_min_submitted_tasks >= 0 && limit_min_submitted_tasks < nsubmitted_tasks)
		{
			starpu_do_schedule();
			starpu_task_wait_for_n_submitted(limit_min_submitted_tasks);
		}
	}

	ret = _starpu_task_submit_head(task);
	if (ret)
	{
		_STARPU_TRACE_TASK_SUBMIT_END();
		return ret;
	}

	if (!j->internal && !continuation)
		_STARPU_TRACE_TASK_SUBMIT(j);

	/* If this is a continuation, we don't modify the implicit data dependencies detected earlier. */
	if (task->cl && !continuation)
		_starpu_detect_implicit_data_deps(task);

	if (bundle)
	{
		/* We need to make sure that the models for the other tasks
		 * of the bundle are also loaded, so the scheduler can
		 * estimate the duration of the whole bundle */
		STARPU_PTHREAD_MUTEX_LOCK(&bundle->mutex);

		struct _starpu_task_bundle_entry *entry;
		entry = bundle->list;

		while (entry)
		{
			if (entry->task->cl->model)
				_starpu_init_and_load_perfmodel(entry->task->cl->model);

			if (entry->task->cl->energy_model)
				_starpu_init_and_load_perfmodel(entry->task->cl->energy_model);

			entry = entry->next;
		}

		STARPU_PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
	}

	/* If profiling is activated, we allocate a structure to store the
	 * appropriate info. */
	struct starpu_profiling_task_info *info;
	int profiling = starpu_profiling_status_get();
	info = _starpu_allocate_profiling_info_if_needed(task);
	task->profiling_info = info;

	/* The task is considered blocked until we are sure that no
	 * dependency remains. */
	task->status = STARPU_TASK_BLOCKED;

	if (profiling)
		_starpu_clock_gettime(&info->submit_time);

	ret = _starpu_submit_job(j);
#ifdef STARPU_SIMGRID
	if (_starpu_simgrid_task_submit_cost())
		MSG_process_sleep(0.000001);
#endif

	if (is_sync)
	{
		_starpu_sched_do_schedule(task->sched_ctx);
		_starpu_wait_job(j);
		if (task->destroy)
			_starpu_task_destroy(task);
	}

	_STARPU_TRACE_TASK_SUBMIT_END();
	_STARPU_LOG_OUT();
	return ret;
}
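
/* Usage sketch (illustrative, not part of this file): the synchronous path
 * exercised at the end of starpu_task_submit() above; the call only returns
 * once the task has completed. "my_cl" and "my_handle" are hypothetical.
 *
 *   struct starpu_task *task = starpu_task_create();
 *   task->cl = &my_cl;
 *   task->handles[0] = my_handle;
 *   task->synchronous = 1;
 *   int ret = starpu_task_submit(task);
 *   if (ret == -ENODEV)
 *       fprintf(stderr, "no worker can execute this codelet\n");
 */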

int _starpu_task_submit_internally(struct starpu_task *task)
{
	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	j->internal = 1;
	return starpu_task_submit(task);
}

/* applications should submit new tasks to StarPU through this function */
int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id)
{
	task->sched_ctx = sched_ctx_id;
	return starpu_task_submit(task);
}

/* The StarPU core can submit tasks directly to the scheduler or a worker,
 * skipping dependencies completely (when it knows what it is doing). */
int _starpu_task_submit_nodeps(struct starpu_task *task)
{
	int ret = _starpu_task_submit_head(task);
	STARPU_ASSERT(ret == 0);

	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);

	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
	_starpu_sched_task_submit(task);

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	_starpu_handle_job_submission(j);
	_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
	if (task->cl)
		/* This would be done by data dependency checking */
		_starpu_job_set_ordered_buffers(j);
	task->status = STARPU_TASK_READY;
	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);

	return _starpu_push_task(j);
}

/*
 * worker->sched_mutex must be locked when calling this function.
 */
int _starpu_task_submit_conversion_task(struct starpu_task *task,
					unsigned int workerid)
{
	int ret;
	STARPU_ASSERT(task->cl);
	STARPU_ASSERT(task->execute_on_a_specific_worker);

	ret = _starpu_task_submit_head(task);
	STARPU_ASSERT(ret == 0);

	/* We retain the handle reference count that would have been acquired
	 * by data dependencies. */
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
		_starpu_spin_lock(&handle->header_lock);
		handle->busy_count++;
		_starpu_spin_unlock(&handle->header_lock);
	}

	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
	_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
	_starpu_sched_task_submit(task);

	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
	_starpu_handle_job_submission(j);
	_starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
	_starpu_job_set_ordered_buffers(j);

	task->status = STARPU_TASK_READY;
	_starpu_profiling_set_task_push_start_time(task);

	unsigned node = starpu_worker_get_memory_node(workerid);
	if (starpu_get_prefetch_flag())
		starpu_prefetch_task_input_on_node(task, node);

	struct _starpu_worker *worker;
	worker = _starpu_get_worker_struct(workerid);
	starpu_task_list_push_back(&worker->local_tasks, task);

	_starpu_profiling_set_task_push_end_time(task);

	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
	return 0;
}

void starpu_codelet_init(struct starpu_codelet *cl)
{
	memset(cl, 0, sizeof(struct starpu_codelet));
}
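
/* Usage sketch (illustrative, not part of this file): initializing a codelet
 * at run time with starpu_codelet_init() rather than with a static
 * designated initializer; the kernel "scal_cpu" is hypothetical.
 *
 *   static void scal_cpu(void *buffers[], void *cl_arg)
 *   {
 *       struct starpu_vector_interface *v = buffers[0];
 *       float *val = (float *) STARPU_VECTOR_GET_PTR(v);
 *       unsigned n = STARPU_VECTOR_GET_NX(v);
 *       float factor = *(float *) cl_arg;
 *       unsigned i;
 *       for (i = 0; i < n; i++)
 *           val[i] *= factor;
 *   }
 *
 *   struct starpu_codelet cl;
 *   starpu_codelet_init(&cl);
 *   cl.cpu_funcs[0] = scal_cpu;
 *   cl.nbuffers = 1;
 *   cl.modes[0] = STARPU_RW;
 *   cl.name = "vector_scal";
 */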

void starpu_codelet_display_stats(struct starpu_codelet *cl)
{
	unsigned worker;
	unsigned nworkers = starpu_worker_get_count();

	if (cl->name)
		fprintf(stderr, "Statistics for codelet %s\n", cl->name);
	else if (cl->model && cl->model->symbol)
		fprintf(stderr, "Statistics for codelet %s\n", cl->model->symbol);

	unsigned long total = 0;

	for (worker = 0; worker < nworkers; worker++)
		total += cl->per_worker_stats[worker];

	for (worker = 0; worker < nworkers; worker++)
	{
		char name[32];
		starpu_worker_get_name(worker, name, 32);
		fprintf(stderr, "\t%s -> %lu / %lu (%2.2f %%)\n", name, cl->per_worker_stats[worker], total, (100.0f*cl->per_worker_stats[worker])/total);
	}
}

/*
 * We wait for all the tasks that have already been submitted. Note that a
 * regenerable task is not considered finished until it has been explicitly
 * set as non-regenerable again (e.g. from a callback).
 */
int _starpu_task_wait_for_all_and_return_nb_waited_tasks(void)
{
	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
	unsigned sched_ctx_id = nsched_ctxs == 1 ? 0 : starpu_sched_ctx_get_context();

	/* if there is no indication about which context to wait for,
	   we wait for all tasks submitted to StarPU */
	if (sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
	{
		_STARPU_DEBUG("Waiting for all tasks\n");
		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_all must not be called from a task or callback");
		STARPU_AYU_BARRIER();
		struct _starpu_machine_config *config = _starpu_get_machine_config();
		if (config->topology.nsched_ctxs == 1)
		{
			_starpu_sched_do_schedule(0);
			return _starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(0);
		}
		else
		{
			int s;
			for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					_starpu_sched_do_schedule(config->sched_ctxs[s].id);
				}
			}
			for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					starpu_task_wait_for_all_in_ctx(config->sched_ctxs[s].id);
				}
			}
			return 0;
		}
	}
	else
	{
		_starpu_sched_do_schedule(sched_ctx_id);
		_STARPU_DEBUG("Waiting for tasks submitted to context %u\n", sched_ctx_id);
		return _starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(sched_ctx_id);
	}
}

int starpu_task_wait_for_all(void)
{
	_starpu_task_wait_for_all_and_return_nb_waited_tasks();
	return 0;
}
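
/* Usage sketch (illustrative, not part of this file): the usual barrier at
 * the end of an application, after a stream of asynchronous submissions;
 * submit_one_task() is a hypothetical helper around starpu_task_submit().
 *
 *   unsigned i;
 *   for (i = 0; i < ntasks; i++)
 *       submit_one_task(i);
 *   starpu_task_wait_for_all();   // block until every submitted task is done
 *   starpu_shutdown();
 */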

int _starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(unsigned sched_ctx)
{
	_STARPU_TRACE_TASK_WAIT_FOR_ALL_START();
	int ret = _starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
	_STARPU_TRACE_TASK_WAIT_FOR_ALL_END();
	/* TODO: improve Temanejo into knowing about contexts ... */
	STARPU_AYU_BARRIER();
	return ret;
}

int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx)
{
	_starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(sched_ctx);
	return 0;
}

/*
 * We wait until at most n of the already-submitted tasks are left. Note that
 * a regenerable task is not considered finished until it has been explicitly
 * set as non-regenerable again (e.g. from a callback).
 */
int starpu_task_wait_for_n_submitted(unsigned n)
{
	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
	unsigned sched_ctx_id = nsched_ctxs == 1 ? 0 : starpu_sched_ctx_get_context();

	/* if there is no indication about which context to wait for,
	   we wait for all tasks submitted to StarPU */
	if (sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
	{
		_STARPU_DEBUG("Waiting for all tasks\n");
		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_n_submitted must not be called from a task or callback");

		struct _starpu_machine_config *config = _starpu_get_machine_config();
		if (config->topology.nsched_ctxs == 1)
			_starpu_wait_for_n_submitted_tasks_of_sched_ctx(0, n);
		else
		{
			int s;
			for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
			{
				if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
				{
					_starpu_wait_for_n_submitted_tasks_of_sched_ctx(config->sched_ctxs[s].id, n);
				}
			}
		}

		return 0;
	}
	else
	{
		_STARPU_DEBUG("Waiting for tasks submitted to context %u\n", sched_ctx_id);
		_starpu_wait_for_n_submitted_tasks_of_sched_ctx(sched_ctx_id, n);
	}
	return 0;
}

int starpu_task_wait_for_n_submitted_in_ctx(unsigned sched_ctx, unsigned n)
{
	_starpu_wait_for_n_submitted_tasks_of_sched_ctx(sched_ctx, n);
	return 0;
}
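
/* Usage sketch (illustrative, not part of this file): manual submission
 * throttling with the function above, mirroring what starpu_task_submit()
 * does internally when STARPU_LIMIT_MAX_SUBMITTED_TASKS is set; the
 * watermarks and submit_one_task() are hypothetical.
 *
 *   unsigned i;
 *   for (i = 0; i < ntasks; i++)
 *   {
 *       if (starpu_task_nsubmitted() > 1000)        // high-water mark
 *           starpu_task_wait_for_n_submitted(100);  // drain to low-water mark
 *       submit_one_task(i);
 *   }
 */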

/*
 * We wait until there is no ready task any more (i.e. StarPU will not be
 * able to progress any more).
 */
int starpu_task_wait_for_no_ready(void)
{
	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_no_ready must not be called from a task or callback");

	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if (config->topology.nsched_ctxs == 1)
	{
		_starpu_sched_do_schedule(0);
		_starpu_wait_for_no_ready_of_sched_ctx(0);
	}
	else
	{
		int s;
		for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_sched_do_schedule(config->sched_ctxs[s].id);
			}
		}
		for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_wait_for_no_ready_of_sched_ctx(config->sched_ctxs[s].id);
			}
		}
	}

	return 0;
}

void starpu_do_schedule(void)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if (config->topology.nsched_ctxs == 1)
		_starpu_sched_do_schedule(0);
	else
	{
		int s;
		for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_sched_do_schedule(config->sched_ctxs[s].id);
			}
		}
	}
}

void starpu_drivers_request_termination(void)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();

	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
	int nsubmitted = starpu_task_nsubmitted();
	config->submitting = 0;
	if (nsubmitted == 0)
	{
		ANNOTATE_HAPPENS_AFTER(&config->running);
		config->running = 0;
		ANNOTATE_HAPPENS_BEFORE(&config->running);
		STARPU_WMB();
		int s;
		for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				_starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
			}
		}
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
}

int starpu_task_nsubmitted(void)
{
	int nsubmitted = 0;
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if (config->topology.nsched_ctxs == 1)
		nsubmitted = _starpu_get_nsubmitted_tasks_of_sched_ctx(0);
	else
	{
		int s;
		for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				nsubmitted += _starpu_get_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
			}
		}
	}
	return nsubmitted;
}

int starpu_task_nready(void)
{
	int nready = 0;
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	if (config->topology.nsched_ctxs == 1)
		nready = starpu_sched_ctx_get_nready_tasks(0);
	else
	{
		int s;
		for (s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
		{
			if (config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
			{
				nready += starpu_sched_ctx_get_nready_tasks(config->sched_ctxs[s].id);
			}
		}
	}
	return nready;
}

/* Return the task currently executed by the worker, or NULL when called
 * either from a thread that is not executing a task, or simply because there
 * is no task being executed at the moment. */
struct starpu_task *starpu_task_get_current(void)
{
	return (struct starpu_task *) STARPU_PTHREAD_GETSPECIFIC(current_task_key);
}
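
/* Usage sketch (illustrative, not part of this file): querying the current
 * task from inside a kernel, e.g. for logging; the kernel "f" is
 * hypothetical.
 *
 *   static void f(void *buffers[], void *cl_arg)
 *   {
 *       struct starpu_task *self = starpu_task_get_current();
 *       fprintf(stderr, "task %p runs on worker %d\n",
 *               (void *) self, starpu_worker_get_id());
 *   }
 */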

void _starpu_set_current_task(struct starpu_task *task)
{
	STARPU_PTHREAD_SETSPECIFIC(current_task_key, task);
}

#ifdef STARPU_OPENMP
/* Prepare the fields of the current task for accepting a new set of
 * dependencies in anticipation of becoming a continuation.
 *
 * When the task becomes 'continued', it will only be queued again when the
 * new set of dependencies is fulfilled. */
void _starpu_task_prepare_for_continuation(void)
{
	_starpu_job_prepare_for_continuation(_starpu_get_job_associated_to_task(starpu_task_get_current()));
}

void _starpu_task_prepare_for_continuation_ext(unsigned continuation_resubmit,
		void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg)
{
	_starpu_job_prepare_for_continuation_ext(_starpu_get_job_associated_to_task(starpu_task_get_current()),
		continuation_resubmit, continuation_callback_on_sleep, continuation_callback_on_sleep_arg);
}

void _starpu_task_set_omp_cleanup_callback(struct starpu_task *task, void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg)
{
	_starpu_job_set_omp_cleanup_callback(_starpu_get_job_associated_to_task(task),
		omp_cleanup_callback, omp_cleanup_callback_arg);
}
#endif

/*
 * Returns 0 if the task does not use any multiformat handle, 1 otherwise.
 */
int
_starpu_task_uses_multiformat_handles(struct starpu_task *task)
{
	unsigned i;
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (i = 0; i < nbuffers; i++)
	{
		if (_starpu_data_is_multiformat_handle(STARPU_TASK_GET_HANDLE(task, i)))
			return 1;
	}

	return 0;
}

/*
 * Checks whether the given handle needs to be converted in order to be used
 * on the node given as the second argument.
 */
int
_starpu_handle_needs_conversion_task(starpu_data_handle_t handle,
				     unsigned int node)
{
	return _starpu_handle_needs_conversion_task_for_arch(handle, starpu_node_get_kind(node));
}

int
_starpu_handle_needs_conversion_task_for_arch(starpu_data_handle_t handle,
					      enum starpu_node_kind node_kind)
{
	/*
	 * Here, we assume that CUDA devices and OpenCL devices use the
	 * same data structure. A conversion is only needed when moving
	 * data from a CPU to a GPU, or the other way around.
	 */
	switch (node_kind)
	{
		case STARPU_CPU_RAM:
			switch (starpu_node_get_kind(handle->mf_node))
			{
				case STARPU_CPU_RAM:
					return 0;
				case STARPU_CUDA_RAM:      /* Fall through */
				case STARPU_OPENCL_RAM:
				case STARPU_MIC_RAM:
				case STARPU_MPI_MS_RAM:
				case STARPU_SCC_RAM:
					return 1;
				default:
					STARPU_ABORT();
			}
			break;
		case STARPU_CUDA_RAM:    /* Fall through */
		case STARPU_OPENCL_RAM:
		case STARPU_MIC_RAM:
		case STARPU_SCC_RAM:
			switch (starpu_node_get_kind(handle->mf_node))
			{
				case STARPU_CPU_RAM:
					return 1;
				case STARPU_CUDA_RAM:
				case STARPU_OPENCL_RAM:
				case STARPU_MIC_RAM:
				case STARPU_MPI_MS_RAM:
				case STARPU_SCC_RAM:
					return 0;
				default:
					STARPU_ABORT();
			}
			break;
		default:
			STARPU_ABORT();
	}

	/* this instruction should never be reached */
	return -EINVAL;
}

void starpu_task_set_implementation(struct starpu_task *task, unsigned impl)
{
	_starpu_get_job_associated_to_task(task)->nimpl = impl;
}

unsigned starpu_task_get_implementation(struct starpu_task *task)
{
	return _starpu_get_job_associated_to_task(task)->nimpl;
}

static starpu_pthread_t watchdog_thread;

/* Check from time to time that StarPU does finish some tasks */
static void *watchdog_func(void *arg)
{
	char *timeout_env = arg;
	float timeout;

#ifdef _MSC_VER
	timeout = ((float) _atoi64(timeout_env)) / 1000000;
#else
	timeout = ((float) atoll(timeout_env)) / 1000000;
#endif

	struct _starpu_machine_config *config = _starpu_get_machine_config();
	starpu_pthread_setname("watchdog");

	STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
	while (_starpu_machine_is_running())
	{
		int last_nsubmitted = starpu_task_nsubmitted();
		config->watchdog_ok = 0;
		STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);

		/* If we did a single sleep(timeout), we might have to wait too long at the end of the computation. */
		/* To avoid that, we do several sleep()s of 1s (and check after each whether StarPU is still running) */
		float t;
		for (t = timeout; t > 1.; t--)
		{
			starpu_sleep(1.);
			if (!_starpu_machine_is_running())
				/* Application finished, don't bother finishing the sleep */
				return NULL;
		}
		/* and one final sleep (of less than 1 s) for the remainder (if needed) */
		if (t > 0.)
			starpu_sleep(t);

		STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);

		if (!config->watchdog_ok && last_nsubmitted
		    && last_nsubmitted == starpu_task_nsubmitted())
		{
			_STARPU_MSG("The StarPU watchdog detected that no task finished for %fs (can be configured through STARPU_WATCHDOG_TIMEOUT)\n", timeout);
			if (watchdog_crash)
			{
				_STARPU_MSG("Crashing the process\n");
				raise(SIGABRT);
			}
			else
				_STARPU_MSG("Set the STARPU_WATCHDOG_CRASH environment variable if you want to abort the process in such a case\n");
		}
		/* Only shout again after another period */
		config->watchdog_ok = 1;
	}
	STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
	return NULL;
}

void _starpu_watchdog_init(void)
{
	struct _starpu_machine_config *config = _starpu_get_machine_config();
	char *timeout_env = starpu_getenv("STARPU_WATCHDOG_TIMEOUT");

	STARPU_PTHREAD_MUTEX_INIT(&config->submitted_mutex, NULL);

	if (!timeout_env)
		return;

	STARPU_PTHREAD_CREATE(&watchdog_thread, NULL, watchdog_func, timeout_env);
}
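
/* Usage sketch (illustrative, not part of this file): enabling the watchdog
 * programmatically before starpu_init(); the timeout is in microseconds,
 * matching the division by 1000000 in watchdog_func() above, and the value
 * here is arbitrary.
 *
 *   setenv("STARPU_WATCHDOG_TIMEOUT", "10000000", 1);  // 10 s without any task finishing
 *   setenv("STARPU_WATCHDOG_CRASH", "1", 1);           // abort instead of just warning
 *   starpu_init(NULL);
 */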

void _starpu_watchdog_shutdown(void)
{
	char *timeout_env = starpu_getenv("STARPU_WATCHDOG_TIMEOUT");

	if (!timeout_env)
		return;

	starpu_pthread_join(watchdog_thread, NULL);
}