starpu_task_insert_utils.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011, 2013-2014 Université Bordeaux
  4. * Copyright (C) 2011-2014 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2011, 2014 INRIA
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <util/starpu_task_insert_utils.h>
  19. #include <common/config.h>
  20. #include <common/utils.h>
  21. #include <core/task.h>
  22. typedef void (*_starpu_callback_func_t)(void *);
  23. /* Deal with callbacks. The unpack function may be called multiple times when
  24. * we have a parallel task, and we should not free the cl_arg parameter from
  25. * the callback function. */
  26. struct _starpu_task_insert_cb_wrapper
  27. {
  28. _starpu_callback_func_t callback_func;
  29. void *callback_arg;
  30. };
  31. static
  32. void _starpu_task_insert_callback_wrapper(void *_cl_arg_wrapper)
  33. {
  34. struct _starpu_task_insert_cb_wrapper *cl_arg_wrapper = (struct _starpu_task_insert_cb_wrapper *) _cl_arg_wrapper;
  35. /* Execute the callback specified by the application */
  36. if (cl_arg_wrapper->callback_func)
  37. cl_arg_wrapper->callback_func(cl_arg_wrapper->callback_arg);
  38. }
  39. int _starpu_codelet_pack_args(void **arg_buffer, size_t *arg_buffer_size, va_list varg_list)
  40. {
  41. int arg_type;
  42. int nargs = 0;
  43. char *_arg_buffer = NULL; // We would like a void* but we use a char* to allow pointer arithmetic
  44. size_t _arg_buffer_size = 0;
  45. size_t current_offset = sizeof(nargs);
  46. while((arg_type = va_arg(varg_list, int)) != 0)
  47. {
  48. if (arg_type & STARPU_R || arg_type & STARPU_W || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
  49. {
  50. (void)va_arg(varg_list, starpu_data_handle_t);
  51. }
  52. else if (arg_type==STARPU_DATA_ARRAY)
  53. {
  54. (void)va_arg(varg_list, starpu_data_handle_t*);
  55. (void)va_arg(varg_list, int);
  56. }
  57. else if (arg_type==STARPU_DATA_MODE_ARRAY)
  58. {
  59. (void)va_arg(varg_list, struct starpu_data_descr*);
  60. (void)va_arg(varg_list, int);
  61. }
  62. else if (arg_type==STARPU_VALUE)
  63. {
  64. /* We have a constant value: this should be followed by a pointer to the cst value and the size of the constant */
  65. void *ptr = va_arg(varg_list, void *);
  66. size_t ptr_size = va_arg(varg_list, size_t);
  67. nargs++;
  68. if (current_offset > _arg_buffer_size)
  69. {
  70. if (_arg_buffer_size == 0) _arg_buffer_size = 1024; else _arg_buffer_size *= 2;
  71. _arg_buffer = realloc(_arg_buffer, _arg_buffer_size);
  72. }
  73. memcpy(_arg_buffer+current_offset, (void *)&ptr_size, sizeof(ptr_size));
  74. current_offset += sizeof(ptr_size);
  75. memcpy(_arg_buffer+current_offset, ptr, ptr_size);
  76. current_offset += ptr_size;
  77. STARPU_ASSERT(current_offset <= _arg_buffer_size);
  78. }
  79. else if (arg_type==STARPU_CALLBACK)
  80. {
  81. (void)va_arg(varg_list, _starpu_callback_func_t);
  82. }
  83. else if (arg_type==STARPU_CALLBACK_WITH_ARG)
  84. {
  85. va_arg(varg_list, _starpu_callback_func_t);
  86. va_arg(varg_list, void *);
  87. }
  88. else if (arg_type==STARPU_CALLBACK_ARG)
  89. {
  90. (void)va_arg(varg_list, void *);
  91. }
  92. else if (arg_type==STARPU_PROLOGUE_CALLBACK)
  93. {
  94. va_arg(varg_list, _starpu_callback_func_t);
  95. }
  96. else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
  97. {
  98. (void)va_arg(varg_list, void *);
  99. }
  100. else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
  101. {
  102. va_arg(varg_list, _starpu_callback_func_t);
  103. }
  104. else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
  105. {
  106. (void)va_arg(varg_list, void *);
  107. }
  108. else if (arg_type==STARPU_PRIORITY)
  109. {
  110. (void)va_arg(varg_list, int);
  111. }
  112. else if (arg_type==STARPU_EXECUTE_ON_NODE)
  113. {
  114. (void)va_arg(varg_list, int);
  115. }
  116. else if (arg_type==STARPU_EXECUTE_ON_DATA)
  117. {
  118. (void)va_arg(varg_list, starpu_data_handle_t);
  119. }
  120. else if (arg_type==STARPU_EXECUTE_ON_WORKER)
  121. {
  122. va_arg(varg_list, int);
  123. }
  124. else if (arg_type==STARPU_WORKER_ORDER)
  125. {
  126. va_arg(varg_list, unsigned);
  127. }
  128. else if (arg_type==STARPU_SCHED_CTX)
  129. {
  130. (void)va_arg(varg_list, unsigned);
  131. }
  132. else if (arg_type==STARPU_HYPERVISOR_TAG)
  133. {
  134. (void)va_arg(varg_list, int);
  135. }
  136. else if (arg_type==STARPU_POSSIBLY_PARALLEL)
  137. {
  138. (void)va_arg(varg_list, unsigned);
  139. }
  140. else if (arg_type==STARPU_FLOPS)
  141. {
  142. (void)va_arg(varg_list, double);
  143. }
  144. else if (arg_type==STARPU_TAG || arg_type==STARPU_TAG_ONLY)
  145. {
  146. (void)va_arg(varg_list, starpu_tag_t);
  147. }
  148. else if (arg_type==STARPU_NODE_SELECTION_POLICY)
  149. {
  150. (void)va_arg(varg_list, char *);
  151. }
  152. else
  153. {
  154. STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
  155. }
  156. }
  157. if (nargs)
  158. {
  159. memcpy(_arg_buffer, (int *)&nargs, sizeof(nargs));
  160. }
  161. else
  162. {
  163. free(_arg_buffer);
  164. _arg_buffer = NULL;
  165. }
  166. *arg_buffer = _arg_buffer;
  167. *arg_buffer_size = _arg_buffer_size;
  168. return 0;
  169. }
  170. static
  171. void _starpu_task_insert_check_nb_buffers(struct starpu_codelet *cl, struct starpu_task **task, int *allocated_buffers, int current_buffer)
  172. {
  173. if (current_buffer >= STARPU_NMAXBUFS)
  174. {
  175. if (*allocated_buffers == 0)
  176. {
  177. int i;
  178. *allocated_buffers = STARPU_NMAXBUFS * 2;
  179. (*task)->dyn_handles = malloc(*allocated_buffers * sizeof(starpu_data_handle_t));
  180. for(i=0 ; i<current_buffer ; i++)
  181. {
  182. (*task)->dyn_handles[i] = (*task)->handles[i];
  183. }
  184. (*task)->dyn_modes = malloc(*allocated_buffers * sizeof(enum starpu_data_access_mode));
  185. for(i=0 ; i<current_buffer ; i++)
  186. {
  187. (*task)->dyn_modes[i] = (*task)->modes[i];
  188. }
  189. }
  190. else if (current_buffer >= *allocated_buffers)
  191. {
  192. *allocated_buffers *= 2;
  193. (*task)->dyn_handles = realloc((*task)->dyn_handles, *allocated_buffers * sizeof(starpu_data_handle_t));
  194. if ((*task)->cl->nbuffers == STARPU_VARIABLE_NBUFFERS)
  195. (*task)->dyn_modes = realloc((*task)->dyn_modes, *allocated_buffers * sizeof(enum starpu_data_access_mode));
  196. }
  197. }
  198. }
  199. void _starpu_task_insert_create(struct starpu_codelet *cl, struct starpu_task **task, va_list varg_list)
  200. {
  201. int arg_type;
  202. char *arg_buffer_ = NULL;
  203. size_t arg_buffer_size_ = 0;
  204. size_t current_offset = sizeof(int);
  205. int current_buffer;
  206. int nargs = 0;
  207. int allocated_buffers = 0;
  208. struct _starpu_task_insert_cb_wrapper *cl_arg_wrapper = (struct _starpu_task_insert_cb_wrapper *) malloc(sizeof(struct _starpu_task_insert_cb_wrapper));
  209. STARPU_ASSERT(cl_arg_wrapper);
  210. cl_arg_wrapper->callback_func = NULL;
  211. struct _starpu_task_insert_cb_wrapper *prologue_cl_arg_wrapper = (struct _starpu_task_insert_cb_wrapper *) malloc(sizeof(struct _starpu_task_insert_cb_wrapper));
  212. STARPU_ASSERT(prologue_cl_arg_wrapper);
  213. prologue_cl_arg_wrapper->callback_func = NULL;
  214. struct _starpu_task_insert_cb_wrapper *prologue_pop_cl_arg_wrapper = (struct _starpu_task_insert_cb_wrapper *) malloc(sizeof(struct _starpu_task_insert_cb_wrapper));
  215. STARPU_ASSERT(prologue_pop_cl_arg_wrapper);
  216. prologue_pop_cl_arg_wrapper->callback_func = NULL;
  217. (*task)->cl = cl;
  218. current_buffer = 0;
  219. while((arg_type = va_arg(varg_list, int)) != 0)
  220. {
  221. if (arg_type & STARPU_R || arg_type & STARPU_W || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
  222. {
  223. /* We have an access mode : we expect to find a handle */
  224. starpu_data_handle_t handle = va_arg(varg_list, starpu_data_handle_t);
  225. enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type & ~STARPU_SSEND;
  226. STARPU_ASSERT(cl != NULL);
  227. _starpu_task_insert_check_nb_buffers(cl, task, &allocated_buffers, current_buffer);
  228. STARPU_TASK_SET_HANDLE((*task), handle, current_buffer);
  229. if (cl->nbuffers == STARPU_VARIABLE_NBUFFERS)
  230. STARPU_TASK_SET_MODE(*task, mode, current_buffer);
  231. else if (STARPU_CODELET_GET_MODE(cl, current_buffer))
  232. {
  233. STARPU_ASSERT_MSG(STARPU_CODELET_GET_MODE(cl, current_buffer) == mode,
  234. "The codelet <%s> defines the access mode %d for the buffer %d which is different from the mode %d given to starpu_task_insert\n",
  235. cl->name, STARPU_CODELET_GET_MODE(cl, current_buffer),
  236. current_buffer, mode);
  237. }
  238. else
  239. {
  240. #ifdef STARPU_DEVEL
  241. # warning shall we print a warning to the user
  242. /* Morse uses it to avoid having to set it in the codelet structure */
  243. #endif
  244. STARPU_CODELET_SET_MODE(cl, mode, current_buffer);
  245. }
  246. current_buffer++;
  247. }
  248. else if (arg_type == STARPU_DATA_ARRAY)
  249. {
  250. // Expect to find a array of handles and its size
  251. starpu_data_handle_t *handles = va_arg(varg_list, starpu_data_handle_t *);
  252. int nb_handles = va_arg(varg_list, int);
  253. STARPU_ASSERT(cl != NULL);
  254. int i;
  255. for(i=0 ; i<nb_handles ; i++)
  256. {
  257. _starpu_task_insert_check_nb_buffers(cl, task, &allocated_buffers, current_buffer);
  258. STARPU_TASK_SET_HANDLE((*task), handles[i], current_buffer);
  259. current_buffer++;
  260. }
  261. }
  262. else if (arg_type==STARPU_DATA_MODE_ARRAY)
  263. {
  264. // Expect to find a array of descr and its size
  265. struct starpu_data_descr *descrs = va_arg(varg_list, struct starpu_data_descr *);
  266. int nb_descrs = va_arg(varg_list, int);
  267. STARPU_ASSERT(cl != NULL);
  268. int i;
  269. for(i=0 ; i<nb_descrs ; i++)
  270. {
  271. _starpu_task_insert_check_nb_buffers(cl, task, &allocated_buffers, current_buffer);
  272. STARPU_TASK_SET_HANDLE((*task), descrs[i].handle, current_buffer);
  273. if ((*task)->dyn_modes)
  274. {
  275. (*task)->dyn_modes[i] = descrs[i].mode;
  276. }
  277. else if (cl->nbuffers == STARPU_VARIABLE_NBUFFERS)
  278. STARPU_TASK_SET_MODE(*task, descrs[i].mode, current_buffer);
  279. else if (STARPU_CODELET_GET_MODE(cl, current_buffer))
  280. {
  281. STARPU_ASSERT_MSG(STARPU_CODELET_GET_MODE(cl, current_buffer) == descrs[i].mode,
  282. "The codelet <%s> defines the access mode %d for the buffer %d which is different from the mode %d given to starpu_task_insert\n",
  283. cl->name, STARPU_CODELET_GET_MODE(cl, current_buffer),
  284. current_buffer, descrs[i].mode);
  285. }
  286. else
  287. {
  288. STARPU_CODELET_SET_MODE(cl, descrs[i].mode, current_buffer);
  289. }
  290. current_buffer++;
  291. }
  292. }
  293. else if (arg_type==STARPU_VALUE)
  294. {
  295. void *ptr = va_arg(varg_list, void *);
  296. size_t ptr_size = va_arg(varg_list, size_t);
  297. nargs++;
  298. if (current_offset > arg_buffer_size_)
  299. {
  300. if (arg_buffer_size_ == 0) arg_buffer_size_ = 1024; else arg_buffer_size_ *= 2;
  301. arg_buffer_ = realloc(arg_buffer_, arg_buffer_size_);
  302. }
  303. memcpy(arg_buffer_+current_offset, (void *)&ptr_size, sizeof(ptr_size));
  304. current_offset += sizeof(ptr_size);
  305. memcpy(arg_buffer_+current_offset, ptr, ptr_size);
  306. current_offset += ptr_size;
  307. STARPU_ASSERT(current_offset <= arg_buffer_size_);
  308. }
  309. else if (arg_type==STARPU_CALLBACK)
  310. {
  311. void (*callback_func)(void *);
  312. callback_func = va_arg(varg_list, _starpu_callback_func_t);
  313. cl_arg_wrapper->callback_func = callback_func;
  314. }
  315. else if (arg_type==STARPU_CALLBACK_WITH_ARG)
  316. {
  317. void (*callback_func)(void *);
  318. void *callback_arg;
  319. callback_func = va_arg(varg_list, _starpu_callback_func_t);
  320. callback_arg = va_arg(varg_list, void *);
  321. cl_arg_wrapper->callback_func = callback_func;
  322. cl_arg_wrapper->callback_arg = callback_arg;
  323. }
  324. else if (arg_type==STARPU_CALLBACK_ARG)
  325. {
  326. void *callback_arg = va_arg(varg_list, void *);
  327. cl_arg_wrapper->callback_arg = callback_arg;
  328. }
  329. else if (arg_type==STARPU_PROLOGUE_CALLBACK)
  330. {
  331. void (*callback_func)(void *);
  332. callback_func = va_arg(varg_list, _starpu_callback_func_t);
  333. prologue_cl_arg_wrapper->callback_func = callback_func;
  334. }
  335. else if (arg_type==STARPU_PROLOGUE_CALLBACK_ARG)
  336. {
  337. void *callback_arg = va_arg(varg_list, void *);
  338. prologue_cl_arg_wrapper->callback_arg = callback_arg;
  339. }
  340. else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP)
  341. {
  342. void (*callback_func)(void *);
  343. callback_func = va_arg(varg_list, _starpu_callback_func_t);
  344. prologue_pop_cl_arg_wrapper->callback_func = callback_func;
  345. }
  346. else if (arg_type==STARPU_PROLOGUE_CALLBACK_POP_ARG)
  347. {
  348. void *callback_arg = va_arg(varg_list, void *);
  349. prologue_pop_cl_arg_wrapper->callback_arg = callback_arg;
  350. }
  351. else if (arg_type==STARPU_PRIORITY)
  352. {
  353. /* Followed by a priority level */
  354. int prio = va_arg(varg_list, int);
  355. (*task)->priority = prio;
  356. }
  357. else if (arg_type==STARPU_EXECUTE_ON_NODE)
  358. {
  359. (void)va_arg(varg_list, int);
  360. }
  361. else if (arg_type==STARPU_EXECUTE_ON_DATA)
  362. {
  363. (void)va_arg(varg_list, starpu_data_handle_t);
  364. }
  365. else if (arg_type==STARPU_EXECUTE_ON_WORKER)
  366. {
  367. int worker = va_arg(varg_list, int);
  368. if (worker != -1)
  369. {
  370. (*task)->workerid = worker;
  371. (*task)->execute_on_a_specific_worker = 1;
  372. }
  373. }
  374. else if (arg_type==STARPU_WORKER_ORDER)
  375. {
  376. unsigned order = va_arg(varg_list, unsigned);
  377. if (order != 0)
  378. {
  379. STARPU_ASSERT_MSG((*task)->execute_on_a_specific_worker, "worker order only makes sense if a workerid is provided");
  380. (*task)->workerorder = order;
  381. }
  382. }
  383. else if (arg_type==STARPU_SCHED_CTX)
  384. {
  385. unsigned sched_ctx = va_arg(varg_list, unsigned);
  386. (*task)->sched_ctx = sched_ctx;
  387. }
  388. else if (arg_type==STARPU_HYPERVISOR_TAG)
  389. {
  390. int hypervisor_tag = va_arg(varg_list, int);
  391. (*task)->hypervisor_tag = hypervisor_tag;
  392. }
  393. else if (arg_type==STARPU_POSSIBLY_PARALLEL)
  394. {
  395. unsigned possibly_parallel = va_arg(varg_list, unsigned);
  396. (*task)->possibly_parallel = possibly_parallel;
  397. }
  398. else if (arg_type==STARPU_FLOPS)
  399. {
  400. double flops = va_arg(varg_list, double);
  401. (*task)->flops = flops;
  402. }
  403. else if (arg_type==STARPU_TAG)
  404. {
  405. starpu_tag_t tag = va_arg(varg_list, starpu_tag_t);
  406. (*task)->tag_id = tag;
  407. (*task)->use_tag = 1;
  408. }
  409. else if (arg_type==STARPU_TAG_ONLY)
  410. {
  411. starpu_tag_t tag = va_arg(varg_list, starpu_tag_t);
  412. (*task)->tag_id = tag;
  413. }
  414. else if (arg_type==STARPU_NODE_SELECTION_POLICY)
  415. {
  416. (void)va_arg(varg_list, char *);
  417. }
  418. else
  419. {
  420. STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
  421. }
  422. }
  423. if (cl)
  424. {
  425. if (cl->nbuffers == STARPU_VARIABLE_NBUFFERS)
  426. {
  427. (*task)->nbuffers = current_buffer;
  428. }
  429. else
  430. {
  431. STARPU_ASSERT_MSG(current_buffer == cl->nbuffers, "Incoherent number of buffers between cl (%d) and number of parameters (%d)", cl->nbuffers, current_buffer);
  432. }
  433. }
  434. if (nargs)
  435. {
  436. memcpy(arg_buffer_, (int *)&nargs, sizeof(nargs));
  437. (*task)->cl_arg = arg_buffer_;
  438. (*task)->cl_arg_size = arg_buffer_size_;
  439. }
  440. else
  441. {
  442. free(arg_buffer_);
  443. arg_buffer_ = NULL;
  444. }
  445. /* The callback will free the argument stack and execute the
  446. * application's callback, if any. */
  447. (*task)->callback_func = _starpu_task_insert_callback_wrapper;
  448. (*task)->callback_arg = cl_arg_wrapper;
  449. (*task)->callback_arg_free = 1;
  450. (*task)->prologue_callback_func = _starpu_task_insert_callback_wrapper;
  451. (*task)->prologue_callback_arg = prologue_cl_arg_wrapper;
  452. (*task)->prologue_callback_arg_free = 1;
  453. (*task)->prologue_callback_pop_func = _starpu_task_insert_callback_wrapper;
  454. (*task)->prologue_callback_pop_arg = prologue_pop_cl_arg_wrapper;
  455. (*task)->prologue_callback_pop_arg_free = 1;
  456. }