workers.c 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <core/workers.h>
  17. /* XXX quick and dirty implementation for now ... */
  18. pthread_key_t local_workers_key;
  19. static struct machine_config_s config;
  /*
   * Bitmask of the task classes for which at least one worker has been
   * configured (the bits are set by init_machine_config). When a task is
   * submitted, it lets us check whether some worker may execute it.
   */
  static uint32_t worker_mask = 0;

  /*
   * Return the bits of task_mask that are also set in worker_mask: the result
   * is non-zero when at least one configured worker matches the task, and 0
   * otherwise.
   *
   * The function is intentionally not declared "inline": with C99/C11
   * semantics, a plain "inline" definition in a .c file is only an inline
   * definition, so no external symbol is emitted and callers fail to link
   * whenever the call is not actually inlined.
   */
  uint32_t worker_exists(uint32_t task_mask)
  {
      return (task_mask & worker_mask);
  }
  27. inline uint32_t may_submit_cuda_task(void)
  28. {
  29. return ((CUDA|CUBLAS) & worker_mask);
  30. }
  31. inline uint32_t may_submit_core_task(void)
  32. {
  33. return (CORE & worker_mask);
  34. }
  /*
   * Number of workers of each kind; every counter only exists when the
   * matching driver is compiled in, and is set by init_machine_config().
   */
  #ifdef USE_CPUS
  static unsigned ncores;         /* CPU core workers */
  #endif

  #ifdef USE_CUDA
  static unsigned ncudagpus;      /* CUDA device workers */
  #endif

  #ifdef USE_GORDON
  static unsigned ngordon_spus;   /* Gordon SPU workers */
  #endif
  44. /*
  45. * Runtime initialization methods
  46. */
  47. #ifdef USE_CUDA
  48. extern unsigned get_cuda_device_count(void);
  49. #endif
  50. static void init_machine_config(struct machine_config_s *config)
  51. {
  52. int envval __attribute__((unused));
  53. unsigned use_accelerator = 0;
  54. config->nworkers = 0;
  55. #ifdef USE_CUDA
  56. /* we need to initialize CUDA early to count the number of devices */
  57. init_cuda();
  58. envval = starpu_get_env_number("NCUDA");
  59. if (envval < 0) {
  60. ncudagpus = STARPU_MIN(get_cuda_device_count(), MAXCUDADEVS);
  61. } else {
  62. /* use the specified value */
  63. ncudagpus = (unsigned)envval;
  64. STARPU_ASSERT(ncudagpus <= MAXCUDADEVS);
  65. }
  66. STARPU_ASSERT(ncudagpus + config->nworkers <= NMAXWORKERS);
  67. if (ncudagpus > 0)
  68. use_accelerator = 1;
  69. unsigned cudagpu;
  70. for (cudagpu = 0; cudagpu < ncudagpus; cudagpu++)
  71. {
  72. config->workers[config->nworkers + cudagpu].arch = CUDA_WORKER;
  73. config->workers[config->nworkers + cudagpu].perf_arch = STARPU_CUDA_DEFAULT;
  74. config->workers[config->nworkers + cudagpu].id = cudagpu;
  75. worker_mask |= (CUDA|CUBLAS);
  76. }
  77. config->nworkers += ncudagpus;
  78. #endif
  79. #ifdef USE_GORDON
  80. envval = starpu_get_env_number("NGORDON");
  81. if (envval < 0) {
  82. ngordon_spus = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
  83. } else {
  84. /* use the specified value */
  85. ngordon_spus = (unsigned)envval;
  86. STARPU_ASSERT(ngordon_spus <= NMAXGORDONSPUS);
  87. }
  88. STARPU_ASSERT(ngordon_spus + config->nworkers <= NMAXWORKERS);
  89. if (ngordon_spus > 0)
  90. use_accelerator = 1;
  91. unsigned spu;
  92. for (spu = 0; spu < ngordon_spus; spu++)
  93. {
  94. config->workers[config->nworkers + spu].arch = GORDON_WORKER;
  95. config->workers[config->nworkers + spu].perf_arch = STARPU_GORDON_DEFAULT;
  96. config->workers[config->nworkers + spu].id = spu;
  97. config->workers[config->nworkers + spu].worker_is_running = 0;
  98. worker_mask |= GORDON;
  99. }
  100. config->nworkers += ngordon_spus;
  101. #endif
  102. /* we put the CPU section after the accelerator : in case there was an
  103. * accelerator found, we devote one core */
  104. #ifdef USE_CPUS
  105. envval = starpu_get_env_number("NCPUS");
  106. if (envval < 0) {
  107. long avail_cores = sysconf(_SC_NPROCESSORS_ONLN)
  108. - (use_accelerator?1:0);
  109. ncores = STARPU_MIN(avail_cores, NMAXCORES);
  110. } else {
  111. /* use the specified value */
  112. ncores = (unsigned)envval;
  113. STARPU_ASSERT(ncores <= NMAXCORES);
  114. }
  115. STARPU_ASSERT(ncores + config->nworkers <= NMAXWORKERS);
  116. unsigned core;
  117. for (core = 0; core < ncores; core++)
  118. {
  119. config->workers[config->nworkers + core].arch = CORE_WORKER;
  120. config->workers[config->nworkers + core].perf_arch = STARPU_CORE_DEFAULT;
  121. config->workers[config->nworkers + core].id = core;
  122. worker_mask |= CORE;
  123. }
  124. config->nworkers += ncores;
  125. #endif
  126. if (config->nworkers == 0)
  127. {
  128. fprintf(stderr, "No worker found, aborting ...\n");
  129. exit(-1);
  130. }
  131. }
  132. static void init_workers_binding(struct machine_config_s *config)
  133. {
  134. /* launch one thread per CPU */
  135. unsigned ram_memory_node;
  136. int current_bindid = 0;
  137. /* a single core is dedicated for the accelerators */
  138. int accelerator_bindid = -1;
  139. /* note that even if the CPU core are not used, we always have a RAM node */
  140. /* TODO : support NUMA ;) */
  141. ram_memory_node = register_memory_node(RAM);
  142. unsigned worker;
  143. for (worker = 0; worker < config->nworkers; worker++)
  144. {
  145. unsigned memory_node = -1;
  146. unsigned is_an_accelerator = 0;
  147. struct worker_s *workerarg = &config->workers[worker];
  148. /* select the memory node that contains worker's memory */
  149. switch (workerarg->arch) {
  150. case CORE_WORKER:
  151. /* "dedicate" a cpu core to that worker */
  152. is_an_accelerator = 0;
  153. memory_node = ram_memory_node;
  154. break;
  155. #ifdef USE_GORDON
  156. case GORDON_WORKER:
  157. is_an_accelerator = 1;
  158. memory_node = ram_memory_node;
  159. break;
  160. #endif
  161. #ifdef USE_CUDA
  162. case CUDA_WORKER:
  163. is_an_accelerator = 1;
  164. memory_node = register_memory_node(CUDA_RAM);
  165. break;
  166. #endif
  167. default:
  168. STARPU_ASSERT(0);
  169. }
  170. if (is_an_accelerator) {
  171. if (accelerator_bindid == -1)
  172. accelerator_bindid = (current_bindid++) % (sysconf(_SC_NPROCESSORS_ONLN));
  173. workerarg->bindid = accelerator_bindid;
  174. }
  175. else {
  176. workerarg->bindid = (current_bindid++) % (sysconf(_SC_NPROCESSORS_ONLN));
  177. }
  178. workerarg->memory_node = memory_node;
  179. }
  180. }
  181. #ifdef USE_GORDON
  182. unsigned gordon_inited = 0;
  183. struct worker_set_s gordon_worker_set;
  184. #endif
  185. static void init_workers(struct machine_config_s *config)
  186. {
  187. config->running = 1;
  188. pthread_key_create(&local_workers_key, NULL);
  189. unsigned worker;
  190. for (worker = 0; worker < config->nworkers; worker++)
  191. {
  192. struct worker_s *workerarg = &config->workers[worker];
  193. pthread_mutex_init(&workerarg->mutex, NULL);
  194. pthread_cond_init(&workerarg->ready_cond, NULL);
  195. /* if some codelet's termination cannot be handled directly :
  196. * for instance in the Gordon driver, Gordon tasks' callbacks
  197. * may be executed by another thread than that of the Gordon
  198. * driver so that we cannot call the push_codelet_output method
  199. * directly */
  200. workerarg->terminated_jobs = job_list_new();
  201. switch (workerarg->arch) {
  202. #ifdef USE_CPUS
  203. case CORE_WORKER:
  204. workerarg->set = NULL;
  205. pthread_create(&workerarg->worker_thread,
  206. NULL, core_worker, workerarg);
  207. pthread_mutex_lock(&workerarg->mutex);
  208. pthread_cond_wait(&workerarg->ready_cond, &workerarg->mutex);
  209. pthread_mutex_unlock(&workerarg->mutex);
  210. break;
  211. #endif
  212. #ifdef USE_CUDA
  213. case CUDA_WORKER:
  214. workerarg->set = NULL;
  215. pthread_create(&workerarg->worker_thread,
  216. NULL, cuda_worker, workerarg);
  217. pthread_mutex_lock(&workerarg->mutex);
  218. pthread_cond_wait(&workerarg->ready_cond, &workerarg->mutex);
  219. pthread_mutex_unlock(&workerarg->mutex);
  220. break;
  221. #endif
  222. #ifdef USE_GORDON
  223. case GORDON_WORKER:
  224. /* we will only launch gordon once, but it will handle
  225. * the different SPU workers */
  226. if (!gordon_inited)
  227. {
  228. gordon_worker_set.nworkers = ngordon_spus;
  229. gordon_worker_set.workers = &config->workers[worker];
  230. pthread_create(&gordon_worker_set.worker_thread, NULL,
  231. gordon_worker, &gordon_worker_set);
  232. pthread_mutex_lock(&gordon_worker_set.mutex);
  233. pthread_cond_wait(&gordon_worker_set.ready_cond, &gordon_worker_set.mutex);
  234. pthread_mutex_unlock(&gordon_worker_set.mutex);
  235. gordon_inited = 1;
  236. }
  237. workerarg->set = &gordon_worker_set;
  238. gordon_worker_set.joined = 0;
  239. workerarg->worker_is_running = 1;
  240. break;
  241. #endif
  242. default:
  243. STARPU_ASSERT(0);
  244. }
  245. }
  246. }
  247. void starpu_init(void)
  248. {
  249. srand(2008);
  250. #ifdef USE_FXT
  251. start_fxt_profiling();
  252. #endif
  253. timing_init();
  254. init_machine_config(&config);
  255. /* for the data wizard */
  256. init_memory_nodes();
  257. init_workers_binding(&config);
  258. /* initialize the scheduler */
  259. /* initialize the queue containing the jobs */
  260. init_sched_policy(&config);
  261. init_workers(&config);
  262. }
  263. /*
  264. * Handle runtime termination
  265. */
  266. void terminate_workers(struct machine_config_s *config)
  267. {
  268. int status;
  269. unsigned workerid;
  270. for (workerid = 0; workerid < config->nworkers; workerid++)
  271. {
  272. wake_all_blocked_workers();
  273. #ifdef VERBOSE
  274. fprintf(stderr, "wait for worker %d\n", workerid);
  275. #endif
  276. struct worker_set_s *set = config->workers[workerid].set;
  277. /* in case StarPU termination code is called from a callback,
  278. * we have to check if pthread_self() is the worker itself */
  279. if (set){
  280. if (!set->joined) {
  281. if (pthread_self() != set->worker_thread)
  282. {
  283. status = pthread_join(set->worker_thread, NULL);
  284. #ifdef VERBOSE
  285. if (status)
  286. fprintf(stderr, "pthread_join -> %d\n", status);
  287. #endif
  288. }
  289. set->joined = 1;
  290. }
  291. }
  292. else {
  293. struct worker_s *worker = &config->workers[workerid];
  294. if (pthread_self() != worker->worker_thread)
  295. {
  296. status = pthread_join(worker->worker_thread, NULL);
  297. #ifdef VERBOSE
  298. if (status)
  299. fprintf(stderr, "pthread_join -> %d\n", status);
  300. #endif
  301. }
  302. }
  303. }
  304. }
  305. unsigned machine_is_running(void)
  306. {
  307. return config.running;
  308. }
  309. void kill_all_workers(struct machine_config_s *config)
  310. {
  311. /* set the flag which will tell workers to stop */
  312. config->running = 0;
  313. /* in case some workers are waiting on some event
  314. wake them up ... */
  315. wake_all_blocked_workers();
  316. }
  317. void starpu_shutdown(void)
  318. {
  319. display_msi_stats();
  320. display_alloc_cache_stats();
  321. /* tell all workers to shutdown */
  322. kill_all_workers(&config);
  323. #ifdef DATA_STATS
  324. display_comm_ammounts();
  325. #endif
  326. if (starpu_get_env_number("CALIBRATE") != -1)
  327. dump_registered_models();
  328. /* wait for their termination */
  329. terminate_workers(&config);
  330. }