/* driver_gordon.c */

  /* StarPU --- Runtime system for heterogeneous multicore architectures.
   *
   * Copyright (C) 2009-2013 Université de Bordeaux 1
   * Copyright (C) 2010, 2011, 2013 Centre National de la Recherche Scientifique
   * Copyright (C) 2011 Télécom-SudParis
   *
   * StarPU is free software; you can redistribute it and/or modify
   * it under the terms of the GNU Lesser General Public License as published by
   * the Free Software Foundation; either version 2.1 of the License, or (at
   * your option) any later version.
   *
   * StarPU is distributed in the hope that it will be useful, but
   * WITHOUT ANY WARRANTY; without even the implied warranty of
   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
   *
   * See the GNU Lesser General Public License in COPYING.LGPL for more details.
   */
  18. #ifndef _GNU_SOURCE
  19. #define _GNU_SOURCE
  20. #endif
  21. #include <sched.h>
  22. #include <semaphore.h>
  23. #include <common/utils.h>
  24. #include "driver_gordon.h"
  25. #include "gordon_interface.h"
  26. #include <core/sched_policy.h>
  27. static unsigned progress_thread_is_inited = 0;
  28. starpu_pthread_t progress_thread;
  29. starpu_pthread_cond_t progress_cond;
  30. starpu_pthread_mutex_t progress_mutex;
  /* Bookkeeping attached to every submission made to Gordon; a pointer to it
   * is given to gordon_pushjob() and handed back to the completion callback. */
  struct gordon_task_wrapper_s
  {
      /* worker recorded at injection time; the callbacks assert it is set */
      struct _starpu_worker *worker;

      /* StarPU side: list of jobs (used by the list callback) */
      struct _starpu_job_list *list;

      /* Gordon side: job(s) obtained from gordon_alloc_jobs() */
      struct gordon_ppu_job_s *gordon_job;

      /* StarPU job when there is a single task */
      struct _starpu_job *j;

      /* debug: set to 1 by the completion callbacks */
      unsigned terminated;
  };
  41. void *gordon_worker_progress(void *arg)
  42. {
  43. _STARPU_DEBUG("gordon_worker_progress\n");
  44. /* fix the thread on the correct cpu */
  45. struct _starpu_worker_set *gordon_set_arg = arg;
  46. unsigned prog_thread_bind_id =
  47. (gordon_set_arg->workers[0].bindid + 1)%(gordon_set_arg->config->nhwcores);
  48. _starpu_bind_thread_on_cpu(gordon_set_arg->config, prog_thread_bind_id);
  49. STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
  50. progress_thread_is_inited = 1;
  51. STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
  52. STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
  53. while (1)
  54. {
  55. /* the Gordon runtime needs to make sure that we poll it
  56. * so that we handle jobs that are done */
  57. /* wait for one task termination */
  58. int ret = gordon_wait(0);
  59. if (ret)
  60. {
  61. /* possibly wake the thread that injects work */
  62. starpu_wake_all_blocked_workers();
  63. }
  64. }
  65. return NULL;
  66. }
  67. static void starpu_to_gordon_buffers(struct _starpu_job *j, struct gordon_ppu_job_s *gordon_job, unsigned memory_node)
  68. {
  69. unsigned buffer;
  70. unsigned nin = 0, ninout = 0, nout = 0;
  71. unsigned in = 0, inout = 0, out = 0;
  72. struct starpu_task *task = j->task;
  73. struct starpu_codelet *cl = task->cl;
  74. /* if it is non null, the argument buffer is considered
  75. * as the first read-only buffer */
  76. if (task->cl_arg)
  77. {
  78. gordon_job->buffers[in] = (uint64_t)task->cl_arg;
  79. gordon_job->ss[in].size = (uint32_t)task->cl_arg_size;
  80. nin++; in++;
  81. }
  82. /* count the number of in/inout/out buffers */
  83. unsigned nbuffers = cl->nbuffers;
  84. for (buffer = 0; buffer < nbuffers; buffer++)
  85. {
  86. enum starpu_access_mode mode = STARPU_CODELET_GET_MODE(cl, buffer);
  87. switch (mode)
  88. {
  89. case STARPU_R:
  90. nin++;
  91. break;
  92. case STARPU_W:
  93. nout++;
  94. break;
  95. case STARPU_RW:
  96. default:
  97. ninout++;
  98. break;
  99. }
  100. }
  101. for (buffer = 0; buffer < nbuffers; buffer++)
  102. {
  103. unsigned gordon_buffer;
  104. enum starpu_access_mode mode = STARPU_CODELET_GET_MODE(cl, buffer);
  105. switch (mode)
  106. {
  107. case STARPU_R:
  108. gordon_buffer = in++;
  109. break;
  110. case STARPU_W:
  111. gordon_buffer = nin + ninout + out++;
  112. break;
  113. case STARPU_RW:
  114. default:
  115. gordon_buffer = nin + inout++;
  116. break;
  117. }
  118. starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, buffer);
  119. gordon_job->nalloc = 0;
  120. gordon_job->nin = nin;
  121. gordon_job->ninout = ninout;
  122. gordon_job->nout = nout;
  123. STARPU_ASSERT(handle->ops->convert_to_gordon);
  124. handle->ops->convert_to_gordon(&handle->per_node[memory_node].interface,
  125. &gordon_job->buffers[gordon_buffer],
  126. &gordon_job->ss[gordon_buffer]);
  127. }
  128. }
  129. /* we assume the data are already available so that the data interface fields are
  130. * already filled */
  131. static struct gordon_task_wrapper_s *starpu_to_gordon_job(struct _starpu_job *j)
  132. {
  133. struct gordon_ppu_job_s *gordon_job = gordon_alloc_jobs(1, 0);
  134. struct gordon_task_wrapper_s *task_wrapper =
  135. malloc(sizeof(struct gordon_task_wrapper_s));
  136. task_wrapper->gordon_job = gordon_job;
  137. task_wrapper->j = j;
  138. task_wrapper->terminated = 0;
  139. gordon_job->index = _starpu_task_get_gordon_nth_implementation(j->task->cl, j->nimpl);
  140. /* we should not hardcore the memory node ... XXX */
  141. unsigned memory_node = 0;
  142. starpu_to_gordon_buffers(j, gordon_job, memory_node);
  143. return task_wrapper;
  144. }
  /*
   * Common completion path for one StarPU job: push the task output (with 0
   * as second argument, as everywhere in this driver), run the generic job
   * termination, then wake all blocked workers.
   */
  static void handle_terminated_job(struct _starpu_job *j)
  {
      _starpu_push_task_output(j, 0);

      _starpu_handle_job_termination(j);

      starpu_wake_all_blocked_workers();
  }
  151. static void gordon_callback_list_func(void *arg)
  152. {
  153. struct gordon_task_wrapper_s *task_wrapper = arg;
  154. struct _starpu_job_list *wrapper_list;
  155. /* we don't know who will execute that codelet : so we actually defer the
  156. * execution of the StarPU codelet and the job termination later */
  157. struct _starpu_worker *worker = task_wrapper->worker;
  158. STARPU_ASSERT(worker);
  159. wrapper_list = task_wrapper->list;
  160. task_wrapper->terminated = 1;
  161. // _STARPU_DEBUG("gordon callback : push job j %p\n", task_wrapper->j);
  162. unsigned task_cnt = 0;
  163. /* XXX 0 was hardcoded */
  164. while (!_starpu_job_list_empty(wrapper_list))
  165. {
  166. struct _starpu_job *j = _starpu_job_list_pop_back(wrapper_list);
  167. #ifndef STARPU_SIMGRID
  168. struct gordon_ppu_job_s * gordon_task = &task_wrapper->gordon_job[task_cnt];
  169. struct starpu_perfmodel *model = j->task->cl->model;
  170. if (model && model->benchmarking)
  171. {
  172. double measured = (double)gordon_task->measured;
  173. unsigned cpuid = 0; /* XXX */
  174. _starpu_update_perfmodel_history(j, j->task->cl->model, STARPU_GORDON_DEFAULT, cpuid, measured);
  175. }
  176. #endif
  177. _starpu_push_task_output(j, 0);
  178. _starpu_handle_job_termination(j);
  179. //starpu_wake_all_blocked_workers();
  180. task_cnt++;
  181. }
  182. /* the job list was allocated by the gordon driver itself */
  183. _starpu_job_list_delete(wrapper_list);
  184. starpu_wake_all_blocked_workers();
  185. free(task_wrapper->gordon_job);
  186. free(task_wrapper);
  187. }
  188. static void gordon_callback_func(void *arg)
  189. {
  190. struct gordon_task_wrapper_s *task_wrapper = arg;
  191. /* we don't know who will execute that codelet : so we actually defer the
  192. * execution of the StarPU codelet and the job termination later */
  193. struct _starpu_worker *worker = task_wrapper->worker;
  194. STARPU_ASSERT(worker);
  195. task_wrapper->terminated = 1;
  196. task_wrapper->j->task->cl->per_worker_stats[worker->workerid]++;
  197. handle_terminated_job(task_wrapper->j);
  198. starpu_wake_all_blocked_workers();
  199. free(task_wrapper);
  200. }
  201. int inject_task(struct _starpu_job *j, struct _starpu_worker *worker)
  202. {
  203. struct starpu_task *task = j->task;
  204. int ret = _starpu_fetch_task_input(j, 0);
  205. if (ret != 0)
  206. {
  207. /* there was not enough memory so the codelet cannot be executed right now ... */
  208. /* push the codelet back and try another one ... */
  209. return STARPU_TRYAGAIN;
  210. }
  211. _starpu_sched_pre_exec_hook(task);
  212. struct gordon_task_wrapper_s *task_wrapper = starpu_to_gordon_job(j);
  213. task_wrapper->worker = worker;
  214. gordon_pushjob(task_wrapper->gordon_job, gordon_callback_func, task_wrapper);
  215. return 0;
  216. }
  217. int inject_task_list(struct _starpu_job_list *list, struct _starpu_worker *worker)
  218. {
  219. /* first put back all tasks that can not be performed by Gordon */
  220. unsigned nvalids = 0;
  221. unsigned ninvalids = 0;
  222. struct _starpu_job *j;
  223. // TODO !
  224. //
  225. // for (j = _starpu_job_list_begin(list); j != _starpu_job_list_end(list); j = _starpu_job_list_next(j) )
  226. // {
  227. // if (!_STARPU_GORDON_MAY_PERFORM(j))
  228. // {
  229. // // XXX TODO
  230. // ninvalids++;
  231. // assert(0);
  232. // }
  233. // else
  234. // {
  235. // nvalids++;
  236. // }
  237. // }
  238. nvalids = _job_list_size(list);
  239. // _STARPU_DEBUG("nvalids %d \n", nvalids);
  240. struct gordon_task_wrapper_s *task_wrapper = malloc(sizeof(struct gordon_task_wrapper_s));
  241. gordon_job_t *gordon_jobs = gordon_alloc_jobs(nvalids, 0);
  242. task_wrapper->gordon_job = gordon_jobs;
  243. task_wrapper->list = list;
  244. task_wrapper->j = NULL;
  245. task_wrapper->terminated = 0;
  246. task_wrapper->worker = worker;
  247. unsigned index;
  248. for (j = _starpu_job_list_begin(list), index = 0; j != _starpu_job_list_end(list); j = _starpu_job_list_next(j), index++)
  249. {
  250. int ret;
  251. struct starpu_task *task = j->task;
  252. ret = _starpu_fetch_task_input(j, 0);
  253. STARPU_ASSERT(!ret);
  254. _starpu_sched_pre_exec_hook(task);
  255. gordon_jobs[index].index = _starpu_task_get_gordon_nth_implementation(task->cl, j->nimpl);
  256. struct starpu_perfmodel *model = j->task->cl->model;
  257. if (model && model->benchmarking)
  258. gordon_jobs[index].flags.sampling = 1;
  259. /* we should not hardcore the memory node ... XXX */
  260. unsigned memory_node = 0;
  261. starpu_to_gordon_buffers(j, &gordon_jobs[index], memory_node);
  262. }
  263. gordon_pushjob(task_wrapper->gordon_job, gordon_callback_list_func, task_wrapper);
  264. return 0;
  265. }
  266. void *gordon_worker_inject(struct _starpu_worker_set *arg)
  267. {
  268. while(_starpu_machine_is_running())
  269. {
  270. if (gordon_busy_enough())
  271. {
  272. /* gordon already has enough work, wait a little TODO */
  273. _starpu_wait_on_sched_event();
  274. }
  275. else
  276. {
  277. #ifndef NOCHAIN
  278. int ret = 0;
  279. #ifdef STARPU_DEVEL
  280. #warning we should look into the local job list here !
  281. #endif
  282. struct _starpu_job_list *list = _starpu_pop_every_task();
  283. /* XXX 0 is hardcoded */
  284. if (list)
  285. {
  286. /* partition lists */
  287. unsigned size = _starpu_job_list_size(list);
  288. unsigned nchunks = (size<2*arg->nworkers)?size:(2*arg->nworkers);
  289. //unsigned nchunks = (size<arg->nworkers)?size:(arg->nworkers);
  290. /* last element may be a little smaller (by 1) */
  291. unsigned chunksize = size/nchunks;
  292. unsigned chunk;
  293. for (chunk = 0; chunk < nchunks; chunk++)
  294. {
  295. struct _starpu_job_list *chunk_list;
  296. if (chunk != (nchunks -1))
  297. {
  298. /* split the list in 2 parts : list = chunk_list | tail */
  299. chunk_list = _starpu_job_list_new();
  300. /* find the end */
  301. chunk_list->_head = list->_head;
  302. struct _starpu_job *it_j = _starpu_job_list_begin(list);
  303. unsigned ind;
  304. for (ind = 0; ind < chunksize; ind++)
  305. {
  306. it_j = _starpu_job_list_next(it_j);
  307. }
  308. /* it_j should be the first element of the new list (tail) */
  309. chunk_list->_tail = it_j->_prev;
  310. chunk_list->_tail->_next = NULL;
  311. list->_head = it_j;
  312. it_j->_prev = NULL;
  313. }
  314. else
  315. {
  316. /* this is the last chunk */
  317. chunk_list = list;
  318. }
  319. ret = inject_task_list(chunk_list, &arg->workers[0]);
  320. }
  321. }
  322. else
  323. {
  324. _starpu_wait_on_sched_event();
  325. }
  326. #else
  327. /* gordon should accept a little more work */
  328. struct _starpu_job *j;
  329. j = _starpu_pop_task();
  330. // _STARPU_DEBUG("pop task %p\n", j);
  331. if (j)
  332. {
  333. if (_STARPU_GORDON_MAY_PERFORM(j))
  334. {
  335. /* inject that task */
  336. /* XXX we hardcore &arg->workers[0] for now */
  337. inject_task(j, &arg->workers[0]);
  338. }
  339. else
  340. {
  341. _starpu_push_task_to_workers(task);
  342. }
  343. }
  344. #endif
  345. }
  346. }
  347. return NULL;
  348. }
  349. void *_starpu_gordon_worker(void *arg)
  350. {
  351. struct _starpu_worker_set *gordon_set_arg = arg;
  352. _starpu_bind_thread_on_cpu(gordon_set_arg->config, gordon_set_arg->workers[0].bindid);
  353. /* TODO set_local_memory_node per SPU */
  354. gordon_init(gordon_set_arg->nworkers);
  355. /* NB: On SPUs, the worker_key is set to NULL since there is no point
  356. * in associating the PPU thread with a specific SPU (worker) while
  357. * it's handling multiple processing units. */
  358. _starpu_set_local_worker_key(NULL);
  359. /* TODO set workers' name field */
  360. unsigned spu;
  361. for (spu = 0; spu < gordon_set_arg->nworkers; spu++)
  362. {
  363. struct _starpu_worker *worker = &gordon_set_arg->workers[spu];
  364. snprintf(worker->name, sizeof(worker->name), "SPU %d", worker->id);
  365. snprintf(worker->short_name, sizeof(worker->short_name), "SPU %d", worker->id);
  366. }
  367. /*
  368. * To take advantage of PPE being hyperthreaded, we should have 2 threads
  369. * for the gordon driver : one injects works, the other makes sure that
  370. * gordon is progressing (and performs the callbacks).
  371. */
  372. /* launch the progression thread */
  373. STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
  374. STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
  375. STARPU_PTHREAD_CREATE(&progress_thread, NULL, gordon_worker_progress, gordon_set_arg);
  376. /* wait for the progression thread to be ready */
  377. STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
  378. while (!progress_thread_is_inited)
  379. STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
  380. STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
  381. _STARPU_DEBUG("progress thread is running ... \n");
  382. /* tell the core that gordon is ready */
  383. STARPU_PTHREAD_MUTEX_LOCK(&gordon_set_arg->mutex);
  384. gordon_set_arg->set_is_initialized = 1;
  385. STARPU_PTHREAD_COND_SIGNAL(&gordon_set_arg->ready_cond);
  386. STARPU_PTHREAD_MUTEX_UNLOCK(&gordon_set_arg->mutex);
  387. gordon_worker_inject(gordon_set_arg);
  388. _starpu_handle_all_pending_node_data_requests(memnode);
  389. _STARPU_DEBUG("gordon deinit...\n");
  390. gordon_deinit();
  391. _STARPU_DEBUG("gordon was deinited\n");
  392. return NULL;
  393. }