/* driver_gordon.c */
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009, 2010, 2011-2012 Université de Bordeaux 1
 * Copyright (C) 2010, 2011 Centre National de la Recherche Scientifique
 * Copyright (C) 2011 Télécom-SudParis
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
  18. #ifndef _GNU_SOURCE
  19. #define _GNU_SOURCE
  20. #endif
  21. #include <sched.h>
  22. #include <pthread.h>
  23. #include <semaphore.h>
  24. #include <common/utils.h>
  25. #include "driver_gordon.h"
  26. #include "gordon_interface.h"
  27. #include <core/sched_policy.h>
/* Handshake between the injection thread (_starpu_gordon_worker) and the
 * progression thread (gordon_worker_progress): the injector waits on
 * progress_cond until the progression thread sets progress_thread_is_inited.
 * All three are protected by progress_mutex. */
static unsigned progress_thread_is_inited = 0;
pthread_t progress_thread;
pthread_cond_t progress_cond;
pthread_mutex_t progress_mutex;
/* Argument passed to the gordon_pushjob() callbacks: ties the Gordon-side
 * job structure(s) back to the StarPU job(s) they correspond to.  Either
 * "j" (single task, see inject_task) or "list" (task chain, see
 * inject_task_list) is used, not both. */
struct gordon_task_wrapper_s
{
	/* who has executed that ? */
	struct _starpu_worker *worker;

	struct _starpu_job_list *list;	/* StarPU side: jobs of a chained injection */
	struct gordon_ppu_job_s *gordon_job; /* gordon side: one entry per job */
	struct _starpu_job *j;		/* if there is a single task */

	/* debug */
	unsigned terminated;
};
  42. void *gordon_worker_progress(void *arg)
  43. {
  44. _STARPU_DEBUG("gordon_worker_progress\n");
  45. /* fix the thread on the correct cpu */
  46. struct _starpu_worker_set *gordon_set_arg = arg;
  47. unsigned prog_thread_bind_id =
  48. (gordon_set_arg->workers[0].bindid + 1)%(gordon_set_arg->config->nhwcores);
  49. _starpu_bind_thread_on_cpu(gordon_set_arg->config, prog_thread_bind_id);
  50. _STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
  51. progress_thread_is_inited = 1;
  52. _STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
  53. _STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
  54. while (1)
  55. {
  56. /* the Gordon runtime needs to make sure that we poll it
  57. * so that we handle jobs that are done */
  58. /* wait for one task termination */
  59. int ret = gordon_wait(0);
  60. if (ret)
  61. {
  62. /* possibly wake the thread that injects work */
  63. starpu_wake_all_blocked_workers();
  64. }
  65. }
  66. return NULL;
  67. }
  68. static void starpu_to_gordon_buffers(struct _starpu_job *j, struct gordon_ppu_job_s *gordon_job, uint32_t memory_node)
  69. {
  70. unsigned buffer;
  71. unsigned nin = 0, ninout = 0, nout = 0;
  72. unsigned in = 0, inout = 0, out = 0;
  73. struct starpu_task *task = j->task;
  74. struct starpu_codelet *cl = task->cl;
  75. /* if it is non null, the argument buffer is considered
  76. * as the first read-only buffer */
  77. if (task->cl_arg)
  78. {
  79. gordon_job->buffers[in] = (uint64_t)task->cl_arg;
  80. gordon_job->ss[in].size = (uint32_t)task->cl_arg_size;
  81. nin++; in++;
  82. }
  83. /* count the number of in/inout/out buffers */
  84. unsigned nbuffers = cl->nbuffers;
  85. for (buffer = 0; buffer < nbuffers; buffer++)
  86. {
  87. enum starpu_access_mode mode = cl->modes[buffer];
  88. switch (mode)
  89. {
  90. case STARPU_R:
  91. nin++;
  92. break;
  93. case STARPU_W:
  94. nout++;
  95. break;
  96. case STARPU_RW:
  97. default:
  98. ninout++;
  99. break;
  100. }
  101. }
  102. for (buffer = 0; buffer < nbuffers; buffer++)
  103. {
  104. unsigned gordon_buffer;
  105. enum starpu_access_mode mode = cl->modes[buffer];
  106. switch (mode)
  107. {
  108. case STARPU_R:
  109. gordon_buffer = in++;
  110. break;
  111. case STARPU_W:
  112. gordon_buffer = nin + ninout + out++;
  113. break;
  114. case STARPU_RW:
  115. default:
  116. gordon_buffer = nin + inout++;
  117. break;
  118. }
  119. starpu_data_handle_t handle = task->handles[buffer];
  120. gordon_job->nalloc = 0;
  121. gordon_job->nin = nin;
  122. gordon_job->ninout = ninout;
  123. gordon_job->nout = nout;
  124. STARPU_ASSERT(handle->ops->convert_to_gordon);
  125. handle->ops->convert_to_gordon(&handle->per_node[memory_node].interface,
  126. &gordon_job->buffers[gordon_buffer],
  127. &gordon_job->ss[gordon_buffer]);
  128. }
  129. }
  130. /* we assume the data are already available so that the data interface fields are
  131. * already filled */
  132. static struct gordon_task_wrapper_s *starpu_to_gordon_job(struct _starpu_job *j)
  133. {
  134. struct gordon_ppu_job_s *gordon_job = gordon_alloc_jobs(1, 0);
  135. struct gordon_task_wrapper_s *task_wrapper =
  136. malloc(sizeof(struct gordon_task_wrapper_s));
  137. task_wrapper->gordon_job = gordon_job;
  138. task_wrapper->j = j;
  139. task_wrapper->terminated = 0;
  140. gordon_job->index = _starpu_task_get_gordon_nth_implementation(j->task->cl, j->nimpl);
  141. /* we should not hardcore the memory node ... XXX */
  142. unsigned memory_node = 0;
  143. starpu_to_gordon_buffers(j, gordon_job, memory_node);
  144. return task_wrapper;
  145. }
