task.c 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. * Copyright (C) 2011 Télécom-SudParis
  5. * Copyright (C) 2013 Thibaut Lambert
  6. * Copyright (C) 2016 Uppsala University
  7. * Copyright (C) 2017 Erwan Leria
  8. *
  9. * StarPU is free software; you can redistribute it and/or modify
  10. * it under the terms of the GNU Lesser General Public License as published by
  11. * the Free Software Foundation; either version 2.1 of the License, or (at
  12. * your option) any later version.
  13. *
  14. * StarPU is distributed in the hope that it will be useful, but
  15. * WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  17. *
  18. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  19. */
  20. #include <starpu.h>
  21. #include <starpu_profiling.h>
  22. #include <core/workers.h>
  23. #include <core/sched_ctx.h>
  24. #include <core/jobs.h>
  25. #include <core/task.h>
  26. #include <core/task_bundle.h>
  27. #include <core/dependencies/data_concurrency.h>
  28. #include <common/config.h>
  29. #include <common/utils.h>
  30. #include <common/fxt.h>
  31. #include <common/knobs.h>
  32. #include <datawizard/memory_nodes.h>
  33. #include <profiling/profiling.h>
  34. #include <profiling/bound.h>
  35. #include <math.h>
  36. #include <string.h>
  37. #include <core/debug.h>
  38. #include <core/sched_ctx.h>
  39. #include <time.h>
  40. #include <signal.h>
  41. #include <core/simgrid.h>
  42. #ifdef STARPU_HAVE_WINDOWS
  43. #include <windows.h>
  44. #endif
  /* Global-scope counters: ids filled in by _starpu__task_c__register_counters()
   * and used to address the matching slot of a sample. */
  static int __g_total_submitted;
  static int __g_peak_submitted;
  static int __g_peak_ready;

  /* Global-scope counter values (external linkage). */
  int64_t _starpu_task__g_total_submitted__value;
  int64_t _starpu_task__g_peak_submitted__value;
  int64_t _starpu_task__g_current_submitted__value;
  int64_t _starpu_task__g_peak_ready__value;
  int64_t _starpu_task__g_current_ready__value;

  /* Per-worker counter ids. */
  static int __w_total_executed;
  static int __w_cumul_execution_time;

  /* Per-codelet counter ids. */
  static int __c_total_submitted;
  static int __c_peak_submitted;
  static int __c_peak_ready;
  static int __c_total_executed;
  static int __c_cumul_execution_time;

  /* - */

  /* Per-scheduler knob ids, compared against knob->id in the knob accessors. */
  static int __s_max_priority_cap_knob;
  static int __s_min_priority_cap_knob;

  /* Current values of the priority capping knobs. */
  static int __s_max_priority_cap__value;
  static int __s_min_priority_cap__value;

  /* Knob group returned by _starpu_perf_knob_group_register(). */
  static struct starpu_perf_knob_group * __kg_starpu_task__per_scheduler;
  72. /* - */
  73. static void global_sample_updater(struct starpu_perf_counter_sample *sample, void *context)
  74. {
  75. STARPU_ASSERT(context == NULL); /* no context for the global updater */
  76. (void)context;
  77. _starpu_perf_counter_sample_set_int64_value(sample, __g_total_submitted, _starpu_task__g_total_submitted__value);
  78. _starpu_perf_counter_sample_set_int64_value(sample, __g_peak_submitted, _starpu_task__g_peak_submitted__value);
  79. _starpu_perf_counter_sample_set_int64_value(sample, __g_peak_ready, _starpu_task__g_peak_ready__value);
  80. }
  81. static void per_worker_sample_updater(struct starpu_perf_counter_sample *sample, void *context)
  82. {
  83. STARPU_ASSERT(context != NULL);
  84. struct _starpu_worker *worker = context;
  85. _starpu_perf_counter_sample_set_int64_value(sample, __w_total_executed, worker->__w_total_executed__value);
  86. _starpu_perf_counter_sample_set_double_value(sample, __w_cumul_execution_time, worker->__w_cumul_execution_time__value);
  87. }
  88. static void per_codelet_sample_updater(struct starpu_perf_counter_sample *sample, void *context)
  89. {
  90. STARPU_ASSERT(sample->listener != NULL && sample->listener->set != NULL);
  91. struct starpu_perf_counter_set *set = sample->listener->set;
  92. STARPU_ASSERT(set->scope == starpu_perf_counter_scope_per_codelet);
  93. STARPU_ASSERT(context != NULL);
  94. struct starpu_codelet *cl = context;
  95. _starpu_perf_counter_sample_set_int64_value(sample, __c_total_submitted, cl->perf_counter_values->task.total_submitted);
  96. _starpu_perf_counter_sample_set_int64_value(sample, __c_peak_submitted, cl->perf_counter_values->task.peak_submitted);
  97. _starpu_perf_counter_sample_set_int64_value(sample, __c_peak_ready, cl->perf_counter_values->task.peak_ready);
  98. _starpu_perf_counter_sample_set_int64_value(sample, __c_total_executed, cl->perf_counter_values->task.total_executed);
  99. _starpu_perf_counter_sample_set_double_value(sample, __c_cumul_execution_time, cl->perf_counter_values->task.cumul_execution_time);
  100. }
  101. void _starpu__task_c__register_counters(void)
  102. {
  103. {
  104. const enum starpu_perf_counter_scope scope = starpu_perf_counter_scope_global;
  105. __STARPU_PERF_COUNTER_REG("starpu.task", scope, g_total_submitted, int64, "number of tasks submitted globally (since StarPU initialization)");
  106. __STARPU_PERF_COUNTER_REG("starpu.task", scope, g_peak_submitted, int64, "maximum simultaneous number of tasks submitted and not yet ready, globally (since StarPU initialization)");
  107. __STARPU_PERF_COUNTER_REG("starpu.task", scope, g_peak_ready, int64, "maximum simultaneous number of tasks ready and not yet executing, globally (since StarPU initialization)");
  108. _starpu_perf_counter_register_updater(scope, global_sample_updater);
  109. }
  110. {
  111. const enum starpu_perf_counter_scope scope = starpu_perf_counter_scope_per_worker;
  112. __STARPU_PERF_COUNTER_REG("starpu.task", scope, w_total_executed, int64, "number of tasks executed on this worker (since StarPU initialization)");
  113. __STARPU_PERF_COUNTER_REG("starpu.task", scope, w_cumul_execution_time, double, "cumulated execution time of tasks executed on this worker (microseconds, since StarPU initialization)");
  114. _starpu_perf_counter_register_updater(scope, per_worker_sample_updater);
  115. }
  116. {
  117. const enum starpu_perf_counter_scope scope = starpu_perf_counter_scope_per_codelet;
  118. __STARPU_PERF_COUNTER_REG("starpu.task", scope, c_total_submitted, int64, "number of codelet's task instances submitted using this codelet (since enabled)");
  119. __STARPU_PERF_COUNTER_REG("starpu.task", scope, c_peak_submitted, int64, "maximum simultaneous number of codelet's task instances submitted and not yet ready (since enabled)");
  120. __STARPU_PERF_COUNTER_REG("starpu.task", scope, c_peak_ready, int64, "maximum simultaneous number of codelet's task instances ready and not yet executing (since enabled)");
  121. __STARPU_PERF_COUNTER_REG("starpu.task", scope, c_total_executed, int64, "number of codelet's task instances executed using this codelet (since enabled)");
  122. __STARPU_PERF_COUNTER_REG("starpu.task", scope, c_cumul_execution_time, double, "cumulated execution time of codelet's task instances (since enabled)");
  123. _starpu_perf_counter_register_updater(scope, per_codelet_sample_updater);
  124. }
  125. }
  126. /* - */
  127. static void sched_knobs__set(const struct starpu_perf_knob * const knob, void *context, const struct starpu_perf_knob_value * const value)
  128. {
  129. const char * const sched_policy_name = *(const char **)context;
  130. (void) sched_policy_name;
  131. if (knob->id == __s_max_priority_cap_knob)
  132. {
  133. STARPU_ASSERT(value->val_int32_t <= STARPU_MAX_PRIO);
  134. STARPU_ASSERT(value->val_int32_t >= STARPU_MIN_PRIO);
  135. STARPU_ASSERT(value->val_int32_t >= __s_min_priority_cap__value);
  136. __s_max_priority_cap__value = value->val_int32_t;
  137. }
  138. else if (knob->id == __s_min_priority_cap_knob)
  139. {
  140. STARPU_ASSERT(value->val_int32_t <= STARPU_MAX_PRIO);
  141. STARPU_ASSERT(value->val_int32_t >= STARPU_MIN_PRIO);
  142. STARPU_ASSERT(value->val_int32_t <= __s_max_priority_cap__value);
  143. __s_min_priority_cap__value = value->val_int32_t;
  144. }
  145. else
  146. {
  147. STARPU_ASSERT(0);
  148. abort();
  149. }
  150. }
  151. static void sched_knobs__get(const struct starpu_perf_knob * const knob, void *context, struct starpu_perf_knob_value * const value)
  152. {
  153. const char * const sched_policy_name = *(const char **)context;
  154. (void) sched_policy_name;
  155. if (knob->id == __s_max_priority_cap_knob)
  156. {
  157. value->val_int32_t = __s_max_priority_cap__value;
  158. }
  159. else if (knob->id == __s_min_priority_cap_knob)
  160. {
  161. value->val_int32_t = __s_min_priority_cap__value;
  162. }
  163. else
  164. {
  165. STARPU_ASSERT(0);
  166. abort();
  167. }
  168. }
  169. void _starpu__task_c__register_knobs(void)
  170. {
  171. #if 0
  172. {
  173. const enum starpu_perf_knob_scope scope = starpu_perf_knob_scope_global;
  174. __kg_starpu_global = _starpu_perf_knob_group_register(scope, global_knobs__set, global_knobs__get);
  175. }
  176. #endif
  177. #if 0
  178. {
  179. const enum starpu_perf_knob_scope scope = starpu_perf_knob_scope_per_worker;
  180. __kg_starpu_worker__per_worker = _starpu_perf_knob_group_register(scope, worker_knobs__set, worker_knobs__get);
  181. }
  182. #endif
  183. {
  184. const enum starpu_perf_knob_scope scope = starpu_perf_knob_scope_per_scheduler;
  185. __kg_starpu_task__per_scheduler = _starpu_perf_knob_group_register(scope, sched_knobs__set, sched_knobs__get);
  186. /* TODO: priority capping knobs actually work globally for now, the sched policy name is ignored */
  187. __STARPU_PERF_KNOB_REG("starpu.task", __kg_starpu_task__per_scheduler, s_max_priority_cap_knob, int32, "force task priority to this value or below (priority value)");
  188. __s_max_priority_cap__value = STARPU_MAX_PRIO;
  189. __STARPU_PERF_KNOB_REG("starpu.task", __kg_starpu_task__per_scheduler, s_min_priority_cap_knob, int32, "force task priority to this value or above (priority value)");
  190. __s_min_priority_cap__value = STARPU_MIN_PRIO;
  191. }
  192. }
  193. void _starpu__task_c__unregister_knobs(void)
  194. {
  195. _starpu_perf_knob_group_unregister(__kg_starpu_task__per_scheduler);
  196. __kg_starpu_task__per_scheduler = NULL;
  197. }
  198. /* - */
  199. /* XXX this should be reinitialized when StarPU is shutdown (or we should make
  200. * sure that no task remains !) */
  201. /* TODO we could make this hierarchical to avoid contention ? */
  202. //static starpu_pthread_cond_t submitted_cond = STARPU_PTHREAD_COND_INITIALIZER;
  203. /* This key stores the task currently handled by the thread, note that we
  204. * cannot use the worker structure to store that information because it is
  205. * possible that we have a task with a NULL codelet, which means its callback
  206. * could be executed by a user thread as well. */
  207. static starpu_pthread_key_t current_task_key;
  208. static int limit_min_submitted_tasks;
  209. static int limit_max_submitted_tasks;
  210. static int watchdog_crash;
  211. static int watchdog_delay;
  212. /*
  213. * Function to call when watchdog detects that no task has finished for more than STARPU_WATCHDOG_TIMEOUT seconds
  214. */
  215. static void (*watchdog_hook)(void *) = NULL;
  216. static void * watchdog_hook_arg = NULL;
  217. #define _STARPU_TASK_MAGIC 42
  218. /* Called once at starpu_init */
  219. void _starpu_task_init(void)
  220. {
  221. STARPU_PTHREAD_KEY_CREATE(&current_task_key, NULL);
  222. limit_min_submitted_tasks = starpu_get_env_number("STARPU_LIMIT_MIN_SUBMITTED_TASKS");
  223. limit_max_submitted_tasks = starpu_get_env_number("STARPU_LIMIT_MAX_SUBMITTED_TASKS");
  224. watchdog_crash = starpu_get_env_number("STARPU_WATCHDOG_CRASH");
  225. watchdog_delay = starpu_get_env_number_default("STARPU_WATCHDOG_DELAY", 0);
  226. }
  227. void _starpu_task_deinit(void)
  228. {
  229. STARPU_PTHREAD_KEY_DELETE(current_task_key);
  230. }
  231. void starpu_task_init(struct starpu_task *task)
  232. {
  233. /* TODO: memcpy from a template instead? benchmark it */
  234. STARPU_ASSERT(task);
  235. /* As most of the fields must be initialised at NULL, let's put 0
  236. * everywhere */
  237. memset(task, 0, sizeof(struct starpu_task));
  238. task->sequential_consistency = 1;
  239. task->where = -1;
  240. /* Now we can initialise fields which recquire custom value */
  241. /* Note: remember to update STARPU_TASK_INITIALIZER as well */
  242. #if STARPU_DEFAULT_PRIO != 0
  243. task->priority = STARPU_DEFAULT_PRIO;
  244. #endif
  245. task->detach = 1;
  246. #if STARPU_TASK_INIT != 0
  247. task->status = STARPU_TASK_INIT;
  248. #endif
  249. task->predicted = NAN;
  250. task->predicted_transfer = NAN;
  251. task->predicted_start = NAN;
  252. task->magic = _STARPU_TASK_MAGIC;
  253. task->sched_ctx = STARPU_NMAX_SCHED_CTXS;
  254. task->flops = 0.0;
  255. }
  256. /* Free all the ressources allocated for a task, without deallocating the task
  257. * structure itself (this is required for statically allocated tasks).
  258. * All values previously set by the user, like codelet and handles, remain
  259. * unchanged */
  260. void starpu_task_clean(struct starpu_task *task)
  261. {
  262. STARPU_ASSERT(task);
  263. task->magic = 0;
  264. /* If a buffer was allocated to store the profiling info, we free it. */
  265. if (task->profiling_info)
  266. {
  267. free(task->profiling_info);
  268. task->profiling_info = NULL;
  269. }
  270. /* If case the task is (still) part of a bundle */
  271. starpu_task_bundle_t bundle = task->bundle;
  272. if (bundle)
  273. starpu_task_bundle_remove(bundle, task);
  274. if (task->dyn_handles)
  275. {
  276. free(task->dyn_handles);
  277. task->dyn_handles = NULL;
  278. free(task->dyn_interfaces);
  279. task->dyn_interfaces = NULL;
  280. }
  281. if (task->dyn_modes)
  282. {
  283. free(task->dyn_modes);
  284. task->dyn_modes = NULL;
  285. }
  286. struct _starpu_job *j = (struct _starpu_job *)task->starpu_private;
  287. if (j)
  288. {
  289. _starpu_job_destroy(j);
  290. task->starpu_private = NULL;
  291. }
  292. }
  293. struct starpu_task * STARPU_ATTRIBUTE_MALLOC starpu_task_create(void)
  294. {
  295. struct starpu_task *task;
  296. _STARPU_MALLOC(task, sizeof(struct starpu_task));
  297. starpu_task_init(task);
  298. /* Dynamically allocated tasks are destroyed by default */
  299. task->destroy = 1;
  300. return task;
  301. }
  302. /* Free the ressource allocated during starpu_task_create. This function can be
  303. * called automatically after the execution of a task by setting the "destroy"
  304. * flag of the starpu_task structure (default behaviour). Calling this function
  305. * on a statically allocated task results in an undefined behaviour. */
  306. void _starpu_task_destroy(struct starpu_task *task)
  307. {
  308. /* If starpu_task_destroy is called in a callback, we just set the destroy
  309. flag. The task will be destroyed after the callback returns */
  310. if (task == starpu_task_get_current()
  311. && _starpu_get_local_worker_status() == STATUS_CALLBACK)
  312. {
  313. task->destroy = 1;
  314. }
  315. else
  316. {
  317. starpu_task_clean(task);
  318. /* TODO handle the case of task with detach = 1 and destroy = 1 */
  319. /* TODO handle the case of non terminated tasks -> assertion failure, it's too dangerous to be doing something like this */
  320. /* Does user want StarPU release cl_arg ? */
  321. if (task->cl_arg_free)
  322. free(task->cl_arg);
  323. /* Does user want StarPU release cl_ret ? */
  324. if (task->cl_ret_free)
  325. free(task->cl_ret);
  326. /* Does user want StarPU release callback_arg ? */
  327. if (task->callback_arg_free)
  328. free(task->callback_arg);
  329. /* Does user want StarPU release epilogue callback_arg ? */
  330. if (task->epilogue_callback_arg_free)
  331. free(task->epilogue_callback_arg);
  332. /* Does user want StarPU release prologue_callback_arg ? */
  333. if (task->prologue_callback_arg_free)
  334. free(task->prologue_callback_arg);
  335. /* Does user want StarPU release prologue_pop_arg ? */
  336. if (task->prologue_callback_pop_arg_free)
  337. free(task->prologue_callback_pop_arg);
  338. free(task);
  339. }
  340. }
  341. void starpu_task_destroy(struct starpu_task *task)
  342. {
  343. STARPU_ASSERT(task);
  344. STARPU_ASSERT_MSG(!task->destroy || !task->detach, "starpu_task_destroy must not be called for task with destroy = 1 and detach = 1");
  345. _starpu_task_destroy(task);
  346. }
  347. int starpu_task_finished(struct starpu_task *task)
  348. {
  349. STARPU_ASSERT(task);
  350. STARPU_ASSERT_MSG(!task->detach, "starpu_task_finished can only be called on tasks with detach = 0");
  351. return _starpu_job_finished(_starpu_get_job_associated_to_task(task));
  352. }
  353. int starpu_task_wait(struct starpu_task *task)
  354. {
  355. _STARPU_LOG_IN();
  356. STARPU_ASSERT(task);
  357. STARPU_ASSERT_MSG(!task->detach, "starpu_task_wait can only be called on tasks with detach = 0");
  358. if (task->detach || task->synchronous)
  359. {
  360. _STARPU_DEBUG("Task is detached or synchronous. Waiting returns immediately\n");
  361. _STARPU_LOG_OUT_TAG("einval");
  362. return -EINVAL;
  363. }
  364. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait must not be called from a task or callback");
  365. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  366. _STARPU_TRACE_TASK_WAIT_START(j);
  367. starpu_do_schedule();
  368. _starpu_wait_job(j);
  369. /* as this is a synchronous task, the liberation of the job
  370. structure was deferred */
  371. if (task->destroy)
  372. _starpu_task_destroy(task);
  373. _starpu_perf_counter_update_global_sample();
  374. _STARPU_TRACE_TASK_WAIT_END();
  375. _STARPU_LOG_OUT();
  376. return 0;
  377. }
  /* Wait for the `nb_tasks` tasks of `tasks`, in array order, using
   * starpu_task_wait(). Stops at the first failure and returns its error code;
   * returns 0 when every wait succeeded (or when the array is empty). */
  int starpu_task_wait_array(struct starpu_task **tasks, unsigned nb_tasks)
  {
      int status = 0;
      unsigned idx;

      for (idx = 0; idx < nb_tasks && status == 0; idx++)
          status = starpu_task_wait(tasks[idx]);

      return status;
  }
  #ifdef STARPU_OPENMP
  /* Test whether `task` has terminated, using _starpu_test_job_termination()
   * on the job stored in task->starpu_private.
   *
   * Returns -EINVAL when the task is detached or synchronous; otherwise returns
   * the value of _starpu_test_job_termination(). When that value is non-zero
   * and the task has its destroy flag set, the task is destroyed before
   * returning, so the caller must not use it afterwards.
   *
   * `task` must not be NULL and must have detach = 0. */
  int _starpu_task_test_termination(struct starpu_task *task)
  {
      /* Balance the _STARPU_LOG_OUT* calls on the exit paths. */
      _STARPU_LOG_IN();
      STARPU_ASSERT(task);
      STARPU_ASSERT_MSG(!task->detach, "_starpu_task_test_termination can only be called on tasks with detach = 0");

      if (task->detach || task->synchronous)
      {
          _STARPU_DEBUG("Task is detached or synchronous\n");
          _STARPU_LOG_OUT_TAG("einval");
          return -EINVAL;
      }

      struct _starpu_job *j = (struct _starpu_job *)task->starpu_private;
      int ret = _starpu_test_job_termination(j);

      if (ret && task->destroy)
          _starpu_task_destroy(task);

      _STARPU_LOG_OUT();
      return ret;
  }
  #endif
  410. /* NB in case we have a regenerable task, it is possible that the job was
  411. * already counted. */
  412. int _starpu_submit_job(struct _starpu_job *j, int nodeps)
  413. {
  414. struct starpu_task *task = j->task;
  415. int ret;
  416. #ifdef STARPU_OPENMP
  417. const unsigned continuation = j->continuation;
  418. #else
  419. const unsigned continuation = 0;
  420. #endif
  421. _STARPU_LOG_IN();
  422. /* notify bound computation of a new task */
  423. _starpu_bound_record(j);
  424. _starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
  425. _starpu_sched_task_submit(task);
  426. #ifdef STARPU_USE_SC_HYPERVISOR
  427. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
  428. if(sched_ctx != NULL && j->task->sched_ctx != _starpu_get_initial_sched_ctx()->id && j->task->sched_ctx != STARPU_NMAX_SCHED_CTXS
  429. && sched_ctx->perf_counters != NULL)
  430. {
  431. struct starpu_perfmodel_arch arch;
  432. _STARPU_MALLOC(arch.devices, sizeof(struct starpu_perfmodel_device));
  433. arch.ndevices = 1;
  434. arch.devices[0].type = STARPU_CPU_WORKER;
  435. arch.devices[0].devid = 0;
  436. arch.devices[0].ncores = 1;
  437. _starpu_compute_buffers_footprint(j->task->cl->model, &arch, 0, j);
  438. free(arch.devices);
  439. size_t data_size = 0;
  440. if (j->task->cl)
  441. {
  442. unsigned i, nbuffers = STARPU_TASK_GET_NBUFFERS(j->task);
  443. for(i = 0; i < nbuffers; i++)
  444. {
  445. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  446. if (handle != NULL)
  447. data_size += _starpu_data_get_size(handle);
  448. }
  449. }
  450. _STARPU_TRACE_HYPERVISOR_BEGIN();
  451. sched_ctx->perf_counters->notify_submitted_job(j->task, j->footprint, data_size);
  452. _STARPU_TRACE_HYPERVISOR_END();
  453. }
  454. #endif//STARPU_USE_SC_HYPERVISOR
  455. /* We retain handle reference count */
  456. if (task->cl && !continuation)
  457. {
  458. unsigned i;
  459. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  460. for (i=0; i<nbuffers; i++)
  461. {
  462. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  463. _starpu_spin_lock(&handle->header_lock);
  464. handle->busy_count++;
  465. _starpu_spin_unlock(&handle->header_lock);
  466. }
  467. }
  468. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  469. _starpu_handle_job_submission(j);
  470. #ifdef STARPU_OPENMP
  471. if (continuation)
  472. {
  473. j->discontinuous = 1;
  474. j->continuation = 0;
  475. }
  476. #endif
  477. if (nodeps)
  478. {
  479. ret = _starpu_take_deps_and_schedule(j);
  480. }
  481. else
  482. {
  483. #ifdef STARPU_OPENMP
  484. if (continuation)
  485. {
  486. ret = _starpu_reenforce_task_deps_and_schedule(j);
  487. }
  488. else
  489. #endif
  490. {
  491. ret = _starpu_enforce_deps_and_schedule(j);
  492. }
  493. }
  494. _STARPU_LOG_OUT();
  495. return ret;
  496. }
  497. /* Note: this is racy, so valgrind would complain. But since we'll always put
  498. * the same values, this is not a problem. */
  499. void _starpu_codelet_check_deprecated_fields(struct starpu_codelet *cl)
  500. {
  501. if (!cl)
  502. return;
  503. if (cl->checked)
  504. {
  505. STARPU_RMB();
  506. return;
  507. }
  508. uint32_t where = cl->where;
  509. int is_where_unset = where == 0;
  510. unsigned i, some_impl;
  511. /* Check deprecated and unset fields (where, <device>_func,
  512. * <device>_funcs) */
  513. /* CPU */
  514. if (cl->cpu_func && cl->cpu_func != STARPU_MULTIPLE_CPU_IMPLEMENTATIONS && cl->cpu_funcs[0])
  515. {
  516. _STARPU_DISP("[warning] [struct starpu_codelet] both cpu_func and cpu_funcs are set. Ignoring cpu_func.\n");
  517. cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
  518. }
  519. if (cl->cpu_func && cl->cpu_func != STARPU_MULTIPLE_CPU_IMPLEMENTATIONS)
  520. {
  521. cl->cpu_funcs[0] = cl->cpu_func;
  522. cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
  523. }
  524. some_impl = 0;
  525. for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
  526. if (cl->cpu_funcs[i])
  527. {
  528. some_impl = 1;
  529. break;
  530. }
  531. if (some_impl && cl->cpu_func == 0)
  532. {
  533. cl->cpu_func = STARPU_MULTIPLE_CPU_IMPLEMENTATIONS;
  534. }
  535. if (some_impl && is_where_unset)
  536. {
  537. where |= STARPU_CPU;
  538. }
  539. /* CUDA */
  540. if (cl->cuda_func && cl->cuda_func != STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS && cl->cuda_funcs[0])
  541. {
  542. _STARPU_DISP("[warning] [struct starpu_codelet] both cuda_func and cuda_funcs are set. Ignoring cuda_func.\n");
  543. cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
  544. }
  545. if (cl->cuda_func && cl->cuda_func != STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS)
  546. {
  547. cl->cuda_funcs[0] = cl->cuda_func;
  548. cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
  549. }
  550. some_impl = 0;
  551. for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
  552. if (cl->cuda_funcs[i])
  553. {
  554. some_impl = 1;
  555. break;
  556. }
  557. if (some_impl && cl->cuda_func == 0)
  558. {
  559. cl->cuda_func = STARPU_MULTIPLE_CUDA_IMPLEMENTATIONS;
  560. }
  561. if (some_impl && is_where_unset)
  562. {
  563. where |= STARPU_CUDA;
  564. }
  565. /* OpenCL */
  566. if (cl->opencl_func && cl->opencl_func != STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS && cl->opencl_funcs[0])
  567. {
  568. _STARPU_DISP("[warning] [struct starpu_codelet] both opencl_func and opencl_funcs are set. Ignoring opencl_func.\n");
  569. cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
  570. }
  571. if (cl->opencl_func && cl->opencl_func != STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS)
  572. {
  573. cl->opencl_funcs[0] = cl->opencl_func;
  574. cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
  575. }
  576. some_impl = 0;
  577. for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
  578. if (cl->opencl_funcs[i])
  579. {
  580. some_impl = 1;
  581. break;
  582. }
  583. if (some_impl && cl->opencl_func == 0)
  584. {
  585. cl->opencl_func = STARPU_MULTIPLE_OPENCL_IMPLEMENTATIONS;
  586. }
  587. if (some_impl && is_where_unset)
  588. {
  589. where |= STARPU_OPENCL;
  590. }
  591. /* FPGA */
  592. if (cl->fpga_func && cl->fpga_func != STARPU_MULTIPLE_FPGA_IMPLEMENTATIONS && cl->fpga_funcs[0])
  593. {
  594. _STARPU_DISP("[warning] [struct starpu_codelet] both fpga_func and fpga_funcs are set. Ignoring fpga_func.\n");
  595. cl->fpga_func = STARPU_MULTIPLE_FPGA_IMPLEMENTATIONS;
  596. }
  597. if (cl->fpga_func && cl->fpga_func != STARPU_MULTIPLE_FPGA_IMPLEMENTATIONS)
  598. {
  599. cl->fpga_funcs[0] = cl->fpga_func;
  600. cl->fpga_func = STARPU_MULTIPLE_FPGA_IMPLEMENTATIONS;
  601. }
  602. some_impl = 0;
  603. for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
  604. if (cl->fpga_funcs[i])
  605. {
  606. some_impl = 1;
  607. break;
  608. }
  609. if (some_impl && cl->fpga_func == 0)
  610. {
  611. cl->fpga_func = STARPU_MULTIPLE_FPGA_IMPLEMENTATIONS;
  612. }
  613. if (some_impl && is_where_unset)
  614. {
  615. where |= STARPU_FPGA;
  616. }
  617. some_impl = 0;
  618. for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
  619. if (cl->mpi_ms_funcs[i])
  620. {
  621. some_impl = 1;
  622. break;
  623. }
  624. if (some_impl && is_where_unset)
  625. {
  626. where |= STARPU_MPI_MS;
  627. }
  628. some_impl = 0;
  629. for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
  630. if (cl->cpu_funcs_name[i])
  631. {
  632. some_impl = 1;
  633. break;
  634. }
  635. if (some_impl && is_where_unset)
  636. {
  637. where |= STARPU_MPI_MS;
  638. }
  639. cl->where = where;
  640. STARPU_WMB();
  641. cl->checked = 1;
  642. }
  643. void _starpu_task_check_deprecated_fields(struct starpu_task *task STARPU_ATTRIBUTE_UNUSED)
  644. {
  645. /* None any more */
  646. }
  647. static int _starpu_task_submit_head(struct starpu_task *task)
  648. {
  649. unsigned is_sync = task->synchronous;
  650. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  651. if (task->status == STARPU_TASK_STOPPED || task->status == STARPU_TASK_FINISHED)
  652. task->status = STARPU_TASK_INIT;
  653. else
  654. STARPU_ASSERT(task->status == STARPU_TASK_INIT);
  655. if (j->internal)
  656. {
  657. // Internal tasks are submitted to initial context
  658. task->sched_ctx = _starpu_get_initial_sched_ctx()->id;
  659. }
  660. else if (task->sched_ctx == STARPU_NMAX_SCHED_CTXS)
  661. {
  662. // If the task has not specified a context, we set the current context
  663. task->sched_ctx = _starpu_sched_ctx_get_current_context();
  664. }
  665. if (is_sync)
  666. {
  667. /* Perhaps it is not possible to submit a synchronous
  668. * (blocking) task */
  669. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "submitting a synchronous task must not be done from a task or a callback");
  670. task->detach = 0;
  671. }
  672. _starpu_task_check_deprecated_fields(task);
  673. _starpu_codelet_check_deprecated_fields(task->cl);
  674. if (task->where== -1 && task->cl)
  675. task->where = task->cl->where;
  676. if (task->cl)
  677. {
  678. unsigned i;
  679. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  680. _STARPU_TRACE_UPDATE_TASK_CNT(0);
  681. /* Check buffers */
  682. if (task->dyn_handles == NULL)
  683. STARPU_ASSERT_MSG(STARPU_TASK_GET_NBUFFERS(task) <= STARPU_NMAXBUFS,
  684. "Codelet %p has too many buffers (%d vs max %d). Either use --enable-maxbuffers configure option to increase the max, or use dyn_handles instead of handles.",
  685. task->cl, STARPU_TASK_GET_NBUFFERS(task), STARPU_NMAXBUFS);
  686. if (STARPU_UNLIKELY(task->dyn_handles))
  687. {
  688. _STARPU_MALLOC(task->dyn_interfaces, nbuffers * sizeof(void *));
  689. }
  690. for (i = 0; i < nbuffers; i++)
  691. {
  692. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  693. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, i);
  694. int node = task->cl->specific_nodes ? STARPU_CODELET_GET_NODE(task->cl, i) : -1;
  695. /* Make sure handles are valid */
  696. STARPU_ASSERT_MSG(handle->magic == _STARPU_TASK_MAGIC, "data %p is invalid (was it already unregistered?)", handle);
  697. /* Make sure handles are not partitioned */
  698. STARPU_ASSERT_MSG(handle->nchildren == 0, "only unpartitioned data (or the pieces of a partitioned data) can be used in a task");
  699. /* Make sure the specified node exists */
  700. STARPU_ASSERT_MSG(node == STARPU_SPECIFIC_NODE_LOCAL || node == STARPU_SPECIFIC_NODE_CPU || node == STARPU_SPECIFIC_NODE_SLOW || node == STARPU_SPECIFIC_NODE_LOCAL_OR_CPU || (node >= 0 && node < (int) starpu_memory_nodes_get_count()), "The codelet-specified memory node does not exist");
  701. /* Provide the home interface for now if any,
  702. * for can_execute hooks */
  703. if (handle->home_node != -1)
  704. _STARPU_TASK_SET_INTERFACE(task, starpu_data_get_interface_on_node(handle, handle->home_node), i);
  705. if (!(task->cl->flags & STARPU_CODELET_NOPLANS) &&
  706. ((handle->nplans && !handle->nchildren) || handle->siblings)
  707. && handle->partition_automatic_disabled == 0
  708. )
  709. /* This handle is involved with asynchronous
  710. * partitioning as a parent or a child, make
  711. * sure the right plan is active, submit
  712. * appropiate partitioning / unpartitioning if
  713. * not */
  714. _starpu_data_partition_access_submit(handle, (mode & STARPU_W) != 0);
  715. }
  716. /* Check the type of worker(s) required by the task exist */
  717. if (STARPU_UNLIKELY(!_starpu_worker_exists(task)))
  718. {
  719. _STARPU_LOG_OUT_TAG("ENODEV");
  720. return -ENODEV;
  721. }
  722. /* In case we require that a task should be explicitely
  723. * executed on a specific worker, we make sure that the worker
  724. * is able to execute this task. */
  725. if (STARPU_UNLIKELY(task->execute_on_a_specific_worker && !starpu_combined_worker_can_execute_task(task->workerid, task, 0)))
  726. {
  727. _STARPU_LOG_OUT_TAG("ENODEV");
  728. return -ENODEV;
  729. }
  730. if (task->cl->model)
  731. _starpu_init_and_load_perfmodel(task->cl->model);
  732. if (task->cl->energy_model)
  733. _starpu_init_and_load_perfmodel(task->cl->energy_model);
  734. }
  735. return 0;
  736. }
  737. /* application should submit new tasks to StarPU through this function */
  738. int _starpu_task_submit(struct starpu_task *task, int nodeps)
  739. {
  740. _STARPU_LOG_IN();
  741. STARPU_ASSERT(task);
  742. STARPU_ASSERT_MSG(task->magic == _STARPU_TASK_MAGIC, "Tasks must be created with starpu_task_create, or initialized with starpu_task_init.");
  743. STARPU_ASSERT_MSG(starpu_is_initialized(), "starpu_init must be called (and return no error) before submitting tasks.");
  744. int ret;
  745. {
  746. /* task knobs */
  747. if (task->priority > __s_max_priority_cap__value)
  748. task->priority = __s_max_priority_cap__value;
  749. if (task->priority < __s_min_priority_cap__value)
  750. task->priority = __s_min_priority_cap__value;
  751. }
  752. unsigned is_sync = task->synchronous;
  753. starpu_task_bundle_t bundle = task->bundle;
  754. STARPU_ASSERT_MSG(!(nodeps && bundle), "not supported\n");
  755. /* internally, StarPU manipulates a struct _starpu_job * which is a wrapper around a
  756. * task structure, it is possible that this job structure was already
  757. * allocated. */
  758. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  759. const unsigned continuation =
  760. #ifdef STARPU_OPENMP
  761. j->continuation
  762. #else
  763. 0
  764. #endif
  765. ;
  766. if (!_starpu_perf_counter_paused() && !j->internal && !continuation)
  767. {
  768. (void) STARPU_ATOMIC_ADD64(&_starpu_task__g_total_submitted__value, 1);
  769. int64_t value = STARPU_ATOMIC_ADD64(&_starpu_task__g_current_submitted__value, 1);
  770. _starpu_perf_counter_update_max_int64(&_starpu_task__g_peak_submitted__value, value);
  771. _starpu_perf_counter_update_global_sample();
  772. if (task->cl && task->cl->perf_counter_values)
  773. {
  774. struct starpu_perf_counter_sample_cl_values * const pcv = task->cl->perf_counter_values;
  775. (void) STARPU_ATOMIC_ADD64(&pcv->task.total_submitted, 1);
  776. value = STARPU_ATOMIC_ADD64(&pcv->task.current_submitted, 1);
  777. _starpu_perf_counter_update_max_int64(&pcv->task.peak_submitted, value);
  778. _starpu_perf_counter_update_per_codelet_sample(task->cl);
  779. }
  780. }
  781. STARPU_ASSERT_MSG(!(nodeps && continuation), "not supported\n");
  782. if (!j->internal)
  783. {
  784. int nsubmitted_tasks = starpu_task_nsubmitted();
  785. if (limit_max_submitted_tasks >= 0 && limit_max_submitted_tasks < nsubmitted_tasks
  786. && limit_min_submitted_tasks >= 0 && limit_min_submitted_tasks < nsubmitted_tasks)
  787. {
  788. starpu_do_schedule();
  789. _STARPU_TRACE_TASK_THROTTLE_START();
  790. starpu_task_wait_for_n_submitted(limit_min_submitted_tasks);
  791. _STARPU_TRACE_TASK_THROTTLE_END();
  792. }
  793. }
  794. _STARPU_TRACE_TASK_SUBMIT_START();
  795. ret = _starpu_task_submit_head(task);
  796. if (ret)
  797. {
  798. _STARPU_TRACE_TASK_SUBMIT_END();
  799. return ret;
  800. }
  801. if (!continuation)
  802. {
  803. STARPU_ASSERT_MSG(!j->submitted || j->terminated >= 1, "Tasks can not be submitted a second time before being terminated. Please use different task structures, or use the regenerate flag to let the task resubmit itself automatically.");
  804. _STARPU_TRACE_TASK_SUBMIT(j,
  805. _starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[0],
  806. _starpu_get_sched_ctx_struct(task->sched_ctx)->iterations[1]);
  807. _STARPU_TRACE_TASK_NAME(j);
  808. _STARPU_TRACE_TASK_LINE(j);
  809. }
  810. /* If this is a continuation, we don't modify the implicit data dependencies detected earlier. */
  811. if (task->cl && !continuation)
  812. {
  813. _starpu_job_set_ordered_buffers(j);
  814. if (!nodeps)
  815. _starpu_detect_implicit_data_deps(task);
  816. }
  817. if (STARPU_UNLIKELY(bundle))
  818. {
  819. /* We need to make sure that models for other tasks of the
  820. * bundle are also loaded, so the scheduler can estimate the
  821. * duration of the whole bundle */
  822. STARPU_PTHREAD_MUTEX_LOCK(&bundle->mutex);
  823. struct _starpu_task_bundle_entry *entry;
  824. entry = bundle->list;
  825. while (entry)
  826. {
  827. if (entry->task->cl->model)
  828. _starpu_init_and_load_perfmodel(entry->task->cl->model);
  829. if (entry->task->cl->energy_model)
  830. _starpu_init_and_load_perfmodel(entry->task->cl->energy_model);
  831. entry = entry->next;
  832. }
  833. STARPU_PTHREAD_MUTEX_UNLOCK(&bundle->mutex);
  834. }
  835. /* If profiling is activated, we allocate a structure to store the
  836. * appropriate info. */
  837. struct starpu_profiling_task_info *info = task->profiling_info;
  838. int profiling = starpu_profiling_status_get();
  839. if (!info)
  840. {
  841. info = _starpu_allocate_profiling_info_if_needed(task);
  842. task->profiling_info = info;
  843. }
  844. /* The task is considered as block until we are sure there remains not
  845. * dependency. */
  846. task->status = STARPU_TASK_BLOCKED;
  847. if (STARPU_UNLIKELY(profiling))
  848. _starpu_clock_gettime(&info->submit_time);
  849. ret = _starpu_submit_job(j, nodeps);
  850. #ifdef STARPU_SIMGRID
  851. if (_starpu_simgrid_task_submit_cost())
  852. starpu_sleep(0.000001);
  853. #endif
  854. if (is_sync)
  855. {
  856. _starpu_sched_do_schedule(task->sched_ctx);
  857. _starpu_wait_job(j);
  858. if (task->destroy)
  859. _starpu_task_destroy(task);
  860. }
  861. _STARPU_TRACE_TASK_SUBMIT_END();
  862. _STARPU_LOG_OUT();
  863. return ret;
  864. }
  865. #undef starpu_task_submit
  866. int starpu_task_submit(struct starpu_task *task)
  867. {
  868. return _starpu_task_submit(task, 0);
  869. }
  870. int _starpu_task_submit_internally(struct starpu_task *task)
  871. {
  872. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  873. j->internal = 1;
  874. return starpu_task_submit(task);
  875. }
  876. /* application should submit new tasks to StarPU through this function */
  877. int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx_id)
  878. {
  879. task->sched_ctx = sched_ctx_id;
  880. return starpu_task_submit(task);
  881. }
  882. /* The StarPU core can submit tasks directly to the scheduler or a worker,
  883. * skipping dependencies completely (when it knows what it is doing). */
  884. int starpu_task_submit_nodeps(struct starpu_task *task)
  885. {
  886. return _starpu_task_submit(task, 1);
  887. }
  888. /*
  889. * worker->sched_mutex must be locked when calling this function.
  890. */
  891. int _starpu_task_submit_conversion_task(struct starpu_task *task,
  892. unsigned int workerid)
  893. {
  894. int ret;
  895. STARPU_ASSERT(task->cl);
  896. STARPU_ASSERT(task->execute_on_a_specific_worker);
  897. ret = _starpu_task_submit_head(task);
  898. STARPU_ASSERT(ret == 0);
  899. /* We retain handle reference count that would have been acquired by data dependencies. */
  900. unsigned i;
  901. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  902. for (i=0; i<nbuffers; i++)
  903. {
  904. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, i);
  905. _starpu_spin_lock(&handle->header_lock);
  906. handle->busy_count++;
  907. _starpu_spin_unlock(&handle->header_lock);
  908. }
  909. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  910. _starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
  911. _starpu_sched_task_submit(task);
  912. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  913. _starpu_handle_job_submission(j);
  914. _starpu_increment_nready_tasks_of_sched_ctx(j->task->sched_ctx, j->task->flops, j->task);
  915. _starpu_job_set_ordered_buffers(j);
  916. STARPU_ASSERT(task->status == STARPU_TASK_INIT);
  917. task->status = STARPU_TASK_READY;
  918. _starpu_profiling_set_task_push_start_time(task);
  919. unsigned node = starpu_worker_get_memory_node(workerid);
  920. if (starpu_get_prefetch_flag())
  921. starpu_prefetch_task_input_on_node(task, node);
  922. struct _starpu_worker *worker;
  923. worker = _starpu_get_worker_struct(workerid);
  924. starpu_task_prio_list_push_back(&worker->local_tasks, task);
  925. starpu_wake_worker_locked(worker->workerid);
  926. _starpu_profiling_set_task_push_end_time(task);
  927. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  928. return 0;
  929. }
  930. void starpu_codelet_init(struct starpu_codelet *cl)
  931. {
  932. memset(cl, 0, sizeof(struct starpu_codelet));
  933. }
  934. #define _STARPU_CODELET_WORKER_NAME_LEN 32
  935. void starpu_codelet_display_stats(struct starpu_codelet *cl)
  936. {
  937. unsigned worker;
  938. unsigned nworkers = starpu_worker_get_count();
  939. if (cl->name)
  940. fprintf(stderr, "Statistics for codelet %s\n", cl->name);
  941. else if (cl->model && cl->model->symbol)
  942. fprintf(stderr, "Statistics for codelet %s\n", cl->model->symbol);
  943. unsigned long total = 0;
  944. for (worker = 0; worker < nworkers; worker++)
  945. total += cl->per_worker_stats[worker];
  946. for (worker = 0; worker < nworkers; worker++)
  947. {
  948. char name[_STARPU_CODELET_WORKER_NAME_LEN];
  949. starpu_worker_get_name(worker, name, _STARPU_CODELET_WORKER_NAME_LEN);
  950. fprintf(stderr, "\t%s -> %lu / %lu (%2.2f %%)\n", name, cl->per_worker_stats[worker], total, (100.0f*cl->per_worker_stats[worker])/total);
  951. }
  952. }
  953. /*
  954. * We wait for all the tasks that have already been submitted. Note that a
  955. * regenerable is not considered finished until it was explicitely set as
  956. * non-regenerale anymore (eg. from a callback).
  957. */
  958. int _starpu_task_wait_for_all_and_return_nb_waited_tasks(void)
  959. {
  960. unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
  961. unsigned sched_ctx_id = nsched_ctxs == 1 ? 0 : starpu_sched_ctx_get_context();
  962. /* if there is no indication about which context to wait,
  963. we wait for all tasks submitted to starpu */
  964. if (sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  965. {
  966. _STARPU_DEBUG("Waiting for all tasks\n");
  967. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_all must not be called from a task or callback");
  968. STARPU_AYU_BARRIER();
  969. struct _starpu_machine_config *config = _starpu_get_machine_config();
  970. if(config->topology.nsched_ctxs == 1)
  971. {
  972. _starpu_sched_do_schedule(0);
  973. return _starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(0);
  974. }
  975. else
  976. {
  977. int s;
  978. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  979. {
  980. if(config->sched_ctxs[s].do_schedule == 1)
  981. {
  982. _starpu_sched_do_schedule(config->sched_ctxs[s].id);
  983. }
  984. }
  985. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  986. {
  987. if(config->sched_ctxs[s].do_schedule == 1)
  988. {
  989. starpu_task_wait_for_all_in_ctx(config->sched_ctxs[s].id);
  990. }
  991. }
  992. return 0;
  993. }
  994. }
  995. else
  996. {
  997. _starpu_sched_do_schedule(sched_ctx_id);
  998. _STARPU_DEBUG("Waiting for tasks submitted to context %u\n", sched_ctx_id);
  999. return _starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(sched_ctx_id);
  1000. }
  1001. }
  1002. int starpu_task_wait_for_all(void)
  1003. {
  1004. _starpu_task_wait_for_all_and_return_nb_waited_tasks();
  1005. if (!_starpu_perf_counter_paused())
  1006. _starpu_perf_counter_update_global_sample();
  1007. return 0;
  1008. }
  1009. int _starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(unsigned sched_ctx)
  1010. {
  1011. _STARPU_TRACE_TASK_WAIT_FOR_ALL_START();
  1012. int ret = _starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
  1013. _STARPU_TRACE_TASK_WAIT_FOR_ALL_END();
  1014. /* TODO: improve Temanejo into knowing about contexts ... */
  1015. STARPU_AYU_BARRIER();
  1016. return ret;
  1017. }
  1018. int starpu_task_wait_for_all_in_ctx(unsigned sched_ctx)
  1019. {
  1020. _starpu_task_wait_for_all_in_ctx_and_return_nb_waited_tasks(sched_ctx);
  1021. if (!_starpu_perf_counter_paused())
  1022. _starpu_perf_counter_update_global_sample();
  1023. return 0;
  1024. }
  1025. /*
  1026. * We wait until there's a certain number of the tasks that have already been
  1027. * submitted left. Note that a regenerable is not considered finished until it
  1028. * was explicitely set as non-regenerale anymore (eg. from a callback).
  1029. */
  1030. int starpu_task_wait_for_n_submitted(unsigned n)
  1031. {
  1032. unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
  1033. unsigned sched_ctx_id = nsched_ctxs == 1 ? 0 : starpu_sched_ctx_get_context();
  1034. /* if there is no indication about which context to wait,
  1035. we wait for all tasks submitted to starpu */
  1036. if (sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  1037. {
  1038. _STARPU_DEBUG("Waiting for all tasks\n");
  1039. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_n_submitted must not be called from a task or callback");
  1040. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1041. if(config->topology.nsched_ctxs == 1)
  1042. _starpu_wait_for_n_submitted_tasks_of_sched_ctx(0, n);
  1043. else
  1044. {
  1045. int s;
  1046. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  1047. {
  1048. if(config->sched_ctxs[s].do_schedule == 1)
  1049. {
  1050. _starpu_wait_for_n_submitted_tasks_of_sched_ctx(config->sched_ctxs[s].id, n);
  1051. }
  1052. }
  1053. }
  1054. }
  1055. else
  1056. {
  1057. _STARPU_DEBUG("Waiting for tasks submitted to context %u\n", sched_ctx_id);
  1058. _starpu_wait_for_n_submitted_tasks_of_sched_ctx(sched_ctx_id, n);
  1059. }
  1060. if (!_starpu_perf_counter_paused())
  1061. _starpu_perf_counter_update_global_sample();
  1062. return 0;
  1063. }
  1064. int starpu_task_wait_for_n_submitted_in_ctx(unsigned sched_ctx, unsigned n)
  1065. {
  1066. _starpu_wait_for_n_submitted_tasks_of_sched_ctx(sched_ctx, n);
  1067. if (!_starpu_perf_counter_paused())
  1068. _starpu_perf_counter_update_global_sample();
  1069. return 0;
  1070. }
  1071. /*
  1072. * We wait until there is no ready task any more (i.e. StarPU will not be able
  1073. * to progress any more).
  1074. */
  1075. int starpu_task_wait_for_no_ready(void)
  1076. {
  1077. STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_task_wait_for_no_ready must not be called from a task or callback");
  1078. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1079. if(config->topology.nsched_ctxs == 1)
  1080. {
  1081. _starpu_sched_do_schedule(0);
  1082. _starpu_wait_for_no_ready_of_sched_ctx(0);
  1083. }
  1084. else
  1085. {
  1086. int s;
  1087. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  1088. {
  1089. if(config->sched_ctxs[s].do_schedule == 1)
  1090. {
  1091. _starpu_sched_do_schedule(config->sched_ctxs[s].id);
  1092. }
  1093. }
  1094. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  1095. {
  1096. if(config->sched_ctxs[s].do_schedule == 1)
  1097. {
  1098. _starpu_wait_for_no_ready_of_sched_ctx(config->sched_ctxs[s].id);
  1099. }
  1100. }
  1101. }
  1102. if (!_starpu_perf_counter_paused())
  1103. _starpu_perf_counter_update_global_sample();
  1104. return 0;
  1105. }
  1106. void starpu_iteration_push(unsigned long iteration)
  1107. {
  1108. struct _starpu_sched_ctx *ctx = _starpu_get_sched_ctx_struct(_starpu_sched_ctx_get_current_context());
  1109. unsigned level = ctx->iteration_level++;
  1110. if (level < sizeof(ctx->iterations)/sizeof(ctx->iterations[0]))
  1111. ctx->iterations[level] = iteration;
  1112. }
  1113. void starpu_iteration_pop(void)
  1114. {
  1115. struct _starpu_sched_ctx *ctx = _starpu_get_sched_ctx_struct(_starpu_sched_ctx_get_current_context());
  1116. STARPU_ASSERT_MSG(ctx->iteration_level > 0, "calls to starpu_iteration_pop must match starpu_iteration_push calls");
  1117. unsigned level = ctx->iteration_level--;
  1118. if (level < sizeof(ctx->iterations)/sizeof(ctx->iterations[0]))
  1119. ctx->iterations[level] = -1;
  1120. }
  1121. void starpu_do_schedule(void)
  1122. {
  1123. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1124. if(config->topology.nsched_ctxs == 1)
  1125. _starpu_sched_do_schedule(0);
  1126. else
  1127. {
  1128. int s;
  1129. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  1130. {
  1131. if(config->sched_ctxs[s].do_schedule == 1)
  1132. {
  1133. _starpu_sched_do_schedule(config->sched_ctxs[s].id);
  1134. }
  1135. }
  1136. }
  1137. }
  1138. void
  1139. starpu_drivers_request_termination(void)
  1140. {
  1141. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1142. STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
  1143. int nsubmitted = starpu_task_nsubmitted();
  1144. config->submitting = 0;
  1145. if (nsubmitted == 0)
  1146. {
  1147. ANNOTATE_HAPPENS_AFTER(&config->running);
  1148. config->running = 0;
  1149. ANNOTATE_HAPPENS_BEFORE(&config->running);
  1150. STARPU_WMB();
  1151. int s;
  1152. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  1153. {
  1154. if(config->sched_ctxs[s].do_schedule == 1)
  1155. {
  1156. _starpu_check_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
  1157. }
  1158. }
  1159. }
  1160. STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
  1161. }
  1162. int starpu_task_nsubmitted(void)
  1163. {
  1164. int nsubmitted = 0;
  1165. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1166. if(config->topology.nsched_ctxs == 1)
  1167. nsubmitted = _starpu_get_nsubmitted_tasks_of_sched_ctx(0);
  1168. else
  1169. {
  1170. int s;
  1171. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  1172. {
  1173. if(config->sched_ctxs[s].do_schedule == 1)
  1174. {
  1175. nsubmitted += _starpu_get_nsubmitted_tasks_of_sched_ctx(config->sched_ctxs[s].id);
  1176. }
  1177. }
  1178. }
  1179. return nsubmitted;
  1180. }
  1181. int starpu_task_nready(void)
  1182. {
  1183. int nready = 0;
  1184. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1185. if(config->topology.nsched_ctxs == 1)
  1186. nready = starpu_sched_ctx_get_nready_tasks(0);
  1187. else
  1188. {
  1189. int s;
  1190. for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
  1191. {
  1192. if(config->sched_ctxs[s].do_schedule == 1)
  1193. {
  1194. nready += starpu_sched_ctx_get_nready_tasks(config->sched_ctxs[s].id);
  1195. }
  1196. }
  1197. }
  1198. return nready;
  1199. }
  1200. /* Return the task currently executed by the worker, or NULL if this is called
  1201. * either from a thread that is not a task or simply because there is no task
  1202. * being executed at the moment. */
  1203. struct starpu_task *starpu_task_get_current(void)
  1204. {
  1205. return (struct starpu_task *) STARPU_PTHREAD_GETSPECIFIC(current_task_key);
  1206. }
  1207. void _starpu_set_current_task(struct starpu_task *task)
  1208. {
  1209. STARPU_PTHREAD_SETSPECIFIC(current_task_key, task);
  1210. }
  1211. int starpu_task_get_current_data_node(unsigned i)
  1212. {
  1213. struct starpu_task *task = starpu_task_get_current();
  1214. if (!task)
  1215. return -1;
  1216. struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
  1217. struct _starpu_data_descr *descrs = _STARPU_JOB_GET_ORDERED_BUFFERS(j);
  1218. unsigned orderedindex = descrs[i].orderedindex;
  1219. return descrs[orderedindex].node;
  1220. }
  1221. #ifdef STARPU_OPENMP
  1222. /* Prepare the fields of the currentl task for accepting a new set of
  1223. * dependencies in anticipation of becoming a continuation.
  1224. *
  1225. * When the task becomes 'continued', it will only be queued again when the new
  1226. * set of dependencies is fulfilled. */
  1227. void _starpu_task_prepare_for_continuation(void)
  1228. {
  1229. _starpu_job_prepare_for_continuation(_starpu_get_job_associated_to_task(starpu_task_get_current()));
  1230. }
  1231. void _starpu_task_prepare_for_continuation_ext(unsigned continuation_resubmit,
  1232. void (*continuation_callback_on_sleep)(void *arg), void *continuation_callback_on_sleep_arg)
  1233. {
  1234. _starpu_job_prepare_for_continuation_ext(_starpu_get_job_associated_to_task(starpu_task_get_current()),
  1235. continuation_resubmit, continuation_callback_on_sleep, continuation_callback_on_sleep_arg);
  1236. }
  1237. void _starpu_task_set_omp_cleanup_callback(struct starpu_task *task, void (*omp_cleanup_callback)(void *arg), void *omp_cleanup_callback_arg)
  1238. {
  1239. _starpu_job_set_omp_cleanup_callback(_starpu_get_job_associated_to_task(task),
  1240. omp_cleanup_callback, omp_cleanup_callback_arg);
  1241. }
  1242. #endif
  1243. /*
  1244. * Returns 0 if tasks does not use any multiformat handle, 1 otherwise.
  1245. */
  1246. int
  1247. _starpu_task_uses_multiformat_handles(struct starpu_task *task)
  1248. {
  1249. unsigned i;
  1250. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  1251. for (i = 0; i < nbuffers; i++)
  1252. {
  1253. if (_starpu_data_is_multiformat_handle(STARPU_TASK_GET_HANDLE(task, i)))
  1254. return 1;
  1255. }
  1256. return 0;
  1257. }
  1258. /*
  1259. * Checks whether the given handle needs to be converted in order to be used on
  1260. * the node given as the second argument.
  1261. */
  1262. int
  1263. _starpu_handle_needs_conversion_task(starpu_data_handle_t handle,
  1264. unsigned int node)
  1265. {
  1266. return _starpu_handle_needs_conversion_task_for_arch(handle, starpu_node_get_kind(node));
  1267. }
  1268. int
  1269. _starpu_handle_needs_conversion_task_for_arch(starpu_data_handle_t handle,
  1270. enum starpu_node_kind node_kind)
  1271. {
  1272. /*
  1273. * Here, we assume that CUDA devices and OpenCL devices use the
  1274. * same data structure. A conversion is only needed when moving
  1275. * data from a CPU to a GPU, or the other way around.
  1276. */
  1277. switch (node_kind)
  1278. {
  1279. case STARPU_CPU_RAM:
  1280. case STARPU_MPI_MS_RAM:
  1281. switch(starpu_node_get_kind(handle->mf_node))
  1282. {
  1283. case STARPU_CPU_RAM:
  1284. case STARPU_MPI_MS_RAM:
  1285. return 0;
  1286. default:
  1287. return 1;
  1288. }
  1289. break;
  1290. default:
  1291. switch(starpu_node_get_kind(handle->mf_node))
  1292. {
  1293. case STARPU_CPU_RAM:
  1294. case STARPU_MPI_MS_RAM:
  1295. return 1;
  1296. default:
  1297. return 0;
  1298. }
  1299. break;
  1300. }
  1301. /* that instruction should never be reached */
  1302. return -EINVAL;
  1303. }
  1304. void starpu_task_set_implementation(struct starpu_task *task, unsigned impl)
  1305. {
  1306. _starpu_get_job_associated_to_task(task)->nimpl = impl;
  1307. }
  1308. unsigned starpu_task_get_implementation(struct starpu_task *task)
  1309. {
  1310. return _starpu_get_job_associated_to_task(task)->nimpl;
  1311. }
  1312. unsigned long starpu_task_get_job_id(struct starpu_task *task)
  1313. {
  1314. return _starpu_get_job_associated_to_task(task)->job_id;
  1315. }
  1316. static starpu_pthread_t watchdog_thread;
  1317. static int sleep_some(float timeout)
  1318. {
  1319. /* If we do a sleep(timeout), we might have to wait too long at the end of the computation. */
  1320. /* To avoid that, we do several sleep() of 1s (and check after each if starpu is still running) */
  1321. float t;
  1322. for (t = timeout ; t > 1.; t--)
  1323. {
  1324. starpu_sleep(1.);
  1325. if (!_starpu_machine_is_running())
  1326. /* Application finished, don't bother finishing the sleep */
  1327. return 0;
  1328. }
  1329. /* and one final sleep (of less than 1 s) with the rest (if needed) */
  1330. if (t > 0.)
  1331. starpu_sleep(t);
  1332. return 1;
  1333. }
  1334. /* Check from times to times that StarPU does finish some tasks */
  1335. static void *watchdog_func(void *arg)
  1336. {
  1337. char *timeout_env = arg;
  1338. float timeout, delay;
  1339. #ifdef _MSC_VER
  1340. timeout = ((float) _atoi64(timeout_env)) / 1000000;
  1341. #else
  1342. timeout = ((float) atoll(timeout_env)) / 1000000;
  1343. #endif
  1344. delay = ((float) watchdog_delay) / 1000000;
  1345. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1346. starpu_pthread_setname("watchdog");
  1347. if (!sleep_some(delay))
  1348. return NULL;
  1349. STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
  1350. while (_starpu_machine_is_running())
  1351. {
  1352. int last_nsubmitted = starpu_task_nsubmitted();
  1353. config->watchdog_ok = 0;
  1354. STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
  1355. if (!sleep_some(timeout))
  1356. return NULL;
  1357. STARPU_PTHREAD_MUTEX_LOCK(&config->submitted_mutex);
  1358. if (!config->watchdog_ok && last_nsubmitted
  1359. && last_nsubmitted == starpu_task_nsubmitted())
  1360. {
  1361. if (watchdog_hook == NULL)
  1362. _STARPU_MSG("The StarPU watchdog detected that no task finished for %fs (can be configured through STARPU_WATCHDOG_TIMEOUT)\n",
  1363. timeout);
  1364. else
  1365. watchdog_hook(watchdog_hook_arg);
  1366. if (watchdog_crash)
  1367. {
  1368. _STARPU_MSG("Crashing the process\n");
  1369. raise(SIGABRT);
  1370. }
  1371. else if (watchdog_hook == NULL)
  1372. _STARPU_MSG("Set the STARPU_WATCHDOG_CRASH environment variable if you want to abort the process in such a case\n");
  1373. }
  1374. /* Only shout again after another period */
  1375. config->watchdog_ok = 1;
  1376. }
  1377. STARPU_PTHREAD_MUTEX_UNLOCK(&config->submitted_mutex);
  1378. return NULL;
  1379. }
  1380. void starpu_task_watchdog_set_hook(void (*hook)(void *), void *hook_arg)
  1381. {
  1382. watchdog_hook = hook;
  1383. watchdog_hook_arg = hook_arg;
  1384. }
  1385. void _starpu_watchdog_init()
  1386. {
  1387. struct _starpu_machine_config *config = _starpu_get_machine_config();
  1388. char *timeout_env = starpu_getenv("STARPU_WATCHDOG_TIMEOUT");
  1389. STARPU_PTHREAD_MUTEX_INIT(&config->submitted_mutex, NULL);
  1390. if (!timeout_env)
  1391. return;
  1392. STARPU_PTHREAD_CREATE(&watchdog_thread, NULL, watchdog_func, timeout_env);
  1393. }
  1394. void _starpu_watchdog_shutdown(void)
  1395. {
  1396. char *timeout_env = starpu_getenv("STARPU_WATCHDOG_TIMEOUT");
  1397. if (!timeout_env)
  1398. return;
  1399. STARPU_PTHREAD_JOIN(watchdog_thread, NULL);
  1400. }
  1401. static void _starpu_ft_check_support(const struct starpu_task *task)
  1402. {
  1403. unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
  1404. unsigned i;
  1405. for (i = 0; i < nbuffers; i++)
  1406. {
  1407. enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, i);
  1408. STARPU_ASSERT_MSG (mode == STARPU_R || mode == STARPU_W,
  1409. "starpu_task_failed is only supported for tasks with access modes STARPU_R and STARPU_W");
  1410. }
  1411. }
  1412. struct starpu_task *starpu_task_ft_create_retry
  1413. (const struct starpu_task *meta_task, const struct starpu_task *template_task, void (*check_ft)(void *))
  1414. {
  1415. /* Create a new task to actually perform the result */
  1416. struct starpu_task *new_task = starpu_task_create();
  1417. *new_task = *template_task;
  1418. new_task->prologue_callback_func = NULL;
  1419. /* XXX: cl_arg needs to be duplicated */
  1420. STARPU_ASSERT_MSG(!meta_task->cl_arg_free || !meta_task->cl_arg, "not supported yet");
  1421. STARPU_ASSERT_MSG(!meta_task->callback_func, "not supported");
  1422. new_task->callback_func = check_ft;
  1423. new_task->callback_arg = (void*) meta_task;
  1424. new_task->callback_arg_free = 0;
  1425. new_task->prologue_callback_arg_free = 0;
  1426. STARPU_ASSERT_MSG(!new_task->prologue_callback_pop_arg_free, "not supported");
  1427. new_task->use_tag = 0;
  1428. new_task->synchronous = 0;
  1429. new_task->destroy = 1;
  1430. new_task->regenerate = 0;
  1431. new_task->no_submitorder = 1;
  1432. new_task->failed = 0;
  1433. new_task->scheduled = 0;
  1434. new_task->prefetched = 0;
  1435. new_task->status = STARPU_TASK_INIT;
  1436. new_task->profiling_info = NULL;
  1437. new_task->prev = NULL;
  1438. new_task->next = NULL;
  1439. new_task->starpu_private = NULL;
  1440. new_task->omp_task = NULL;
  1441. return new_task;
  1442. }
  1443. static void _starpu_default_check_ft(void *arg)
  1444. {
  1445. struct starpu_task *meta_task = arg;
  1446. struct starpu_task *current_task = starpu_task_get_current();
  1447. struct starpu_task *new_task;
  1448. int ret;
  1449. if (!current_task->failed)
  1450. {
  1451. starpu_task_ft_success(meta_task);
  1452. return;
  1453. }
  1454. new_task = starpu_task_ft_create_retry
  1455. (meta_task, current_task, _starpu_default_check_ft);
  1456. ret = starpu_task_submit_nodeps(new_task);
  1457. STARPU_ASSERT(!ret);
  1458. }
  1459. void starpu_task_ft_prologue(void *arg)
  1460. {
  1461. struct starpu_task *meta_task = starpu_task_get_current();
  1462. struct starpu_task *new_task;
  1463. void (*check_ft)(void*) = arg;
  1464. int ret;
  1465. if (!check_ft)
  1466. check_ft = _starpu_default_check_ft;
  1467. /* Create a task which will do the actual computation */
  1468. new_task = starpu_task_ft_create_retry
  1469. (meta_task, meta_task, check_ft);
  1470. ret = starpu_task_submit_nodeps(new_task);
  1471. STARPU_ASSERT(!ret);
  1472. /* Make the parent task wait for the result getting correct */
  1473. starpu_task_end_dep_add(meta_task, 1);
  1474. meta_task->where = STARPU_NOWHERE;
  1475. }
  1476. void starpu_task_ft_failed(struct starpu_task *task)
  1477. {
  1478. _starpu_ft_check_support(task);
  1479. task->failed = 1;
  1480. }
  1481. void starpu_task_ft_success(struct starpu_task *meta_task)
  1482. {
  1483. starpu_task_end_dep_release(meta_task);
  1484. }