starpu_replay_sched.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2018 Inria
  4. * Copyright (C) 2017 CNRS
  5. * Copyright (C) 2016,2017 Université de Bordeaux
  6. * Copyright (C) 2017 Erwan Leria
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. /*
  20. * This reads a sched.rec file and mangles submitted tasks according to the hint
  21. * from that file.
  22. */
  23. #include <starpu.h>
  24. #include <unistd.h>
  25. #include <stdio.h>
  26. #include <math.h>
  27. #include <common/uthash.h>
  28. #include <common/list.h>
  29. #include <common/utils.h>
  30. #include <limits.h>
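/*
sched.rec files are made of records separated by empty lines, with one
"Field: value" per line. A record starting with SubmitOrder gives hints for a
normal task (Priority, SpecificWorker, Workers, Workerorder, DependsOn); a record
starting with Prefetch requests a prefetch (DependsOn, MemoryNode, Parameters).
They look like this:

SubmitOrder: 1234
Priority: 12
SpecificWorker: 1
Workers: 0 1 2
DependsOn: 1235

Prefetch: 1234
DependsOn: 1233
MemoryNode: 1
Parameters: 1
*/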
  43. #define CPY(src, dst, n) memcpy(dst, src, n * sizeof(*dst))
  44. #if 0
  45. #define debug(fmt, ...) fprintf(stderr, fmt, ##__VA_ARGS__)
  46. #else
  47. #define debug(fmt, ...) (void)0
  48. #endif
  49. static unsigned long submitorder; /* Also use as prefetchtag */
  50. static int priority;
  51. static int eosw;
  52. static unsigned workerorder;
  53. static int memnode;
  54. /* FIXME: MAXs */
  55. static uint32_t workers[STARPU_NMAXWORKERS/32];
  56. static unsigned nworkers;
  57. static unsigned dependson[STARPU_NMAXBUFS];
  58. static unsigned ndependson;
  59. static unsigned params[STARPU_NMAXBUFS];
  60. static unsigned nparams;
  61. static enum sched_type
  62. {
  63. NormalTask,
  64. PrefetchTask,
  65. } sched_type;
  66. static struct starpu_codelet cl_prefetch =
  67. {
  68. .where = STARPU_NOWHERE,
  69. .nbuffers = 1,
  70. .modes = { STARPU_R },
  71. };
  72. static struct task
  73. {
  74. UT_hash_handle hh;
  75. unsigned long submitorder;
  76. int priority;
  77. int memnode;
  78. unsigned dependson[STARPU_NMAXBUFS];
  79. unsigned ndependson;
  80. struct starpu_task *depends_tasks[STARPU_NMAXBUFS];
  81. /* For real tasks */
  82. int eosw;
  83. unsigned workerorder;
  84. uint32_t workers[STARPU_NMAXWORKERS/32];
  85. unsigned nworkers;
  86. /* For prefetch tasks */
  87. unsigned params[STARPU_NMAXBUFS];
  88. unsigned nparams;
  89. struct starpu_task *pref_task; /* Actual prefetch task */
  90. } *mangled_tasks, *prefetch_tasks;
  91. LIST_TYPE(dep,
  92. struct task *task;
  93. unsigned i;
  94. );
  95. struct deps
  96. {
  97. UT_hash_handle hh;
  98. unsigned long submitorder;
  99. struct dep_list list;
  100. } *dependencies = NULL;
  101. static void reset(void)
  102. {
  103. submitorder = 0;
  104. priority = INT_MIN;
  105. eosw = -1;
  106. memset(&workers, 0, sizeof(workers));
  107. nworkers = 0;
  108. ndependson = 0;
  109. sched_type = NormalTask;
  110. nparams = 0;
  111. memnode = -1;
  112. workerorder = 0;
  113. }
/* TODO: respect the task submission order given by SubmitOrder */
  115. static void checkField(char * s)
  116. {
  117. /* Record various information */
  118. #define TEST(field) (!strncmp(s, field": ", strlen(field) + 2))
  119. if (TEST("SubmitOrder"))
  120. {
  121. s = s + strlen("SubmitOrder: ");
  122. submitorder = strtol(s, NULL, 10);
  123. }
  124. else if (TEST("Priority"))
  125. {
  126. s = s + strlen("Priority: ");
  127. priority = strtol(s, NULL, 10);
  128. }
  129. else if (TEST("SpecificWorker"))
  130. {
  131. s = s + strlen("SpecificWorker: ");
  132. eosw = strtol(s, NULL, 10);
  133. }
  134. else if (TEST("Workers"))
  135. {
  136. s = s + strlen("Workers: ");
  137. char * delim = " ";
  138. char * token = strtok(s, delim);
  139. int i = 0;
  140. while (token != NULL)
  141. {
  142. int k = strtol(token, NULL, 10);
  143. STARPU_ASSERT_MSG(k < STARPU_NMAXWORKERS, "%d is bigger than maximum %d\n", k, STARPU_NMAXWORKERS);
  144. workers[k/(sizeof(*workers)*8)] |= (1 << (k%(sizeof(*workers)*8)));
  145. i++;
  146. token = strtok(NULL, delim);
  147. }
  148. nworkers = i;
  149. }
  150. else if (TEST("DependsOn"))
  151. {
  152. /* NOTE : dependsons (in the sched.rec) should be the submit orders of the dependencies,
  153. otherwise it can occur an undefined behaviour
  154. (contrary to the tasks.rec where dependencies are jobids */
  155. unsigned i = 0;
  156. char * delim = " ";
  157. char * token = strtok(s+strlen("DependsOn: "), delim);
  158. while (token != NULL)
  159. {
  160. dependson[i] = strtol(token, NULL, 10);
  161. i++;
  162. token = strtok(NULL, delim);
  163. }
  164. ndependson = i;
  165. }
  166. else if (TEST("Prefetch"))
  167. {
  168. s = s + strlen("Prefetch: ");
  169. submitorder = strtol(s, NULL, 10);
  170. sched_type = PrefetchTask;
  171. }
  172. else if (TEST("Parameters"))
  173. {
  174. s = s + strlen("Parameters: ");
  175. char * delim = " ";
  176. char * token = strtok(s, delim);
  177. int i = 0;
  178. while (token != NULL)
  179. {
  180. params[i] = strtol(token, NULL, 10);
  181. i++;
  182. token = strtok(NULL, delim);
  183. }
  184. nparams = i;
  185. }
  186. else if (TEST("MemoryNode"))
  187. {
  188. s = s + strlen("MemoryNode: ");
  189. memnode = strtol(s, NULL, 10);
  190. }
  191. else if (TEST("Workerorder"))
  192. {
  193. s = s + strlen("Workerorder: ");
  194. workerorder = strtol(s, NULL, 10);
  195. }
  196. }
  197. void schedRecInit(const char * filename)
  198. {
  199. FILE * f = fopen(filename, "r");
  200. if(f == NULL)
  201. {
  202. fprintf(stderr,"unable to open file %s: %s\n", filename, strerror(errno));
  203. return;
  204. }
  205. size_t lnsize = 128;
  206. char *s;
  207. _STARPU_MALLOC(s, sizeof(*s) * lnsize);
  208. int eof = 0;
  209. reset();
  210. while(!eof && !feof(f))
  211. {
  212. char *ln;
  213. /* Get the line */
  214. if (!fgets(s, lnsize, f))
  215. {
  216. eof = 1;
  217. }
  218. while (!(ln = strchr(s, '\n')))
  219. {
  220. _STARPU_REALLOC(s, lnsize * 2);
  221. if (!fgets(s + lnsize-1, lnsize+1, f))
  222. {
  223. eof = 1;
  224. break;
  225. }
  226. lnsize *= 2;
  227. }
  228. if ((ln == s || eof) && submitorder)
  229. {
  230. /* Empty line, doit */
  231. struct task * task;
  232. unsigned i;
  233. _STARPU_MALLOC(task, sizeof(*task));
  234. task->submitorder = submitorder;
  235. task->priority = priority;
  236. task->memnode = memnode;
  237. CPY(dependson, task->dependson, ndependson);
  238. task->ndependson = ndependson;
  239. /* Also record submitorder of tasks that this one will need to depend on */
  240. for (i = 0; i < ndependson; i++)
  241. {
  242. struct dep *dep;
  243. struct starpu_task *starpu_task;
  244. _STARPU_MALLOC(dep, sizeof(*dep));
  245. dep->task = task;
  246. dep->i = i;
  247. struct deps *deps;
  248. HASH_FIND(hh, dependencies, &task->dependson[i], sizeof(submitorder), deps);
  249. if (!deps)
  250. {
  251. /* No task depends on this one yet, add a cell for it */
  252. _STARPU_MALLOC(deps, sizeof(*deps));
  253. dep_list_init(&deps->list);
  254. deps->submitorder = task->dependson[i];
  255. HASH_ADD(hh, dependencies, submitorder, sizeof(submitorder), deps);
  256. }
  257. dep_list_push_back(&deps->list, dep);
  258. /* Create the intermediate task */
  259. starpu_task = dep->task->depends_tasks[i] = starpu_task_create();
  260. starpu_task->cl = NULL;
  261. starpu_task->destroy = 0;
  262. starpu_task->no_submitorder = 1;
  263. }
  264. switch (sched_type)
  265. {
  266. case NormalTask:
  267. /* A new task to mangle, record what needs to be done */
  268. task->eosw = eosw;
  269. task->workerorder = workerorder;
  270. CPY(workers, task->workers, STARPU_NMAXWORKERS/32);
  271. task->nworkers = nworkers;
  272. STARPU_ASSERT(nparams == 0);
  273. debug("adding mangled task %lu\n", submitorder);
  274. HASH_ADD(hh, mangled_tasks, submitorder, sizeof(submitorder), task);
  275. break;
  276. case PrefetchTask:
  277. STARPU_ASSERT(memnode >= 0);
  278. STARPU_ASSERT(eosw == -1);
  279. STARPU_ASSERT(workerorder == 0);
  280. STARPU_ASSERT(nworkers == 0);
  281. CPY(params, task->params, nparams);
  282. task->nparams = nparams;
  283. /* TODO: more params */
  284. STARPU_ASSERT_MSG(nparams == 1, "only supports one parameter at a time");
  285. debug("adding prefetch task for %lu\n", submitorder);
  286. HASH_ADD(hh, prefetch_tasks, submitorder, sizeof(submitorder), task);
  287. break;
  288. default:
  289. STARPU_ASSERT(0);
  290. break;
  291. }
  292. reset();
  293. }
  294. else checkField(s);
  295. }
  296. fclose(f);
  297. }
  298. static void do_prefetch(void *arg)
  299. {
  300. unsigned node = (uintptr_t) arg;
  301. starpu_data_idle_prefetch_on_node(starpu_task_get_current()->handles[0], node, 1);
  302. }
  303. void applySchedRec(struct starpu_task *starpu_task, unsigned long submit_order)
  304. {
  305. struct task *task;
  306. struct deps *deps;
  307. int ret;
  308. HASH_FIND(hh, dependencies, &submit_order, sizeof(submit_order), deps);
  309. if (deps)
  310. {
  311. struct dep *dep;
  312. for (dep = dep_list_begin(&deps->list);
  313. dep != dep_list_end(&deps->list);
  314. dep = dep_list_next(dep))
  315. {
  316. debug("task %lu is %d-th dep for %lu\n", submit_order, dep->i, dep->task->submitorder);
  317. /* Some task will depend on this one, make the dependency */
  318. starpu_task_declare_deps_array(dep->task->depends_tasks[dep->i], 1, &starpu_task);
  319. ret = starpu_task_submit(dep->task->depends_tasks[dep->i]);
  320. STARPU_ASSERT(ret == 0);
  321. }
  322. }
  323. HASH_FIND(hh, prefetch_tasks, &submit_order, sizeof(submit_order), task);
  324. if (task)
  325. {
  326. /* We want to submit a prefetch for this task */
  327. debug("task %lu has a prefetch for parameter %d to node %d\n", submit_order, task->params[0], task->memnode);
  328. struct starpu_task *pref_task;
  329. pref_task = task->pref_task = starpu_task_create();
  330. pref_task->cl = &cl_prefetch;
  331. pref_task->destroy = 1;
  332. pref_task->no_submitorder = 1;
  333. pref_task->callback_arg = (void*)(uintptr_t) task->memnode;
  334. pref_task->callback_func = do_prefetch;
  335. /* TODO: more params */
  336. pref_task->handles[0] = starpu_task->handles[task->params[0]];
  337. /* Make it depend on intermediate tasks */
  338. if (task->ndependson)
  339. {
  340. debug("%u dependencies\n", task->ndependson);
  341. starpu_task_declare_deps_array(pref_task, task->ndependson, task->depends_tasks);
  342. }
  343. ret = starpu_task_submit(pref_task);
  344. STARPU_ASSERT(ret == 0);
  345. }
  346. HASH_FIND(hh, mangled_tasks, &submit_order, sizeof(submit_order), task);
  347. if (task == NULL)
  348. /* Nothing to do for this */
  349. return;
  350. debug("mangling task %lu\n", submit_order);
  351. if (task->eosw >= 0)
  352. {
  353. debug("execute on a specific worker %d\n", task->eosw);
  354. starpu_task->workerid = task->eosw;
  355. starpu_task->execute_on_a_specific_worker = 1;
  356. }
  357. if (task->workerorder > 0)
  358. {
  359. debug("workerorder %d\n", task->workerorder);
  360. starpu_task->workerorder = task->workerorder;
  361. }
  362. if (task->priority != INT_MIN)
  363. {
  364. debug("priority %d\n", task->priority);
  365. starpu_task->priority = task->priority;
  366. }
  367. if (task->nworkers)
  368. {
  369. debug("%u workers %x\n", task->nworkers, task->workers[0]);
  370. starpu_task->workerids_len = sizeof(task->workers) / sizeof(task->workers[0]);
  371. _STARPU_MALLOC(starpu_task->workerids, task->nworkers * sizeof(*starpu_task->workerids));
  372. CPY(task->workers, starpu_task->workerids, STARPU_NMAXWORKERS/32);
  373. }
  374. if (task->ndependson)
  375. {
  376. debug("%u dependencies\n", task->ndependson);
  377. starpu_task_declare_deps_array(starpu_task, task->ndependson, task->depends_tasks);
  378. }
  379. /* And now, let it go! */
  380. }