/* Finalize one job once Gordon reports it done: write its output data back
 * (memory node 0), run StarPU's job-termination hooks, and wake any worker
 * blocked waiting for these data or for dependencies. */
static void handle_terminated_job(struct _starpu_job *j)
{
	_starpu_push_task_output(j, 0);
	_starpu_handle_job_termination(j);
	starpu_wake_all_blocked_workers();
}
/* Callback run by the Gordon runtime when all the jobs of a chained
 * injection (inject_task_list) have terminated.  Feeds measured times back
 * to the performance models, terminates each StarPU job, then frees the
 * list, the Gordon job array and the wrapper (all allocated by the driver). */
static void gordon_callback_list_func(void *arg)
{
	struct gordon_task_wrapper_s *task_wrapper = arg;
	struct _starpu_job_list *wrapper_list;

	/* we don't know who will execute that codelet : so we actually defer the
	 * execution of the StarPU codelet and the job termination later */
	struct _starpu_worker *worker = task_wrapper->worker;
	STARPU_ASSERT(worker);

	wrapper_list = task_wrapper->list;

	task_wrapper->terminated = 1;

//	_STARPU_DEBUG("gordon callback : push job j %p\n", task_wrapper->j);

	/* task_cnt indexes the gordon_job array, which was filled in list
	 * (front-to-back) order by inject_task_list(). */
	unsigned task_cnt = 0;

	/* XXX 0 was hardcoded */
	while (!_starpu_job_list_empty(wrapper_list))
	{
		/* NOTE(review): jobs are popped from the BACK of the list while
		 * gordon_job[] was filled from the FRONT, so gordon_task may not
		 * be the measurement of this particular j — confirm whether the
		 * pairing matters beyond perfmodel sampling. */
		struct _starpu_job *j = _starpu_job_list_pop_back(wrapper_list);
		struct gordon_ppu_job_s * gordon_task = &task_wrapper->gordon_job[task_cnt];

		struct starpu_perfmodel *model = j->task->cl->model;

		if (model && model->benchmarking)
		{
			double measured = (double)gordon_task->measured;
			unsigned cpuid = 0; /* XXX */

			_starpu_update_perfmodel_history(j, j->task->cl->model, STARPU_GORDON_DEFAULT, cpuid, measured);
		}

		/* write the output data back and terminate the job (node 0) */
		_starpu_push_task_output(j, 0);

		_starpu_handle_job_termination(j);
		//starpu_wake_all_blocked_workers();

		task_cnt++;
	}

	/* the job list was allocated by the gordon driver itself */
	_starpu_job_list_delete(wrapper_list);

	starpu_wake_all_blocked_workers();
	free(task_wrapper->gordon_job);
	free(task_wrapper);
}
/* Callback run by the Gordon runtime when a single job (injected by
 * inject_task) has terminated on the SPUs. */
