task.c 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2013 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012, 2013 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011 Télécom-SudParis
  6. * Copyright (C) 2011 INRIA
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <starpu.h>
  20. #include <starpu_profiling.h>
  21. #include <core/workers.h>
  22. #include <core/sched_ctx.h>
  23. #include <core/jobs.h>
  24. #include <core/task.h>
  25. #include <core/task_bundle.h>
  26. #include <common/config.h>
  27. #include <common/utils.h>
  28. #include <common/fxt.h>
  29. #include <profiling/profiling.h>
  30. #include <profiling/bound.h>
  31. #include <math.h>
  32. #include <string.h>
  33. #include <core/debug.h>
  34. #include <core/sched_ctx.h>
  35. #include <time.h>
  36. #ifdef STARPU_HAVE_WINDOWS
  37. #include <windows.h>
  38. #endif
  39. /* XXX this should be reinitialized when StarPU is shutdown (or we should make
  40. * sure that no task remains !) */
  41. /* TODO we could make this hierarchical to avoid contention ? */
  42. //static starpu_pthread_cond_t submitted_cond = STARPU_PTHREAD_COND_INITIALIZER;
  43. /* This key stores the task currently handled by the thread. We cannot use
  44. * the worker structure to store that information, because a task may have
  45. * a NULL codelet, in which case its callback could be executed by a user
  46. * thread as well. */
  47. static starpu_pthread_key_t current_task_key;
  48. void starpu_task_init(struct starpu_task *task)
  49. {
  50. /* TODO: memcpy from a template instead? benchmark it */
  51. STARPU_ASSERT(task);
  52. /* As most of the fields must be initialised at NULL, let's put 0
  53. * everywhere */
  54. memset(task, 0, sizeof(struct starpu_task));
  55. task->sequential_consistency = 1;
  56. /* Now we can initialise fields which recquire custom value */
  57. #if STARPU_DEFAULT_PRIO != 0
  58. task->priority = STARPU_DEFAULT_PRIO;
  59. #endif
  60. task->detach = 1;
  61. #if STARPU_TASK_INVALID != 0
  62. task->status = STARPU_TASK_INVALID;
  63. #endif
  64. task->predicted = NAN;
  65. task->predicted_transfer = NAN;
  66. task->magic = 42;
  67. task->sched_ctx = STARPU_NMAX_SCHED_CTXS;
  68. task->flops = 0.0;
  69. task->scheduled = 0;
  70. task->dyn_handles = NULL;
  71. task->dyn_interfaces = NULL;
  72. task->name = NULL;
  73. }
  74. /* Free all the ressources allocated for a task, without deallocating the task
  75. * structure itself (this is required for statically allocated tasks).
  76. * All values previously set by the user, like codelet and handles, remain
  77. * unchanged */
  78. void starpu_task_clean(struct starpu_task *task)
  79. {
  80. STARPU_ASSERT(task);
  81. /* If a buffer was allocated to store the profiling info, we free it. */
  82. if (task->profiling_info)
  83. {
  84. free(task->profiling_info);
  85. task->profiling_info = NULL;
  86. }
  87. /* If case the task is (still) part of a bundle */
  88. starpu_task_bundle_t bundle = task->bundle;
  89. if (bundle)
  90. starpu_task_bundle_remove(bundle, task);
  91. if (task->dyn_handles)
  92. {
  93. free(task->dyn_handles);
  94. task->dyn_handles = NULL;
  95. free(task->dyn_interfaces);
  96. task->dyn_interfaces = NULL;
  97. }
  98. struct _starpu_job *j = (struct _starpu_job *)task->starpu_private;
  99. if (j)
  100. {
  101. _starpu_job_destroy(j);
  102. task->starpu_private = NULL;
  103. }
  104. }
  105. struct starpu_task * STARPU_ATTRIBUTE_MALLOC starpu_task_create(void)
  106. {
  107. struct starpu_task *task;
  108. task = (struct starpu_task *) malloc(sizeof(struct starpu_task));
  109. STARPU_ASSERT(task);
  110. starpu_task_init(task);
  111. /* Dynamically allocated tasks are destroyed by default */
  112. task->destroy = 1;
  113. return task;
  114. }
  115. /* Free the ressource allocated during starpu_task_create. This function can be
  116. * called automatically after the execution of a task by setting the "destroy"
  117. * flag of the starpu_task structure (default behaviour). Calling this function
  118. * on a statically allocated task results in an undefined behaviour. */
  119. void _starpu_task_destroy(struct starpu_task *task)
  120. {
  121. /* If starpu_task_destroy is called in a callback, we just set the destroy
  122. flag. The task will be destroyed after the callback returns */
  123. if (task == starpu_task_get_current()
  124. && _starpu_get_local_worker_status() == STATUS_CALLBACK)
  125. {
  126. task->destroy = 1;
  127. }
  128. else
  129. {
  130. starpu_task_clean(task);
  131. /* TODO handle the case of task with detach = 1 and destroy = 1 */
  132. /* TODO handle the case of non terminated tasks -> return -EINVAL */
  133. /* Does user want StarPU release cl_arg ? */
  134. if (task->cl_arg_free)
  135. free(task->cl_arg);
  136. /* Does user want StarPU release callback_arg ? */
  137. if (task->callback_arg_free)
  138. free(task->callback_arg);
  139. /* Does user want StarPU release prologue_callback_arg ? */
  140. if (task->prologue_callback_arg_free)
  141. free(task->prologue_callback_arg);
  142. free(task);
  143. }
  144. }
  145. void starpu_task_destroy(struct starpu_task *task)
  146. {
  147. STARPU_ASSERT(task);
  148. STARPU_ASSERT_MSG(!task->destroy || !task->detach, "starpu_task_destroy must not be called for task with destroy = 1 and detach = 1");
  149. _starpu_task_destroy(task);
  150. }
  151. int starpu_task_wait(struct starpu_task *task)
  152. {
  153. _STARPU_LOG_IN();
  154. STARPU_ASSERT(task);
  155. STARPU_ASSERT_MSG(!task->detach, "starpu_task_wait can only be called on tasks with detach = 0");
  156. if (task->detach || task->synchronous)
  157. {
  158. _STARPU_DEBUG("Task is detached or synchronous. Waiting returns immediately\n");
  159. _STARPU_LOG_OUT_TAG("einval");
  160. return -EINVAL;
  161. }
  162. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait must not be called from a task or callback");
  163. struct _starpu_job *j = (struct _starpu_job *)task->starpu_private;
  164. _starpu_wait_job(j);
  165. /* as this is a synchronous task, the liberation of the job
  166. structure was deferred */
  167. if (task->destroy)
  168. _starpu_task_destroy(task);
  169. _STARPU_LOG_OUT();
  170. return 0;
  171. }
  172. struct _starpu_job *_starpu_get_job_associated_to_task(struct starpu_task *task)
  173. {
  174. STARPU_ASSERT(task);
  175. if (!task->starpu_private)
  176. {
  177. struct _starpu_job *j = _starpu_job_create(task);
  178. task->starpu_private = j;
  179. }
  180. return (struct _starpu_job *)task->starpu_private;
  181. }
  182. /* NB in case we have a regenerable task, it is possible that the job was
  183. * already counted. */
  184. int _starpu_submit_job(struct _starpu_job *j)
  185. {
  186. struct starpu_task *task = j->task;
  187. _STARPU_LOG_IN();
  188. /* notify bound computation of a new task */
  189. _starpu_bound_record(j);
  190. _starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
  191. #ifdef STARPU_USE_SC_HYPERVISOR
  192. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
  193. if(sched_ctx != NULL && j->task->sched_ctx != _starpu_get_initial_sched_ctx()->id && j->task->sched_ctx != STARPU_NMAX_SCHED_CTXS
  194. && sched_ctx->perf_counters != NULL)
  195. {
  196. struct starpu_perfmodel_arch arch;
  197. arch.type = STARPU_CPU_WORKER;
  198. arch.devid = 0;
  199. arch.ncore = 0;
  200. _starpu_compute_buffers_footprint(j->task->cl->model, &arch, 0, j);
  201. int i;
  202. size_t data_size = 0;
  203. for(i = 0; i < STARPU_NMAXBUFS; i++)
  204. {
  205. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  206. if (handle != NULL)
  207. data_size += _starpu_data_get_size(handle);
  208. }
  209. _STARPU_TRACE_HYPERVISOR_BEGIN();
  210. sched_ctx->perf_counters->notify_submitted_job(j->task, j->footprint, data_size);
  211. _STARPU_TRACE_HYPERVISOR_END();
  212. }
  213. #endif//STARPU_USE_SC_HYPERVISOR
  214. /* We retain handle reference count */
  215. if (task->cl)
  216. {
  217. unsigned i;
  218. for (i=0; i<task->cl->nbuffers; i++)
  219. {
  220. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  221. _starpu_spin_lock(&handle->header_lock);
  222. handle->busy_count++;
  223. _starpu_spin_unlock(&handle->header_lock);
  224. }
  225. }
  226. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  227. /* Need to atomically set submitted to 1 and check dependencies, since
  228. * this is concucrent with _starpu_notify_cg */
  229. j->terminated = 0;
  230. if (!j->submitted)
  231. j->submitted = 1;
  232. else
  233. j->submitted = 2;
  234. int ret = _starpu_enforce_deps_and_schedule(j);
  235. _STARPU_LOG_OUT();
  236. return ret;
  237. }
  238. /* Note: this is racy, so valgrind would complain. But since we'll always put
  239. * the same values, this is not a problem. */
  240. void _starpu_codelet_check_deprecated_fields(struct starpu_codelet *cl)
  241. {
  242. if (!cl)
  243. return;
  244. int is_where_unset = cl->where == 0;
  245. /* Check deprecated and unset fields (where, <device>_func,
  246. * <device>_funcs) */
  247. /* CPU */
  248. if (cl->cpu_func && cl->cpu_func != STARPU_MULTIPLE_CPU_IMPLEMENTATIONS && cl->cpu_funcs[0])
  249. {
  250. _STARPU_DISP("[warning] [struct starpu_codelet] both cpu_func and cpu_funcs are set. Ignoring cpu_func.\n");
  251. cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
  252. }
  253. if (cl->cpu_func && cl->cpu_func != STARPU_MULTIPLE_CPU_IMPLEMENTATIONS)
  254. {
  255. cl->cpu_funcs[0] = cl->cpu_func;
  256. cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
  257. }
  258. if (cl->cpu_funcs[0] && cl->cpu_func == 0)
  259. {
  260. cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
  261. }
  262. if (cl->cpu_funcs[0] && is_where_unset)
  263. {
  264. cl->where |= STARPU_CPU;
  265. }
  266. /* CUDA */
  267. if (cl->cuda_func && cl->cuda_func != STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS && cl->cuda_funcs[0])
  268. {
  269. _STARPU_DISP("[warning] [struct starpu_codelet] both cuda_func and cuda_funcs are set. Ignoring cuda_func.\n");
  270. cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
  271. }
  272. if (cl->cuda_func && cl->cuda_func != STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS)
  273. {
  274. cl->cuda_funcs[0] = cl->cuda_func;
  275. cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
  276. }
  277. if (cl->cuda_funcs[0] && cl->cuda_func == 0)
  278. {
  279. cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
  280. }
  281. if (cl->cuda_funcs[0] && is_where_unset)
  282. {
  283. cl->where |= STARPU_CUDA;
  284. }
  285. /* OpenCL */
  286. if (cl->opencl_func && cl->opencl_func != STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS && cl->opencl_funcs[0])
  287. {
  288. _STARPU_DISP("[warning] [struct starpu_codelet] both opencl_func and opencl_funcs are set. Ignoring opencl_func.\n");
  289. cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
  290. }
  291. if (cl->opencl_func && cl->opencl_func != STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS)
  292. {
  293. cl->opencl_funcs[0] = cl->opencl_func;
  294. cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
  295. }
  296. if (cl->opencl_funcs[0] && cl->opencl_func == 0)
  297. {
  298. cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
  299. }
  300. if (cl->opencl_funcs[0] && is_where_unset)
  301. {
  302. cl->where |= STARPU_OPENCL;
  303. }
  304. if (cl->mic_funcs[0] && is_where_unset)
  305. {
  306. cl->where |= STARPU_MIC;
  307. }
  308. if (cl->scc_funcs[0] && is_where_unset)
  309. {
  310. cl->where |= STARPU_SCC;
  311. }
  312. if (cl->cpu_funcs_name[0] && is_where_unset)
  313. {
  314. cl->where |= STARPU_MIC|STARPU_SCC;
  315. }
  316. }
  317. void _starpu_task_check_deprecated_fields(struct starpu_task *task)
  318. {
  319. if (task->cl)
  320. {
  321. unsigned i;
  322. for(i=0; i<STARPU_MIN(task->cl->nbuffers, STARPU_NMAXBUFS) ; i++)
  323. {
  324. if (task->buffers[i].handle && task->handles[i])
  325. {
  326. _STARPU_DISP("[warning][struct starpu_task] task->buffers[%u] and task->handles[%u] both set. Ignoring task->buffers[%u] ?\n", i, i, i);
  327. STARPU_ASSERT(task->buffers[i].mode == task->cl->modes[i]);
  328. STARPU_ABORT();
  329. }
  330. if (task->buffers[i].handle)
  331. {
  332. task->handles[i] = task->buffers[i].handle;
  333. task->cl->modes[i] = task->buffers[i].mode;
  334. }
  335. }
  336. }
  337. }
  338. /* application should submit new tasks to StarPU through this function */
  339. int starpu_task_submit(struct starpu_task *task)
  340. {
  341. _STARPU_LOG_IN();
  342. STARPU_ASSERT(task);
  343. STARPU_ASSERT_MSG(task->magic == 42, "Tasks must be created with starpu_task_create, or initialized with starpu_task_init.");
  344. int ret;
  345. unsigned is_sync = task->synchronous;
  346. starpu_task_bundle_t bundle = task->bundle;
  347. /* internally, StarPU manipulates a struct _starpu_job * which is a wrapper around a
  348. * task structure, it is possible that this job structure was already
  349. * allocated. */
  350. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  351. if (j->internal)
  352. {
  353. // Internal tasks are submitted to initial context
  354. task->sched_ctx = _starpu_get_initial_sched_ctx()->id;
  355. }
  356. else if (task->sched_ctx == STARPU_NMAX_SCHED_CTXS)
  357. {
  358. // If the task has not specified a context, we set the current context
  359. task->sched_ctx = _starpu_sched_ctx_get_current_context();
  360. }
  361. if (is_sync)
  362. {
  363. /* Perhaps it is not possible to submit a synchronous
  364. * (blocking) task */
  365. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "submitting a synchronous task must not be done from a task or a callback");
  366. task->detach = 0;
  367. }
  368. _starpu_task_check_deprecated_fields(task);
  369. _starpu_codelet_check_deprecated_fields(task->cl);
  370. if (task->cl)
  371. {
  372. unsigned i;
  373. /* Check buffers */
  374. if (task->dyn_handles == NULL)
  375. STARPU_ASSERT_MSG(task->cl->nbuffers <= STARPU_NMAXBUFS, "Codelet %p has too many buffers (%d vs max %d). Either use --enable-maxbuffers configure option to increase the max, or use dyn_handles instead of handles.", task->cl, task->cl->nbuffers, STARPU_NMAXBUFS);
  376. if (task->dyn_handles)
  377. {
  378. task->dyn_interfaces = malloc(task->cl->nbuffers * sizeof(void *));
  379. }
  380. for (i = 0; i < task->cl->nbuffers; i++)
  381. {
  382. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  383. /* Make sure handles are not partitioned */
  384. STARPU_ASSERT_MSG(handle->nchildren == 0, "only unpartitioned data (or the pieces of a partitioned data) can be used in a task");
  385. /* Provide the home interface for now if any,
  386. * for can_execute hooks */
  387. if (handle->home_node != -1)
  388. _STARPU_TASK_SET_INTERFACE(task, starpu_data_get_interface_on_node(handle, handle->home_node), i);
  389. }
  390. /* Check the type of worker(s) required by the task exist */
  391. if (!_starpu_worker_exists(task))
  392. {
  393. _STARPU_LOG_OUT_TAG("ENODEV");
  394. return -ENODEV;
  395. }
  396. /* In case we require that a task should be explicitely
  397. * executed on a specific worker, we make sure that the worker
  398. * is able to execute this task. */
  399. if (task->execute_on_a_specific_worker && !starpu_combined_worker_can_execute_task(task->workerid, task, 0))
  400. {
  401. _STARPU_LOG_OUT_TAG("ENODEV");
  402. return -ENODEV;
  403. }
  404. _starpu_detect_implicit_data_deps(task);
  405. if (task->cl->model && task->cl->model->symbol)
  406. _starpu_load_perfmodel(task->cl->model);
  407. if (task->cl->power_model && task->cl->power_model->symbol)
  408. _starpu_load_perfmodel(task->cl->power_model);
  409. }
  410. if (bundle)
  411. {
  412. /* We need to make sure that models for other tasks of the
  413. * bundle are also loaded, so the scheduler can estimate the
  414. * duration of the whole bundle */
  415. STARPU_PTHREAD_MUTEX_LOCK(&bundle->mutex);
  416. struct _starpu_task_bundle_entry *entry;
  417. entry = bundle->list;
  418. while (entry)
  419. {
  420. if (entry->task->cl->model && entry->task->cl->model->symbol)
  421. _starpu_load_perfmodel(entry->task->cl->model);
  422. if (entry->task->cl->power_model && entry->task->cl->power_model->symbol)
  423. _starpu_load_perfmodel(entry->task->cl->power_model);
  424. entry = entry->next;
  425. }
  426. STARPU_PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
  427. }
  428. /* If profiling is activated, we allocate a structure to store the
  429. * appropriate info. */
  430. struct starpu_profiling_task_info *info;
  431. int profiling = starpu_profiling_status_get();
  432. info = _starpu_allocate_profiling_info_if_needed(task);
  433. task->profiling_info = info;
  434. /* The task is considered as block until we are sure there remains not
  435. * dependency. */
  436. task->status = STARPU_TASK_BLOCKED;
  437. if (profiling)
  438. _starpu_clock_gettime(&info->submit_time);
  439. ret = _starpu_submit_job(j);
  440. if (is_sync)
  441. {
  442. _starpu_wait_job(j);
  443. if (task->destroy)
  444. _starpu_task_destroy(task);
  445. }
  446. _STARPU_LOG_OUT();
  447. return ret;
  448. }
  449. int _starpu_task_submit_internally(struct starpu_task *task)
  450. {
  451. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  452. j->internal = 1;
  453. return starpu_task_submit(task);
  454. }
  455. /* application should submit new tasks to StarPU through this function */
  456. int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id)
  457. {
  458. task->sched_ctx = sched_ctx_id;
  459. return starpu_task_submit(task);
  460. }
  461. /* The StarPU core can submit tasks directly to the scheduler or a worker,
  462. * skipping dependencies completely (when it knows what it is doing). */
  463. int _starpu_task_submit_nodeps(struct starpu_task *task)
  464. {
  465. _starpu_task_check_deprecated_fields(task);
  466. _starpu_codelet_check_deprecated_fields(task->cl);
  467. if (task->cl)
  468. {
  469. if (task->cl->model)
  470. _starpu_load_perfmodel(task->cl->model);
  471. if (task->cl->power_model)
  472. _starpu_load_perfmodel(task->cl->power_model);
  473. }
  474. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  475. if (j->internal)
  476. {
  477. // Internal tasks are submitted to initial context
  478. j->task->sched_ctx = _starpu_get_initial_sched_ctx()->id;
  479. }
  480. else if (task->sched_ctx == STARPU_NMAX_SCHED_CTXS)
  481. {
  482. // If the task has not specified a context, we set the current context
  483. j->task->sched_ctx = _starpu_sched_ctx_get_current_context();
  484. }
  485. _starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
  486. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  487. j->submitted = 1;
  488. if (task->cl)
  489. {
  490. /* This would be done by data dependencies checking */
  491. unsigned i;
  492. for (i=0 ; i<task->cl->nbuffers ; i++)
  493. {
  494. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, i);
  495. _STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(j, handle, i);
  496. enum starpu_data_access_mode mode = STARPU_CODELET_GET_MODE(j->task->cl, i);
  497. _STARPU_JOB_SET_ORDERED_BUFFER_MODE(j, mode, i);
  498. }
  499. }
  500. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  501. return _starpu_push_task(j);
  502. }
  503. /*
  504. * worker->sched_mutex must be locked when calling this function.
  505. */
  506. int _starpu_task_submit_conversion_task(struct starpu_task *task,
  507. unsigned int workerid)
  508. {
  509. STARPU_ASSERT(task->cl);
  510. STARPU_ASSERT(task->execute_on_a_specific_worker);
  511. _starpu_task_check_deprecated_fields(task);
  512. _starpu_codelet_check_deprecated_fields(task->cl);
  513. /* We should factorize that */
  514. if (task->cl->model)
  515. _starpu_load_perfmodel(task->cl->model);
  516. if (task->cl->power_model)
  517. _starpu_load_perfmodel(task->cl->power_model);
  518. /* We retain handle reference count */
  519. unsigned i;
  520. for (i=0; i<task->cl->nbuffers; i++)
  521. {
  522. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  523. _starpu_spin_lock(&handle->header_lock);
  524. handle->busy_count++;
  525. _starpu_spin_unlock(&handle->header_lock);
  526. }
  527. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  528. if (j->internal)
  529. {
  530. // Internal tasks are submitted to initial context
  531. j->task->sched_ctx = _starpu_get_initial_sched_ctx()->id;
  532. }
  533. else if (task->sched_ctx == STARPU_NMAX_SCHED_CTXS)
  534. {
  535. // If the task has not specified a context, we set the current context
  536. j->task->sched_ctx = _starpu_sched_ctx_get_current_context();
  537. }
  538. _starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
  539. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  540. j->submitted = 1;
  541. _starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops);
  542. for (i=0 ; i<task->cl->nbuffers ; i++)
  543. {
  544. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(j->task, i);
  545. _STARPU_JOB_SET_ORDERED_BUFFER_HANDLE(j, handle, i);
  546. enum starpu_data_access_mode mode = STARPU_CODELET_GET_MODE(j->task->cl, i);
  547. _STARPU_JOB_SET_ORDERED_BUFFER_MODE(j, mode, i);
  548. }
  549. _STARPU_LOG_IN();
  550. task->status = STARPU_TASK_READY;
  551. _starpu_profiling_set_task_push_start_time(task);
  552. unsigned node = starpu_worker_get_memory_node(workerid);
  553. if (starpu_get_prefetch_flag())
  554. starpu_prefetch_task_input_on_node(task, node);
  555. struct _starpu_worker *worker;
  556. worker = _starpu_get_worker_struct(workerid);
  557. starpu_task_list_push_back(&worker->local_tasks, task);
  558. _starpu_profiling_set_task_push_end_time(task);
  559. _STARPU_LOG_OUT();
  560. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  561. return 0;
  562. }
  563. void starpu_codelet_init(struct starpu_codelet *cl)
  564. {
  565. memset(cl, 0, sizeof(struct starpu_codelet));
  566. }
  567. void starpu_codelet_display_stats(struct starpu_codelet *cl)
  568. {
  569. unsigned worker;
  570. unsigned nworkers = starpu_worker_get_count();
  571. if (cl->name)
  572. fprintf(stderr, "Statistics for codelet %s\n", cl->name);
  573. else if (cl->model && cl->model->symbol)
  574. fprintf(stderr, "Statistics for codelet %s\n", cl->model->symbol);
  575. unsigned long total = 0;
  576. for (worker = 0; worker < nworkers; worker++)
  577. total += cl->per_worker_stats[worker];
  578. for (worker = 0; worker < nworkers; worker++)
  579. {
  580. char name[32];
  581. starpu_worker_get_name(worker, name, 32);
  582. fprintf(stderr, "\t%s -> %lu / %lu (%2.2f %%)\n", name, cl->per_worker_stats[worker], total, (100.0f*cl->per_worker_stats[worker])/total);
  583. }
  584. }
  585. /*
  586. * We wait for all the tasks that have already been submitted. Note that a
  587. * regenerable is not considered finished until it was explicitely set as
  588. * non-regenerale anymore (eg. from a callback).
  589. */
  590. int starpu_task_wait_for_all(void)
  591. {
  592. unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
  593. unsigned sched_ctx_id = nsched_ctxs == 1 ? 0 : starpu_sched_ctx_get_context();
  594. /* if there is no indication about which context to wait,
  595. we wait for all tasks submitted to starpu */
  596. if (sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  597. {
  598. _STARPU_DEBUG("Waiting for all tasks\n");
  599. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_all must not be called from a task or callback");
  600. #ifdef HAVE_AYUDAME_H
  601. if (AYU_event) AYU_event(AYU_BARRIER, 0, NULL);
  602. #endif
  603. struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
  604. if(config->topology.nsched_ctxs == 1)
  605. starpu_task_wait_for_all_in_ctx(0);
  606. else
  607. {
  608. int s;
  609. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  610. {
  611. if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
  612. {
  613. starpu_task_wait_for_all_in_ctx(config->sched_ctxs[s].id);
  614. }
  615. }
  616. }
  617. return 0;
  618. }
  619. else
  620. {
  621. _STARPU_DEBUG("Waiting for tasks submitted to context %u\n", sched_ctx_id);
  622. return starpu_task_wait_for_all_in_ctx(sched_ctx_id);
  623. }
  624. }
  625. int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx)
  626. {
  627. _starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
  628. #ifdef HAVE_AYUDAME_H
  629. /* TODO: improve Temanejo into knowing about contexts ... */
  630. if (AYU_event) AYU_event(AYU_BARRIER, 0, NULL);
  631. #endif
  632. return 0;
  633. }
  634. /*
  635. * We wait until there is no ready task any more (i.e. StarPU will not be able
  636. * to progress any more).
  637. */
  638. int starpu_task_wait_for_no_ready(void)
  639. {
  640. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_no_ready must not be called from a task or callback");
  641. struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
  642. if(config->topology.nsched_ctxs == 1)
  643. _starpu_wait_for_no_ready_of_sched_ctx(0);
  644. else
  645. {
  646. int s;
  647. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  648. {
  649. if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
  650. {
  651. _starpu_wait_for_no_ready_of_sched_ctx(config->sched_ctxs[s].id);
  652. }
  653. }
  654. }
  655. return 0;
  656. }
  657. void
  658. starpu_drivers_request_termination(void)
  659. {
  660. struct _starpu_machine_config *config = _starpu_get_machine_config();
  661. STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
  662. int nsubmitted = starpu_task_nsubmitted();
  663. config->submitting = 0;
  664. if (nsubmitted == 0)
  665. {
  666. ANNOTATE_HAPPENS_AFTER(&config->running);
  667. config->running = 0;
  668. ANNOTATE_HAPPENS_BEFORE(&config->running);
  669. STARPU_WMB();
  670. int s;
  671. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  672. {
  673. if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
  674. {
  675. _starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
  676. }
  677. }
  678. }
  679. STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
  680. }
  681. int starpu_task_nsubmitted(void)
  682. {
  683. int nsubmitted = 0;
  684. struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
  685. if(config->topology.nsched_ctxs == 1)
  686. nsubmitted = _starpu_get_nsubmitted_tasks_of_sched_ctx(0);
  687. else
  688. {
  689. int s;
  690. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  691. {
  692. if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
  693. {
  694. nsubmitted += _starpu_get_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
  695. }
  696. }
  697. }
  698. return nsubmitted;
  699. }
  700. int starpu_task_nready(void)
  701. {
  702. int nready = 0;
  703. struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
  704. if(config->topology.nsched_ctxs == 1)
  705. nready = starpu_get_nready_tasks_of_sched_ctx(0);
  706. else
  707. {
  708. int s;
  709. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  710. {
  711. if(config->sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
  712. {
  713. nready += starpu_get_nready_tasks_of_sched_ctx(config->sched_ctxs[s].id);
  714. }
  715. }
  716. }
  717. return nready;
  718. }
  719. void _starpu_initialize_current_task_key(void)
  720. {
  721. STARPU_PTHREAD_KEY_CREATE(&current_task_key, NULL);
  722. }
  723. /* Return the task currently executed by the worker, or NULL if this is called
  724. * either from a thread that is not a task or simply because there is no task
  725. * being executed at the moment. */
  726. struct starpu_task *starpu_task_get_current(void)
  727. {
  728. return (struct starpu_task *) STARPU_PTHREAD_GETSPECIFIC(current_task_key);
  729. }
  730. void _starpu_set_current_task(struct starpu_task *task)
  731. {
  732. STARPU_PTHREAD_SETSPECIFIC(current_task_key, task);
  733. }
  734. /*
  735. * Returns 0 if tasks does not use any multiformat handle, 1 otherwise.
  736. */
  737. int
  738. _starpu_task_uses_multiformat_handles(struct starpu_task *task)
  739. {
  740. unsigned i;
  741. for (i = 0; i < task->cl->nbuffers; i++)
  742. {
  743. if (_starpu_data_is_multiformat_handle(STARPU_TASK_GET_HANDLE(task, i)))
  744. return 1;
  745. }
  746. return 0;
  747. }
  748. /*
  749. * Checks whether the given handle needs to be converted in order to be used on
  750. * the node given as the second argument.
  751. */
  752. int
  753. _starpu_handle_needs_conversion_task(starpu_data_handle_t handle,
  754. unsigned int node)
  755. {
  756. return _starpu_handle_needs_conversion_task_for_arch(handle, starpu_node_get_kind(node));
  757. }
  758. int
  759. _starpu_handle_needs_conversion_task_for_arch(starpu_data_handle_t handle,
  760. enum starpu_node_kind node_kind)
  761. {
  762. /*
  763. * Here, we assume that CUDA devices and OpenCL devices use the
  764. * same data structure. A conversion is only needed when moving
  765. * data from a CPU to a GPU, or the other way around.
  766. */
  767. switch (node_kind)
  768. {
  769. case STARPU_CPU_RAM:
  770. switch(starpu_node_get_kind(handle->mf_node))
  771. {
  772. case STARPU_CPU_RAM:
  773. return 0;
  774. case STARPU_CUDA_RAM: /* Fall through */
  775. case STARPU_OPENCL_RAM:
  776. case STARPU_MIC_RAM:
  777. case STARPU_SCC_RAM:
  778. return 1;
  779. default:
  780. STARPU_ABORT();
  781. }
  782. break;
  783. case STARPU_CUDA_RAM: /* Fall through */
  784. case STARPU_OPENCL_RAM:
  785. case STARPU_MIC_RAM:
  786. case STARPU_SCC_RAM:
  787. switch(starpu_node_get_kind(handle->mf_node))
  788. {
  789. case STARPU_CPU_RAM:
  790. return 1;
  791. case STARPU_CUDA_RAM:
  792. case STARPU_OPENCL_RAM:
  793. case STARPU_MIC_RAM:
  794. case STARPU_SCC_RAM:
  795. return 0;
  796. default:
  797. STARPU_ABORT();
  798. }
  799. break;
  800. default:
  801. STARPU_ABORT();
  802. }
  803. /* that instruction should never be reached */
  804. return -EINVAL;
  805. }
  806. starpu_cpu_func_t _starpu_task_get_cpu_nth_implementation(struct starpu_codelet *cl, unsigned nimpl)
  807. {
  808. return cl->cpu_funcs[nimpl];
  809. }
  810. starpu_cuda_func_t _starpu_task_get_cuda_nth_implementation(struct starpu_codelet *cl, unsigned nimpl)
  811. {
  812. return cl->cuda_funcs[nimpl];
  813. }
  814. starpu_opencl_func_t _starpu_task_get_opencl_nth_implementation(struct starpu_codelet *cl, unsigned nimpl)
  815. {
  816. return cl->opencl_funcs[nimpl];
  817. }
  818. void starpu_task_set_implementation(struct starpu_task *task, unsigned impl)
  819. {
  820. _starpu_get_job_associated_to_task(task)->nimpl = impl;
  821. }
  822. unsigned starpu_task_get_implementation(struct starpu_task *task)
  823. {
  824. return _starpu_get_job_associated_to_task(task)->nimpl;
  825. }
  826. starpu_mic_func_t _starpu_task_get_mic_nth_implementation(struct starpu_codelet *cl, unsigned nimpl)
  827. {
  828. return cl->mic_funcs[nimpl];
  829. }
  830. starpu_scc_func_t _starpu_task_get_scc_nth_implementation(struct starpu_codelet *cl, unsigned nimpl)
  831. {
  832. return cl->scc_funcs[nimpl];
  833. }
  834. char *_starpu_task_get_cpu_name_nth_implementation(struct starpu_codelet *cl, unsigned nimpl)
  835. {
  836. return cl->cpu_funcs_name[nimpl];
  837. }
  838. static starpu_pthread_t watchdog_thread;
  839. /* Check from times to times that StarPU does finish some tasks */
  840. static void *watchdog_func(void *foo STARPU_ATTRIBUTE_UNUSED)
  841. {
  842. struct timespec ts;
  843. char *timeout_env;
  844. unsigned long long timeout;
  845. if (! (timeout_env = getenv("STARPU_WATCHDOG_TIMEOUT")))
  846. return NULL;
  847. timeout = atoll(timeout_env);
  848. ts.tv_sec = timeout / 1000000;
  849. ts.tv_nsec = (timeout % 1000000) * 1000;
  850. struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
  851. STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
  852. while (_starpu_machine_is_running())
  853. {
  854. int last_nsubmitted = starpu_task_nsubmitted();
  855. config->watchdog_ok = 0;
  856. STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
  857. _starpu_sleep(ts);
  858. STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
  859. if (!config->watchdog_ok && last_nsubmitted
  860. && last_nsubmitted == starpu_task_nsubmitted())
  861. {
  862. fprintf(stderr,"The StarPU watchdog detected that no task finished for %u.%06us (can be configure through STARPU_WATCHDOG_TIMEOUT)\n", (unsigned)ts.tv_sec, (unsigned)ts.tv_nsec/1000);
  863. if (getenv("STARPU_WATCHDOG_CRASH"))
  864. {
  865. fprintf(stderr,"Crashing the process\n");
  866. assert(0);
  867. }
  868. else
  869. fprintf(stderr,"Set the STARPU_WATCHDOG_CRASH environment variable if you want to abort the process in such a case\n");
  870. }
  871. }
  872. STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
  873. return NULL;
  874. }
  875. void _starpu_watchdog_init(void)
  876. {
  877. struct _starpu_machine_config *config = (struct _starpu_machine_config *)_starpu_get_machine_config();
  878. STARPU_PTHREAD_MUTEX_INIT(&config->submitted_mutex, NULL);
  879. STARPU_PTHREAD_CREATE(&watchdog_thread, NULL, watchdog_func, NULL);
  880. }
  881. void _starpu_watchdog_shutdown(void)
  882. {
  883. starpu_pthread_join(watchdog_thread, NULL);
  884. }