driver_gordon.c

/*
 * StarPU
 * Copyright (C) Université Bordeaux 1, CNRS 2008-2011 (see AUTHORS file)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <sched.h>
#include <pthread.h>
#include <semaphore.h>

#include <common/utils.h>

#include "driver_gordon.h"
#include "gordon_interface.h"

#include <core/sched_policy.h>

static unsigned progress_thread_is_inited = 0;

pthread_t progress_thread;
pthread_cond_t progress_cond;
pthread_mutex_t progress_mutex;

struct gordon_task_wrapper_s {
	/* which worker executed that task? */
	struct starpu_worker_s *worker;

	struct starpu_job_list_s *list;		/* StarPU side */
	struct gordon_ppu_job_s *gordon_job;	/* Gordon side */

	struct starpu_job_s *j;			/* if there is a single task */

	/* debug */
	unsigned terminated;
};
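
/* Progression thread: it loops on gordon_wait() so that the Gordon runtime
 * can detect SPU job completions and run the termination callbacks, and it
 * wakes up any blocked StarPU worker whenever some task has terminated. */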
void *gordon_worker_progress(void *arg)
{
	_STARPU_DEBUG("gordon_worker_progress\n");

	/* bind the progression thread to an available CPU */
	struct starpu_worker_set_s *gordon_set_arg = arg;
	unsigned prog_thread_bind_id =
		(gordon_set_arg->workers[0].bindid + 1)%(gordon_set_arg->config->nhwcores);
	_starpu_bind_thread_on_cpu(gordon_set_arg->config, prog_thread_bind_id);

	PTHREAD_MUTEX_LOCK(&progress_mutex);
	progress_thread_is_inited = 1;
	PTHREAD_COND_SIGNAL(&progress_cond);
	PTHREAD_MUTEX_UNLOCK(&progress_mutex);

	while (1) {
		/* the Gordon runtime needs us to poll it regularly so that
		 * terminated jobs are handled */
		/* wait for the termination of at least one task */
		int ret = gordon_wait(0);
		if (ret)
		{
			/* possibly wake the thread that injects work */
			starpu_wake_all_blocked_workers();
		}
	}

	return NULL;
}
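
/* Fill the Gordon job's buffer array from a StarPU task. Gordon expects the
 * buffers to be laid out as [read-only | read-write | write-only], so we
 * first count how many buffers fall into each class, then assign each StarPU
 * buffer a slot in the corresponding segment. If the task carries a cl_arg,
 * it is passed as the first read-only buffer. */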
static void starpu_to_gordon_buffers(starpu_job_t j, struct gordon_ppu_job_s *gordon_job, uint32_t memory_node)
{
	unsigned buffer;
	unsigned nin = 0, ninout = 0, nout = 0;
	unsigned in = 0, inout = 0, out = 0;

	struct starpu_task *task = j->task;
	struct starpu_codelet_t *cl = task->cl;

	/* if it is non-null, the argument buffer is considered
	 * as the first read-only buffer */
	if (task->cl_arg) {
		gordon_job->buffers[in] = (uint64_t)task->cl_arg;
		gordon_job->ss[in].size = (uint32_t)task->cl_arg_size;

		nin++; in++;
	}

	/* count the number of in/inout/out buffers */
	unsigned nbuffers = cl->nbuffers;
	for (buffer = 0; buffer < nbuffers; buffer++)
	{
		struct starpu_buffer_descr_t *descr;
		descr = &task->buffers[buffer];

		switch (descr->mode) {
			case STARPU_R:
				nin++;
				break;
			case STARPU_W:
				nout++;
				break;
			case STARPU_RW:
			default:
				ninout++;
				break;
		}
	}

	for (buffer = 0; buffer < nbuffers; buffer++)
	{
		unsigned gordon_buffer;
		struct starpu_buffer_descr_t *descr;
		descr = &task->buffers[buffer];

		switch (descr->mode) {
			case STARPU_R:
				gordon_buffer = in++;
				break;
			case STARPU_W:
				gordon_buffer = nin + ninout + out++;
				break;
			case STARPU_RW:
			default:
				gordon_buffer = nin + inout++;
				break;
		}

		starpu_data_handle handle = task->buffers[buffer].handle;

		gordon_job->nalloc = 0;
		gordon_job->nin = nin;
		gordon_job->ninout = ninout;
		gordon_job->nout = nout;

		STARPU_ASSERT(handle->ops->convert_to_gordon);
		handle->ops->convert_to_gordon(&handle->per_node[memory_node].interface,
				&gordon_job->buffers[gordon_buffer],
				&gordon_job->ss[gordon_buffer]);
	}
}
/* we assume the data are already available so that the data interface fields
 * are already filled */
static struct gordon_task_wrapper_s *starpu_to_gordon_job(starpu_job_t j)
{
	struct gordon_ppu_job_s *gordon_job = gordon_alloc_jobs(1, 0);
	struct gordon_task_wrapper_s *task_wrapper =
				malloc(sizeof(struct gordon_task_wrapper_s));

	task_wrapper->gordon_job = gordon_job;
	task_wrapper->j = j;
	task_wrapper->terminated = 0;

	gordon_job->index = j->task->cl->gordon_func;

	/* we should not hardcode the memory node ... XXX */
	unsigned memory_node = 0;

	starpu_to_gordon_buffers(j, gordon_job, memory_node);

	return task_wrapper;
}
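
/* Termination path for a single job: push its output data back, finalize the
 * job on the StarPU side and wake up any blocked worker. */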
static void handle_terminated_job(starpu_job_t j)
{
	_starpu_push_task_output(j->task, 0);
	_starpu_handle_job_termination(j, 0);
	starpu_wake_all_blocked_workers();
}
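
/* Callback executed by the Gordon runtime once a whole chain of jobs (as
 * injected by inject_task_list) has terminated: it finalizes every StarPU job
 * of the list, feeds the performance model when sampling was enabled, and
 * releases the resources allocated by the driver. */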
static void gordon_callback_list_func(void *arg)
{
	struct gordon_task_wrapper_s *task_wrapper = arg;
	struct starpu_job_list_s *wrapper_list;

	/* we don't know in advance which worker will execute that codelet,
	 * so the StarPU-side termination of the jobs is deferred until now */
	struct starpu_worker_s *worker = task_wrapper->worker;
	STARPU_ASSERT(worker);

	wrapper_list = task_wrapper->list;

	task_wrapper->terminated = 1;

//	_STARPU_DEBUG("gordon callback : push job j %p\n", task_wrapper->j);

	unsigned task_cnt = 0;

	/* XXX 0 was hardcoded */
	while (!starpu_job_list_empty(wrapper_list))
	{
		starpu_job_t j = starpu_job_list_pop_back(wrapper_list);

		struct gordon_ppu_job_s *gordon_task = &task_wrapper->gordon_job[task_cnt];

		struct starpu_perfmodel_t *model = j->task->cl->model;
		if (model && model->benchmarking)
		{
			double measured = (double)gordon_task->measured;
			unsigned cpuid = 0; /* XXX */

			_starpu_update_perfmodel_history(j, j->task->cl->model, STARPU_GORDON_DEFAULT, cpuid, measured);
		}

		_starpu_push_task_output(j->task, 0);
		_starpu_handle_job_termination(j, 0);
		//starpu_wake_all_blocked_workers();

		task_cnt++;
	}

	/* the job list was allocated by the Gordon driver itself */
	starpu_job_list_delete(wrapper_list);

	starpu_wake_all_blocked_workers();

	free(task_wrapper->gordon_job);
	free(task_wrapper);
}
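
/* Callback executed by the Gordon runtime when a single job (injected by
 * inject_task) terminates: it updates the per-worker statistics and then
 * finalizes the corresponding StarPU job. */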
static void gordon_callback_func(void *arg)
{
	struct gordon_task_wrapper_s *task_wrapper = arg;

	/* we don't know in advance which worker will execute that codelet,
	 * so the StarPU-side termination of the job is deferred until now */
	struct starpu_worker_s *worker = task_wrapper->worker;
	STARPU_ASSERT(worker);

	task_wrapper->terminated = 1;

	task_wrapper->j->task->cl->per_worker_stats[worker->workerid]++;

	handle_terminated_job(task_wrapper->j);

	starpu_wake_all_blocked_workers();

	free(task_wrapper);
}
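
/* Inject a single StarPU job into the Gordon runtime. Returns STARPU_TRYAGAIN
 * when the input data could not be fetched (typically for lack of memory), in
 * which case the task should be pushed back and retried later. */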
int inject_task(starpu_job_t j, struct starpu_worker_s *worker)
{
	struct starpu_task *task = j->task;

	int ret = _starpu_fetch_task_input(task, 0);

	if (ret != 0) {
		/* there was not enough memory, so the codelet cannot be
		 * executed right now: push the codelet back and try
		 * another one ... */
		return STARPU_TRYAGAIN;
	}

	struct gordon_task_wrapper_s *task_wrapper = starpu_to_gordon_job(j);

	task_wrapper->worker = worker;

	gordon_pushjob(task_wrapper->gordon_job, gordon_callback_func, task_wrapper);

	return 0;
}
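
/* Inject a whole list of StarPU jobs into the Gordon runtime as a single
 * chain: the jobs are converted into an array of Gordon jobs, and a single
 * callback (gordon_callback_list_func) finalizes them all upon termination. */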
int inject_task_list(struct starpu_job_list_s *list, struct starpu_worker_s *worker)
{
	/* first put back all tasks that cannot be performed by Gordon */
	unsigned nvalids = 0;
	unsigned ninvalids = 0;
	starpu_job_t j;

	// TODO !
	//
	//	for (j = starpu_job_list_begin(list); j != starpu_job_list_end(list); j = starpu_job_list_next(j) )
	//	{
	//		if (!STARPU_GORDON_MAY_PERFORM(j)) {
	//			// XXX TODO
	//			ninvalids++;
	//			assert(0);
	//		}
	//		else {
	//			nvalids++;
	//		}
	//	}

	nvalids = job_list_size(list);
//	_STARPU_DEBUG("nvalids %d \n", nvalids);

	struct gordon_task_wrapper_s *task_wrapper = malloc(sizeof(struct gordon_task_wrapper_s));
	gordon_job_t *gordon_jobs = gordon_alloc_jobs(nvalids, 0);

	task_wrapper->gordon_job = gordon_jobs;
	task_wrapper->list = list;
	task_wrapper->j = NULL;
	task_wrapper->terminated = 0;
	task_wrapper->worker = worker;

	unsigned index;
	for (j = starpu_job_list_begin(list), index = 0; j != starpu_job_list_end(list); j = starpu_job_list_next(j), index++)
	{
		int ret;

		struct starpu_task *task = j->task;
		ret = _starpu_fetch_task_input(task, 0);
		STARPU_ASSERT(!ret);

		gordon_jobs[index].index = task->cl->gordon_func;

		struct starpu_perfmodel_t *model = j->task->cl->model;
		if (model && model->benchmarking)
			gordon_jobs[index].flags.sampling = 1;

		/* we should not hardcode the memory node ... XXX */
		unsigned memory_node = 0;

		starpu_to_gordon_buffers(j, &gordon_jobs[index], memory_node);
	}

	gordon_pushjob(task_wrapper->gordon_job, gordon_callback_list_func, task_wrapper);

	return 0;
}
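
/* Injection loop of the Gordon driver: while the machine is running, ready
 * tasks are taken from the scheduler and injected into Gordon. In the default
 * (chained) build, the popped task list is split into chunks (presumably so
 * that the SPUs can be fed in parallel) and each chunk is injected as a job
 * chain; with NOCHAIN defined, tasks are popped and injected one by one. When
 * Gordon is busy enough or no task is available, we wait on a scheduling
 * event. */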
void *gordon_worker_inject(struct starpu_worker_set_s *arg)
{
	while (_starpu_machine_is_running()) {
		if (gordon_busy_enough()) {
			/* Gordon already has enough work, wait a little TODO */
			_starpu_wait_on_sched_event();
		}
		else {
#ifndef NOCHAIN
			int ret = 0;
#warning we should look into the local job list here !

			struct starpu_job_list_s *list = _starpu_pop_every_task();
			/* XXX 0 is hardcoded */
			if (list)
			{
				/* partition lists */
				unsigned size = job_list_size(list);
				unsigned nchunks = (size<2*arg->nworkers)?size:(2*arg->nworkers);
				//unsigned nchunks = (size<arg->nworkers)?size:(arg->nworkers);

				/* the last chunk may be a little smaller (by 1) */
				unsigned chunksize = size/nchunks;

				unsigned chunk;
				for (chunk = 0; chunk < nchunks; chunk++)
				{
					struct starpu_job_list_s *chunk_list;
					if (chunk != (nchunks - 1))
					{
						/* split the list into 2 parts: list = chunk_list | tail */
						chunk_list = starpu_job_list_new();

						/* find the end */
						chunk_list->_head = list->_head;

						starpu_job_itor_t it_j = starpu_job_list_begin(list);
						unsigned ind;
						for (ind = 0; ind < chunksize; ind++)
						{
							it_j = starpu_job_list_next(it_j);
						}

						/* it_j should be the first element of the new list (tail) */
						chunk_list->_tail = it_j->_prev;
						chunk_list->_tail->_next = NULL;
						list->_head = it_j;
						it_j->_prev = NULL;
					}
					else {
						/* this is the last chunk */
						chunk_list = list;
					}

					ret = inject_task_list(chunk_list, &arg->workers[0]);
				}
			}
			else {
				_starpu_wait_on_sched_event();
			}
#else
			/* Gordon should accept a little more work */
			starpu_job_t j;
			j = _starpu_pop_task();
//			_STARPU_DEBUG("pop task %p\n", j);
			if (j) {
				if (STARPU_GORDON_MAY_PERFORM(j)) {
					/* inject that task */
					/* XXX we hardcode &arg->workers[0] for now */
					inject_task(j, &arg->workers[0]);
				}
				else {
					_starpu_push_task(j, 0);
				}
			}
#endif
		}
	}

	return NULL;
}
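
/* Entry point of the PPU thread in charge of the SPUs: it initializes the
 * Gordon runtime, spawns the progression thread, signals the core that the
 * worker set is ready, and then runs the injection loop until StarPU shuts
 * down. */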
void *_starpu_gordon_worker(void *arg)
{
	struct starpu_worker_set_s *gordon_set_arg = arg;

	_starpu_bind_thread_on_cpu(gordon_set_arg->config, gordon_set_arg->workers[0].bindid);

	/* TODO set_local_memory_node per SPU */
	gordon_init(gordon_set_arg->nworkers);

	/* NB: On SPUs, the worker_key is set to NULL since there is no point
	 * in associating the PPU thread with a specific SPU (worker) while
	 * it's handling multiple processing units. */
	_starpu_set_local_worker_key(NULL);

	/* TODO set workers' name field */
	unsigned spu;
	for (spu = 0; spu < gordon_set_arg->nworkers; spu++)
	{
		struct starpu_worker_s *worker = &gordon_set_arg->workers[spu];
		snprintf(worker->name, 32, "SPU %d", worker->id);
	}

	/*
	 * To take advantage of the PPE being hyperthreaded, we should have two
	 * threads for the Gordon driver: one injects work, the other makes
	 * sure that Gordon is progressing (and performs the callbacks).
	 */

	/* launch the progression thread */
	PTHREAD_MUTEX_INIT(&progress_mutex, NULL);
	PTHREAD_COND_INIT(&progress_cond, NULL);

	pthread_create(&progress_thread, NULL, gordon_worker_progress, gordon_set_arg);

	/* wait for the progression thread to be ready */
	PTHREAD_MUTEX_LOCK(&progress_mutex);
	while (!progress_thread_is_inited)
		PTHREAD_COND_WAIT(&progress_cond, &progress_mutex);
	PTHREAD_MUTEX_UNLOCK(&progress_mutex);

	_STARPU_DEBUG("progress thread is running ... \n");

	/* tell the core that Gordon is ready */
	PTHREAD_MUTEX_LOCK(&gordon_set_arg->mutex);
	gordon_set_arg->set_is_initialized = 1;
	PTHREAD_COND_SIGNAL(&gordon_set_arg->ready_cond);
	PTHREAD_MUTEX_UNLOCK(&gordon_set_arg->mutex);

	gordon_worker_inject(gordon_set_arg);

	_STARPU_DEBUG("gordon deinit...\n");
	gordon_deinit();
	_STARPU_DEBUG("gordon was deinited\n");

	pthread_exit((void *)0x42);
}