static void gordon_callback_func(void *arg)
{
	struct gordon_task_wrapper_s *task_wrapper = arg;

	/* we don't know who will execute that codelet : so we actually defer the
	 * execution of the StarPU codelet and the job termination later */
	struct _starpu_worker *worker = task_wrapper->worker;
	STARPU_ASSERT(worker);

	task_wrapper->terminated = 1;

	/* account this execution in the codelet's per-worker statistics */
	task_wrapper->j->task->cl->per_worker_stats[worker->workerid]++;

	handle_terminated_job(task_wrapper->j);

	starpu_wake_all_blocked_workers();

	/* NOTE(review): unlike gordon_callback_list_func(), this path never
	 * frees task_wrapper->gordon_job — possible leak, unless memory from
	 * gordon_alloc_jobs() is owned by the Gordon runtime; confirm. */
	free(task_wrapper);
}
  200. int inject_task(struct _starpu_job *j, struct _starpu_worker *worker)
  201. {
  202. struct starpu_task *task = j->task;
  203. int ret = _starpu_fetch_task_input(j, 0);
  204. if (ret != 0)
  205. {
  206. /* there was not enough memory so the codelet cannot be executed right now ... */
  207. /* push the codelet back and try another one ... */
  208. return STARPU_TRYAGAIN;
  209. }
  210. _starpu_sched_pre_exec_hook(task);
  211. struct gordon_task_wrapper_s *task_wrapper = starpu_to_gordon_job(j);
  212. task_wrapper->worker = worker;
  213. gordon_pushjob(task_wrapper->gordon_job, gordon_callback_func, task_wrapper);
  214. return 0;
  215. }
  216. int inject_task_list(struct _starpu_job_list *list, struct _starpu_worker *worker)
  217. {
  218. /* first put back all tasks that can not be performed by Gordon */
  219. unsigned nvalids = 0;
  220. unsigned ninvalids = 0;
  221. struct _starpu_job *j;
  222. // TODO !
  223. //
  224. // for (j = _starpu_job_list_begin(list); j != _starpu_job_list_end(list); j = _starpu_job_list_next(j) )
  225. // {
  226. // if (!_STARPU_GORDON_MAY_PERFORM(j))
  227. // {
  228. // // XXX TODO
  229. // ninvalids++;
  230. // assert(0);
  231. // }
  232. // else
  233. // {
  234. // nvalids++;
  235. // }
  236. // }
  237. nvalids = _job_list_size(list);
  238. // _STARPU_DEBUG("nvalids %d \n", nvalids);
  239. struct gordon_task_wrapper_s *task_wrapper = malloc(sizeof(struct gordon_task_wrapper_s));
  240. gordon_job_t *gordon_jobs = gordon_alloc_jobs(nvalids, 0);
  241. task_wrapper->gordon_job = gordon_jobs;
  242. task_wrapper->list = list;
  243. task_wrapper->j = NULL;
  244. task_wrapper->terminated = 0;
  245. task_wrapper->worker = worker;
  246. unsigned index;
  247. for (j = _starpu_job_list_begin(list), index = 0; j != _starpu_job_list_end(list); j = _starpu_job_list_next(j), index++)
  248. {
  249. int ret;
  250. struct starpu_task *task = j->task;
  251. ret = _starpu_fetch_task_input(j, 0);
  252. STARPU_ASSERT(!ret);
  253. _starpu_sched_pre_exec_hook(task);
  254. gordon_jobs[index].index = _starpu_task_get_gordon_nth_implementation(task->cl, j->nimpl);
  255. struct starpu_perfmodel *model = j->task->cl->model;
  256. if (model && model->benchmarking)
  257. gordon_jobs[index].flags.sampling = 1;
  258. /* we should not hardcore the memory node ... XXX */
  259. unsigned memory_node = 0;
  260. starpu_to_gordon_buffers(j, &gordon_jobs[index], memory_node);
  261. }
  262. gordon_pushjob(task_wrapper->gordon_job, gordon_callback_list_func, task_wrapper);
  263. return 0;
  264. }
/* Injection loop of the PPU thread: while StarPU is running, pop tasks from
 * the scheduler and feed them to Gordon; when the scheduler returns a whole
 * list, split it into up to 2*nworkers chunks so all SPUs can be kept busy.
 * Otherwise (or when Gordon is saturated) block on a scheduler event. */
