workers.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <stdio.h>
  18. #include <common/config.h>
  19. #include <common/utils.h>
  20. #include <core/workers.h>
  21. #include <core/debug.h>
  22. #include <core/task.h>
  23. #include <profiling/profiling.h>
  24. #ifdef __MINGW32__
  25. #include <windows.h>
  26. #endif
  27. /* acquire/release semantic for concurrent initialization/de-initialization */
  28. static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
  29. static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
  30. static int init_count;
  31. static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
  32. static pthread_key_t worker_key;
  33. static struct starpu_machine_config_s config;
  34. struct starpu_machine_config_s *_starpu_get_machine_config(void)
  35. {
  36. return &config;
  37. }
  38. /* in case a task is submitted, we may check whether there exists a worker
  39. that may execute the task or not */
  40. inline uint32_t _starpu_worker_exists(uint32_t task_mask)
  41. {
  42. return (task_mask & config.worker_mask);
  43. }
  44. inline uint32_t _starpu_may_submit_cuda_task(void)
  45. {
  46. return (STARPU_CUDA & config.worker_mask);
  47. }
  48. inline uint32_t _starpu_may_submit_cpu_task(void)
  49. {
  50. return (STARPU_CPU & config.worker_mask);
  51. }
  52. inline uint32_t _starpu_may_submit_opencl_task(void)
  53. {
  54. return (STARPU_OPENCL & config.worker_mask);
  55. }
  56. inline uint32_t _starpu_worker_may_execute_task(unsigned workerid, uint32_t where)
  57. {
  58. return (where & config.workers[workerid].worker_mask);
  59. }
  /*
   * Runtime initialization methods
   */
  #ifdef STARPU_USE_GORDON
  /* The Gordon driver is launched only once and handles all the SPU workers:
   * every SPU worker points to this single shared worker set. gordon_inited
   * records whether that driver has already been launched. */
  static unsigned gordon_inited;
  static struct starpu_worker_set_s gordon_worker_set;
  #endif
  67. static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
  68. {
  69. pthread_cond_t *cond = workerarg->sched_cond;
  70. pthread_mutex_t *mutex = workerarg->sched_mutex;
  71. unsigned memory_node = workerarg->memory_node;
  72. _starpu_memory_node_register_condition(cond, mutex, memory_node);
  73. }
  74. static void _starpu_init_workers(struct starpu_machine_config_s *config)
  75. {
  76. config->running = 1;
  77. pthread_key_create(&worker_key, NULL);
  78. unsigned nworkers = config->topology.nworkers;
  79. /* Launch workers asynchronously (except for SPUs) */
  80. unsigned worker;
  81. for (worker = 0; worker < nworkers; worker++)
  82. {
  83. struct starpu_worker_s *workerarg = &config->workers[worker];
  84. workerarg->config = config;
  85. PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
  86. PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
  87. workerarg->workerid = (int)worker;
  88. /* if some codelet's termination cannot be handled directly :
  89. * for instance in the Gordon driver, Gordon tasks' callbacks
  90. * may be executed by another thread than that of the Gordon
  91. * driver so that we cannot call the push_codelet_output method
  92. * directly */
  93. workerarg->terminated_jobs = starpu_job_list_new();
  94. workerarg->local_jobs = starpu_job_list_new();
  95. PTHREAD_MUTEX_INIT(&workerarg->local_jobs_mutex, NULL);
  96. workerarg->status = STATUS_INITIALIZING;
  97. _starpu_init_worker_queue(workerarg);
  98. switch (workerarg->arch) {
  99. #ifdef STARPU_USE_CPU
  100. case STARPU_CPU_WORKER:
  101. workerarg->set = NULL;
  102. workerarg->worker_is_initialized = 0;
  103. pthread_create(&workerarg->worker_thread,
  104. NULL, _starpu_cpu_worker, workerarg);
  105. break;
  106. #endif
  107. #ifdef STARPU_USE_CUDA
  108. case STARPU_CUDA_WORKER:
  109. workerarg->set = NULL;
  110. workerarg->worker_is_initialized = 0;
  111. pthread_create(&workerarg->worker_thread,
  112. NULL, _starpu_cuda_worker, workerarg);
  113. break;
  114. #endif
  115. #ifdef STARPU_USE_OPENCL
  116. case STARPU_OPENCL_WORKER:
  117. workerarg->set = NULL;
  118. workerarg->worker_is_initialized = 0;
  119. pthread_create(&workerarg->worker_thread,
  120. NULL, _starpu_opencl_worker, workerarg);
  121. break;
  122. #endif
  123. #ifdef STARPU_USE_GORDON
  124. case STARPU_GORDON_WORKER:
  125. /* we will only launch gordon once, but it will handle
  126. * the different SPU workers */
  127. if (!gordon_inited)
  128. {
  129. gordon_worker_set.nworkers = config->ngordon_spus;
  130. gordon_worker_set.workers = &config->workers[worker];
  131. gordon_worker_set.set_is_initialized = 0;
  132. pthread_create(&gordon_worker_set.worker_thread, NULL,
  133. _starpu_gordon_worker, &gordon_worker_set);
  134. PTHREAD_MUTEX_LOCK(&gordon_worker_set.mutex);
  135. while (!gordon_worker_set.set_is_initialized)
  136. PTHREAD_COND_WAIT(&gordon_worker_set.ready_cond,
  137. &gordon_worker_set.mutex);
  138. PTHREAD_MUTEX_UNLOCK(&gordon_worker_set.mutex);
  139. gordon_inited = 1;
  140. }
  141. workerarg->set = &gordon_worker_set;
  142. gordon_worker_set.joined = 0;
  143. workerarg->worker_is_running = 1;
  144. break;
  145. #endif
  146. default:
  147. STARPU_ABORT();
  148. }
  149. }
  150. for (worker = 0; worker < nworkers; worker++)
  151. {
  152. struct starpu_worker_s *workerarg = &config->workers[worker];
  153. switch (workerarg->arch) {
  154. case STARPU_CPU_WORKER:
  155. case STARPU_CUDA_WORKER:
  156. case STARPU_OPENCL_WORKER:
  157. PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  158. while (!workerarg->worker_is_initialized)
  159. PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
  160. PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  161. break;
  162. #ifdef STARPU_USE_GORDON
  163. case STARPU_GORDON_WORKER:
  164. /* the initialization of Gordon worker is
  165. * synchronous for now */
  166. break;
  167. #endif
  168. default:
  169. STARPU_ABORT();
  170. }
  171. }
  172. }
  173. void _starpu_set_local_worker_key(struct starpu_worker_s *worker)
  174. {
  175. pthread_setspecific(worker_key, worker);
  176. }
  177. struct starpu_worker_s *_starpu_get_local_worker_key(void)
  178. {
  179. return pthread_getspecific(worker_key);
  180. }
  181. int starpu_init(struct starpu_conf *user_conf)
  182. {
  183. int ret;
  184. PTHREAD_MUTEX_LOCK(&init_mutex);
  185. while (initialized == CHANGING)
  186. /* Wait for the other one changing it */
  187. PTHREAD_COND_WAIT(&init_cond, &init_mutex);
  188. init_count++;
  189. if (initialized == INITIALIZED)
  190. /* He initialized it, don't do it again */
  191. return 0;
  192. /* initialized == UNINITIALIZED */
  193. initialized = CHANGING;
  194. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  195. #ifdef __MINGW32__
  196. WSADATA wsadata;
  197. WSAStartup(MAKEWORD(1,0), &wsadata);
  198. #endif
  199. srand(2008);
  200. #ifdef STARPU_USE_FXT
  201. _starpu_start_fxt_profiling();
  202. #endif
  203. _starpu_open_debug_logfile();
  204. _starpu_timing_init();
  205. _starpu_load_bus_performance_files();
  206. /* store the pointer to the user explicit configuration during the
  207. * initialization */
  208. config.user_conf = user_conf;
  209. ret = _starpu_build_topology(&config);
  210. if (ret) {
  211. PTHREAD_MUTEX_LOCK(&init_mutex);
  212. init_count--;
  213. initialized = UNINITIALIZED;
  214. /* Let somebody else try to do it */
  215. PTHREAD_COND_SIGNAL(&init_cond);
  216. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  217. return ret;
  218. }
  219. /* We need to store the current task handled by the different
  220. * threads */
  221. _starpu_initialize_current_task_key();
  222. /* initialize the scheduler */
  223. /* initialize the queue containing the jobs */
  224. _starpu_init_sched_policy(&config);
  225. _starpu_initialize_registered_performance_models();
  226. _starpu_init_workers(&config);
  227. PTHREAD_MUTEX_LOCK(&init_mutex);
  228. initialized = INITIALIZED;
  229. /* Tell everybody that we initialized */
  230. PTHREAD_COND_BROADCAST(&init_cond);
  231. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  232. return 0;
  233. }
  234. /*
  235. * Handle runtime termination
  236. */
  237. static void _starpu_terminate_workers(struct starpu_machine_config_s *config)
  238. {
  239. int status __attribute__((unused));
  240. unsigned workerid;
  241. for (workerid = 0; workerid < config->topology.nworkers; workerid++)
  242. {
  243. starpu_wake_all_blocked_workers();
  244. #ifdef STARPU_VERBOSE
  245. fprintf(stderr, "wait for worker %d\n", workerid);
  246. #endif
  247. struct starpu_worker_set_s *set = config->workers[workerid].set;
  248. struct starpu_worker_s *worker = &config->workers[workerid];
  249. /* in case StarPU termination code is called from a callback,
  250. * we have to check if pthread_self() is the worker itself */
  251. if (set){
  252. if (!set->joined) {
  253. if (!pthread_equal(pthread_self(), set->worker_thread))
  254. {
  255. status = pthread_join(set->worker_thread, NULL);
  256. #ifdef STARPU_VERBOSE
  257. if (status)
  258. fprintf(stderr, "pthread_join -> %d\n", status);
  259. #endif
  260. }
  261. set->joined = 1;
  262. }
  263. }
  264. else {
  265. if (!pthread_equal(pthread_self(), worker->worker_thread))
  266. {
  267. status = pthread_join(worker->worker_thread, NULL);
  268. #ifdef STARPU_VERBOSE
  269. if (status)
  270. fprintf(stderr, "pthread_join -> %d\n", status);
  271. #endif
  272. }
  273. }
  274. starpu_job_list_delete(worker->local_jobs);
  275. starpu_job_list_delete(worker->terminated_jobs);
  276. }
  277. }
  278. unsigned _starpu_machine_is_running(void)
  279. {
  280. return config.running;
  281. }
  /*
   * Decide whether a worker attached to memory node `memnode` may block.
   *
   * Blocking is allowed only when all three of these hold:
   * - no data request exists for the node;
   * - the machine is still running;
   * - running the registered progression hooks returns non-zero.
   *
   * All three calls are always made, in this order, even when an earlier one
   * already forbids blocking. In particular, the progression hooks are
   * executed on every call.
   *
   * Returns 1 if the worker may block, 0 otherwise.
   */
  unsigned _starpu_worker_can_block(unsigned memnode)
  {
      const int no_pending_request = (_starpu_check_that_no_data_request_exists(memnode) != 0);
      const int still_running = (_starpu_machine_is_running() != 0);
      const int hooks_allow_blocking = (_starpu_execute_registered_progression_hooks() != 0);

      return (no_pending_request && still_running && hooks_allow_blocking) ? 1u : 0u;
  }
  293. static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
  294. {
  295. /* set the flag which will tell workers to stop */
  296. config->running = 0;
  297. starpu_wake_all_blocked_workers();
  298. }
  299. void starpu_shutdown(void)
  300. {
  301. PTHREAD_MUTEX_LOCK(&init_mutex);
  302. init_count--;
  303. if (init_count)
  304. /* Still somebody needing StarPU, don't deinitialize */
  305. return;
  306. /* We're last */
  307. initialized = CHANGING;
  308. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  309. _starpu_display_msi_stats();
  310. _starpu_display_alloc_cache_stats();
  311. /* tell all workers to shutdown */
  312. _starpu_kill_all_workers(&config);
  313. #ifdef STARPU_DATA_STATS
  314. _starpu_display_comm_amounts();
  315. #endif
  316. _starpu_deinitialize_registered_performance_models();
  317. /* wait for their termination */
  318. _starpu_terminate_workers(&config);
  319. _starpu_deinit_sched_policy(&config);
  320. _starpu_destroy_topology(&config);
  321. #ifdef STARPU_USE_FXT
  322. _starpu_stop_fxt_profiling();
  323. #endif
  324. _starpu_close_debug_logfile();
  325. PTHREAD_MUTEX_LOCK(&init_mutex);
  326. initialized = UNINITIALIZED;
  327. /* Let someone else that wants to initialize it again do it */
  328. pthread_cond_signal(&init_cond);
  329. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  330. }
  331. unsigned starpu_worker_get_count(void)
  332. {
  333. return config.topology.nworkers;
  334. }
  335. unsigned starpu_cpu_worker_get_count(void)
  336. {
  337. return config.topology.ncpus;
  338. }
  339. unsigned starpu_cuda_worker_get_count(void)
  340. {
  341. return config.topology.ncudagpus;
  342. }
  343. unsigned starpu_opencl_worker_get_count(void)
  344. {
  345. return config.topology.nopenclgpus;
  346. }
  347. unsigned starpu_spu_worker_get_count(void)
  348. {
  349. return config.topology.ngordon_spus;
  350. }
  351. /* When analyzing performance, it is useful to see what is the processing unit
  352. * that actually performed the task. This function returns the id of the
  353. * processing unit actually executing it, therefore it makes no sense to use it
  354. * within the callbacks of SPU functions for instance. If called by some thread
  355. * that is not controlled by StarPU, starpu_worker_get_id returns -1. */
  356. int starpu_worker_get_id(void)
  357. {
  358. struct starpu_worker_s * worker;
  359. worker = _starpu_get_local_worker_key();
  360. if (worker)
  361. {
  362. return worker->workerid;
  363. }
  364. else {
  365. /* there is no worker associated to that thread, perhaps it is
  366. * a thread from the application or this is some SPU worker */
  367. return -1;
  368. }
  369. }
  370. int starpu_worker_get_devid(int id)
  371. {
  372. return config.workers[id].devid;
  373. }
  374. struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
  375. {
  376. return &config.workers[id];
  377. }
  378. enum starpu_archtype starpu_worker_get_type(int id)
  379. {
  380. return config.workers[id].arch;
  381. }
  382. void starpu_worker_get_name(int id, char *dst, size_t maxlen)
  383. {
  384. char *name = config.workers[id].name;
  385. snprintf(dst, maxlen, "%s", name);
  386. }
  387. /* Retrieve the status which indicates what the worker is currently doing. */
  388. starpu_worker_status _starpu_worker_get_status(int workerid)
  389. {
  390. return config.workers[workerid].status;
  391. }
  392. /* Change the status of the worker which indicates what the worker is currently
  393. * doing (eg. executing a callback). */
  394. void _starpu_worker_set_status(int workerid, starpu_worker_status status)
  395. {
  396. config.workers[workerid].status = status;
  397. }
  398. void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex)
  399. {
  400. config.workers[workerid].sched_cond = sched_cond;
  401. config.workers[workerid].sched_mutex = sched_mutex;
  402. }