driver_gordon.c
/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2009-2014  Université de Bordeaux 1
 * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
 * Copyright (C) 2011  Télécom-SudParis
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <sched.h>
#include <semaphore.h>

#include <common/utils.h>
#include "driver_gordon.h"
#include "gordon_interface.h"
#include <core/sched_policy.h>

static unsigned progress_thread_is_inited = 0;

starpu_pthread_t progress_thread;
starpu_pthread_cond_t progress_cond;
starpu_pthread_mutex_t progress_mutex;
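
/* Callback argument handed over to Gordon: it ties the Gordon-side job(s)
 * back to the StarPU-side job(s), so that the completion callbacks can
 * terminate them. */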
struct gordon_task_wrapper_s
{
	/* which worker executed this task? */
	struct _starpu_worker *worker;

	struct _starpu_job_list *list; /* StarPU */
	struct gordon_ppu_job_s *gordon_job; /* Gordon */
	struct _starpu_job *j; /* if there is a single task */

	/* debug */
	unsigned terminated;
};
void *gordon_worker_progress(void *arg)
{
	_STARPU_DEBUG("gordon_worker_progress\n");

	/* pin the thread to the correct CPU */
	struct _starpu_worker_set *gordon_set_arg = arg;
	unsigned prog_thread_bind_id =
		(gordon_set_arg->workers[0].bindid + 1)%(gordon_set_arg->config->nhwcores);
	_starpu_bind_thread_on_cpu(gordon_set_arg->config, prog_thread_bind_id);

	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
	progress_thread_is_inited = 1;
	STARPU_PTHREAD_COND_SIGNAL(&progress_cond);
	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);

	while (1)
	{
		/* the Gordon runtime needs us to poll it regularly,
		 * so that jobs that are done get handled */

		/* wait for one task termination */
		int ret = gordon_wait(0);
		if (ret)
		{
			/* possibly wake the thread that injects work */
			starpu_wake_all_blocked_workers();
		}
	}

	return NULL;
}
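
/* Fill the Gordon job's buffer array from the StarPU task. Buffers are laid
 * out grouped by access mode, in the order [ read-only | read-write |
 * write-only ]; a non-NULL cl_arg is prepended as an extra read-only buffer. */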
static void starpu_to_gordon_buffers(struct _starpu_job *j, struct gordon_ppu_job_s *gordon_job, unsigned memory_node)
{
	unsigned buffer;
	unsigned nin = 0, ninout = 0, nout = 0;
	unsigned in = 0, inout = 0, out = 0;

	struct starpu_task *task = j->task;
	struct starpu_codelet *cl = task->cl;

	/* if it is non-NULL, the argument buffer is considered
	 * as the first read-only buffer */
	if (task->cl_arg)
	{
		gordon_job->buffers[in] = (uint64_t)task->cl_arg;
		gordon_job->ss[in].size = (uint32_t)task->cl_arg_size;

		nin++; in++;
	}

	/* first pass: count the number of in/inout/out buffers */
	unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(task);
	for (buffer = 0; buffer < nbuffers; buffer++)
	{
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, buffer);
		switch (mode)
		{
			case STARPU_R:
				nin++;
				break;
			case STARPU_W:
				nout++;
				break;
			case STARPU_RW:
			default:
				ninout++;
				break;
		}
	}
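
	/* second pass: compute each buffer's slot in the Gordon job and convert
	 * the StarPU data interface into a Gordon buffer descriptor */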
	/* these counts do not depend on the current buffer, so set them once
	 * rather than on every iteration (and even when there is no buffer) */
	gordon_job->nalloc = 0;
	gordon_job->nin = nin;
	gordon_job->ninout = ninout;
	gordon_job->nout = nout;

	for (buffer = 0; buffer < nbuffers; buffer++)
	{
		unsigned gordon_buffer;
		enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(task, buffer);
		switch (mode)
		{
			case STARPU_R:
				gordon_buffer = in++;
				break;
			case STARPU_W:
				gordon_buffer = nin + ninout + out++;
				break;
			case STARPU_RW:
			default:
				gordon_buffer = nin + inout++;
				break;
		}

		starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(task, buffer);

		STARPU_ASSERT(handle->ops->convert_to_gordon);
		handle->ops->convert_to_gordon(&handle->per_node[memory_node].interface,
				&gordon_job->buffers[gordon_buffer],
				&gordon_job->ss[gordon_buffer]);
	}
}
/* we assume the data are already available, so that the data interface
 * fields are already filled */
static struct gordon_task_wrapper_s *starpu_to_gordon_job(struct _starpu_job *j)
{
	struct gordon_ppu_job_s *gordon_job = gordon_alloc_jobs(1, 0);
	struct gordon_task_wrapper_s *task_wrapper =
				malloc(sizeof(struct gordon_task_wrapper_s));
	STARPU_ASSERT(task_wrapper);

	task_wrapper->gordon_job = gordon_job;
	task_wrapper->j = j;
	task_wrapper->terminated = 0;

	gordon_job->index = _starpu_task_get_gordon_nth_implementation(j->task->cl, j->nimpl);

	/* we should not hardcode the memory node ... XXX */
	unsigned memory_node = 0;

	starpu_to_gordon_buffers(j, gordon_job, memory_node);

	return task_wrapper;
}
static void handle_terminated_job(struct _starpu_job *j)
{
	_starpu_push_task_output(j);
	_starpu_handle_job_termination(j);

	starpu_wake_all_blocked_workers();
}
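
/* Callback triggered by Gordon once a whole chain of jobs (injected by
 * inject_task_list) has completed: feed the performance model, perform the
 * StarPU-side termination of every job, then release the wrapper. */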
static void gordon_callback_list_func(void *arg)
{
	struct gordon_task_wrapper_s *task_wrapper = arg;
	struct _starpu_job_list *wrapper_list;

	/* we don't know which worker executed that codelet, so the StarPU-side
	 * completion of the codelet and the job termination were deferred until
	 * this callback */
	struct _starpu_worker *worker = task_wrapper->worker;
	STARPU_ASSERT(worker);

	wrapper_list = task_wrapper->list;

	task_wrapper->terminated = 1;

//	_STARPU_DEBUG("gordon callback : push job j %p\n", task_wrapper->j);

	unsigned task_cnt = 0;

	/* XXX 0 was hardcoded */
	while (!_starpu_job_list_empty(wrapper_list))
	{
		struct _starpu_job *j = _starpu_job_list_pop_back(wrapper_list);

#ifndef STARPU_SIMGRID
		struct gordon_ppu_job_s *gordon_task = &task_wrapper->gordon_job[task_cnt];
		struct starpu_perfmodel *model = j->task->cl->model;
		if (model && model->benchmarking)
		{
			double measured = (double)gordon_task->measured;
			unsigned cpuid = 0; /* XXX */

			_starpu_update_perfmodel_history(j, j->task->cl->model, STARPU_GORDON_DEFAULT, cpuid, measured);
		}
#endif

		_starpu_push_task_output(j);
		_starpu_handle_job_termination(j);
		//starpu_wake_all_blocked_workers();

		task_cnt++;
	}

	/* the job list was allocated by the Gordon driver itself */
	_starpu_job_list_delete(wrapper_list);

	starpu_wake_all_blocked_workers();

	free(task_wrapper->gordon_job);
	free(task_wrapper);
}
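
/* Callback triggered by Gordon once a single job (injected by inject_task)
 * has completed. */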
static void gordon_callback_func(void *arg)
{
	struct gordon_task_wrapper_s *task_wrapper = arg;

	/* we don't know which worker executed that codelet, so the StarPU-side
	 * completion of the codelet and the job termination were deferred until
	 * this callback */
	struct _starpu_worker *worker = task_wrapper->worker;
	STARPU_ASSERT(worker);

	task_wrapper->terminated = 1;
	task_wrapper->j->task->cl->per_worker_stats[worker->workerid]++;

	handle_terminated_job(task_wrapper->j);

	starpu_wake_all_blocked_workers();

	free(task_wrapper);
}
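
/* Fetch the task's input data and hand a single job over to Gordon. Returns
 * STARPU_TRYAGAIN when the input data could not be fetched (e.g. for lack of
 * memory), so that the caller can push the task back and try another one. */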
int inject_task(struct _starpu_job *j, struct _starpu_worker *worker)
{
	struct starpu_task *task = j->task;

	int ret = _starpu_fetch_task_input(j);
	if (ret != 0)
	{
		/* there was not enough memory, so the codelet cannot be executed
		 * right now ... push the codelet back and try another one */
		return STARPU_TRYAGAIN;
	}

	_starpu_sched_pre_exec_hook(task);

	struct gordon_task_wrapper_s *task_wrapper = starpu_to_gordon_job(j);
	task_wrapper->worker = worker;

	gordon_pushjob(task_wrapper->gordon_job, gordon_callback_func, task_wrapper);

	return 0;
}
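
/* Fetch the inputs of every job in the list and inject the whole list into
 * Gordon as a single chained submission, which is completed as a whole by
 * gordon_callback_list_func. */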
int inject_task_list(struct _starpu_job_list *list, struct _starpu_worker *worker)
{
	/* first put back all tasks that cannot be performed by Gordon */
	unsigned nvalids = 0;
	unsigned ninvalids = 0;
	struct _starpu_job *j;

	// TODO !
	//
	// for (j = _starpu_job_list_begin(list); j != _starpu_job_list_end(list); j = _starpu_job_list_next(j))
	// {
	// 	if (!_STARPU_GORDON_MAY_PERFORM(j))
	// 	{
	// 		// XXX TODO
	// 		ninvalids++;
	// 		assert(0);
	// 	}
	// 	else
	// 	{
	// 		nvalids++;
	// 	}
	// }

	nvalids = _starpu_job_list_size(list);
//	_STARPU_DEBUG("nvalids %d \n", nvalids);

	struct gordon_task_wrapper_s *task_wrapper = malloc(sizeof(struct gordon_task_wrapper_s));
	gordon_job_t *gordon_jobs = gordon_alloc_jobs(nvalids, 0);

	task_wrapper->gordon_job = gordon_jobs;
	task_wrapper->list = list;
	task_wrapper->j = NULL;
	task_wrapper->terminated = 0;
	task_wrapper->worker = worker;

	unsigned index;
	for (j = _starpu_job_list_begin(list), index = 0; j != _starpu_job_list_end(list); j = _starpu_job_list_next(j), index++)
	{
		int ret;

		struct starpu_task *task = j->task;

		ret = _starpu_fetch_task_input(j);
		STARPU_ASSERT(!ret);

		_starpu_sched_pre_exec_hook(task);

		gordon_jobs[index].index = _starpu_task_get_gordon_nth_implementation(task->cl, j->nimpl);

		struct starpu_perfmodel *model = j->task->cl->model;
		if (model && model->benchmarking)
			gordon_jobs[index].flags.sampling = 1;

		/* we should not hardcode the memory node ... XXX */
		unsigned memory_node = 0;

		starpu_to_gordon_buffers(j, &gordon_jobs[index], memory_node);
	}

	gordon_pushjob(task_wrapper->gordon_job, gordon_callback_list_func, task_wrapper);

	return 0;
}
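
/* Main loop of the injection thread: as long as StarPU is running, pop tasks
 * from the scheduler and feed them to Gordon, splitting the popped list into
 * roughly equal chunks (at most two per SPU) so that the SPUs are kept busy. */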
void *gordon_worker_inject(struct _starpu_worker_set *arg)
{
	while (_starpu_machine_is_running())
	{
		if (gordon_busy_enough())
		{
			/* gordon already has enough work, wait a little TODO */
			_starpu_wait_on_sched_event();
		}
		else
		{
#ifndef NOCHAIN
			int ret = 0;
#ifdef STARPU_DEVEL
#warning we should look into the local job list here !
#endif

			struct _starpu_job_list *list = _starpu_pop_every_task();
			/* XXX 0 is hardcoded */
			if (list)
			{
				/* partition the list into chunks */
				unsigned size = _starpu_job_list_size(list);
				unsigned nchunks = (size<2*arg->nworkers)?size:(2*arg->nworkers);
				//unsigned nchunks = (size<arg->nworkers)?size:(arg->nworkers);

				/* the last chunk may be a little larger, since it also
				 * receives the remainder of the integer division */
				unsigned chunksize = size/nchunks;

				unsigned chunk;
				for (chunk = 0; chunk < nchunks; chunk++)
				{
					struct _starpu_job_list *chunk_list;
					if (chunk != (nchunks - 1))
					{
						/* split the list in 2 parts:
						 * list = chunk_list | tail */
						chunk_list = _starpu_job_list_new();

						/* find the end */
						chunk_list->_head = list->_head;

						struct _starpu_job *it_j = _starpu_job_list_begin(list);
						unsigned ind;
						for (ind = 0; ind < chunksize; ind++)
						{
							it_j = _starpu_job_list_next(it_j);
						}

						/* it_j should be the first element of the new list (tail) */
						chunk_list->_tail = it_j->_prev;
						chunk_list->_tail->_next = NULL;
						list->_head = it_j;
						it_j->_prev = NULL;
					}
					else
					{
						/* this is the last chunk */
						chunk_list = list;
					}

					ret = inject_task_list(chunk_list, &arg->workers[0]);
				}
			}
			else
			{
				_starpu_wait_on_sched_event();
			}
#else
			/* gordon should accept a little more work */
			struct _starpu_job *j;
			j = _starpu_pop_task();
//			_STARPU_DEBUG("pop task %p\n", j);
			if (j)
			{
				if (_STARPU_GORDON_MAY_PERFORM(j))
				{
					/* inject that task */
					/* XXX we hardcode &arg->workers[0] for now */
					inject_task(j, &arg->workers[0]);
				}
				else
				{
					_starpu_push_task_to_workers(j->task);
				}
			}
#endif
		}
	}

	return NULL;
}
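
/* Entry point of the PPU driver thread: bind it, initialize Gordon, spawn the
 * progression thread, then run the injection loop until StarPU shuts down. */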
void *_starpu_gordon_worker(void *arg)
{
	struct _starpu_worker_set *gordon_set_arg = arg;

	_starpu_bind_thread_on_cpu(gordon_set_arg->config, gordon_set_arg->workers[0].bindid);

	/* TODO set_local_memory_node per SPU */
	gordon_init(gordon_set_arg->nworkers);

	/* NB: On SPUs, the worker_key is set to NULL since there is no point
	 * in associating the PPU thread with a specific SPU (worker) while
	 * it's handling multiple processing units. */
	_starpu_set_local_worker_key(NULL);

	/* set the workers' name fields */
	unsigned spu;
	for (spu = 0; spu < gordon_set_arg->nworkers; spu++)
	{
		struct _starpu_worker *worker = &gordon_set_arg->workers[spu];
		snprintf(worker->name, sizeof(worker->name), "SPU %d", worker->id);
		snprintf(worker->short_name, sizeof(worker->short_name), "SPU %d", worker->id);
	}

	/*
	 * To take advantage of the PPE being hyperthreaded, we should have two
	 * threads for the Gordon driver: one injects work, the other makes sure
	 * that Gordon is progressing (and performs the callbacks).
	 */

	/* launch the progression thread */
	STARPU_PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
	STARPU_PTHREAD_COND_INIT(&progress_cond, NULL);

	STARPU_PTHREAD_CREATE(&progress_thread, NULL, gordon_worker_progress, gordon_set_arg);

	/* wait for the progression thread to be ready */
	STARPU_PTHREAD_MUTEX_LOCK(&progress_mutex);
	while (!progress_thread_is_inited)
		STARPU_PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
	STARPU_PTHREAD_MUTEX_UNLOCK(&progress_mutex);

	_STARPU_DEBUG("progress thread is running ...\n");

	/* tell the core that gordon is ready */
	STARPU_PTHREAD_MUTEX_LOCK(&gordon_set_arg->mutex);
	gordon_set_arg->set_is_initialized = 1;
	STARPU_PTHREAD_COND_SIGNAL(&gordon_set_arg->ready_cond);
	STARPU_PTHREAD_MUTEX_UNLOCK(&gordon_set_arg->mutex);

	gordon_worker_inject(gordon_set_arg);

	/* the memory node is hardcoded to 0 throughout this driver, so we
	 * assume the same node here (XXX) */
	unsigned memnode = 0;
	_starpu_handle_all_pending_node_data_requests(memnode);

	_STARPU_DEBUG("gordon deinit...\n");
	gordon_deinit();
	_STARPU_DEBUG("gordon was deinited\n");

	return NULL;
}