void *gordon_worker_inject(struct _starpu_worker_set *arg)
{
	while(_starpu_machine_is_running())
	{
		if (gordon_busy_enough())
		{
			/* gordon already has enough work, wait a little TODO */
			_starpu_wait_on_sched_event();
		}
		else
		{
#ifndef NOCHAIN
			int ret = 0; /* NOTE(review): assigned below but never read */
#ifdef STARPU_DEVEL
#warning we should look into the local job list here !
#endif

			struct _starpu_job_list *list = _starpu_pop_every_task();
			/* XXX 0 is hardcoded */
			if (list)
			{
				/* partition lists */
				unsigned size = _starpu_job_list_size(list);
				unsigned nchunks = (size<2*arg->nworkers)?size:(2*arg->nworkers);
				//unsigned nchunks = (size<arg->nworkers)?size:(arg->nworkers);

				/* last element may be a little smaller (by 1) */
				unsigned chunksize = size/nchunks;

				unsigned chunk;
				for (chunk = 0; chunk < nchunks; chunk++)
				{
					struct _starpu_job_list *chunk_list;
					if (chunk != (nchunks -1))
					{
						/* split the list in 2 parts : list = chunk_list | tail */
						chunk_list = _starpu_job_list_new();

						/* find the end */
						chunk_list->_head = list->_head;

						/* walk chunksize elements forward: it_j will be the
						 * first element of the remaining tail */
						struct _starpu_job *it_j = _starpu_job_list_begin(list);

						unsigned ind;
						for (ind = 0; ind < chunksize; ind++)
						{
							it_j = _starpu_job_list_next(it_j);
						}

						/* it_j should be the first element of the new list (tail):
						 * cut the links so chunk_list ends just before it_j and
						 * list now starts at it_j */
						chunk_list->_tail = it_j->_prev;
						chunk_list->_tail->_next = NULL;
						list->_head = it_j;
						it_j->_prev = NULL;
					}
					else
					{
						/* this is the last chunk */
						chunk_list = list;
					}

					/* chunk_list ownership passes to the callback */
					ret = inject_task_list(chunk_list, &arg->workers[0]);
				}
			}
			else
			{
				_starpu_wait_on_sched_event();
			}
#else
			/* gordon should accept a little more work */
			struct _starpu_job *j;
			j = _starpu_pop_task();
//			_STARPU_DEBUG("pop task %p\n", j);
			if (j)
			{
				if (_STARPU_GORDON_MAY_PERFORM(j))
				{
					/* inject that task */
					/* XXX we hardcore &arg->workers[0] for now */
					inject_task(j, &arg->workers[0]);
				}
				else
				{
					_starpu_push_task(j);
				}
			}
#endif
		}
	}

	return NULL;
}
  348. void *_starpu_gordon_worker(void *arg)
  349. {
  350. struct _starpu_worker_set *gordon_set_arg = arg;
  351. _starpu_bind_thread_on_cpu(gordon_set_arg->config, gordon_set_arg->workers[0].bindid);
  352. /* TODO set_local_memory_node per SPU */
  353. gordon_init(gordon_set_arg->nworkers);
  354. /* NB: On SPUs, the worker_key is set to NULL since there is no point
  355. * in associating the PPU thread with a specific SPU (worker) while
  356. * it's handling multiple processing units. */
  357. _starpu_set_local_worker_key(NULL);
  358. /* TODO set workers' name field */
  359. unsigned spu;
  360. for (spu = 0; spu < gordon_set_arg->nworkers; spu++)
  361. {
  362. struct _starpu_worker *worker = &gordon_set_arg->workers[spu];
  363. snprintf(worker->name, sizeof(worker->name), "SPU %d", worker->id);
  364. snprintf(worker->short_name, sizeof(worker->short_name), "SPU %d", worker->id);
  365. }
  366. /*
  367. * To take advantage of PPE being hyperthreaded, we should have 2 threads
  368. * for the gordon driver : one injects works, the other makes sure that
  369. * gordon is progressing (and performs the callbacks).
  370. */
  371. /* launch the progression thread */
  372. _STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
  373. _STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);
  374. pthread_create(&progress_thread, NULL, gordon_worker_progress, gordon_set_arg);
  375. /* wait for the progression thread to be ready */
  376. _STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
  377. while (!progress_thread_is_inited)
  378. _STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
  379. _STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);
  380. _STARPU_DEBUG("progress thread is running ... \n");
  381. /* tell the core that gordon is ready */
  382. _STARPU_PTHREAD_MUTEX_LOCK(&gordon_set_arg->mutex);
  383. gordon_set_arg->set_is_initialized = 1;
  384. _STARPU_PTHREAD_COND_SIGNAL(&gordon_set_arg->ready_cond);
  385. _STARPU_PTHREAD_MUTEX_UNLOCK(&gordon_set_arg->mutex);
  386. gordon_worker_inject(gordon_set_arg);
  387. _starpu_handle_all_pending_node_data_requests(memnode);
  388. _STARPU_DEBUG("gordon deinit...\n");
  389. gordon_deinit();
  390. _STARPU_DEBUG("gordon was deinited\n");
  391. pthread_exit((void *)0x42);
  392. }