workers.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2010 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <stdio.h>
  18. #include <common/config.h>
  19. #include <common/utils.h>
  20. #include <core/workers.h>
  21. #include <core/debug.h>
  22. #include <core/task.h>
  23. #include <profiling/profiling.h>
  24. #ifdef __MINGW32__
  25. #include <windows.h>
  26. #endif
  27. /* acquire/release semantic for concurrent initialization/de-initialization */
  28. static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
  29. static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
  30. static int init_count;
  31. static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
  32. static pthread_key_t worker_key;
  33. static struct starpu_machine_config_s config;
  34. struct starpu_machine_config_s *_starpu_get_machine_config(void)
  35. {
  36. return &config;
  37. }
  38. /* in case a task is submitted, we may check whether there exists a worker
  39. that may execute the task or not */
  40. inline uint32_t _starpu_worker_exists(uint32_t task_mask)
  41. {
  42. return (task_mask & config.worker_mask);
  43. }
  44. inline uint32_t _starpu_may_submit_cuda_task(void)
  45. {
  46. return (STARPU_CUDA & config.worker_mask);
  47. }
  48. inline uint32_t _starpu_may_submit_cpu_task(void)
  49. {
  50. return (STARPU_CPU & config.worker_mask);
  51. }
  52. inline uint32_t _starpu_may_submit_opencl_task(void)
  53. {
  54. return (STARPU_OPENCL & config.worker_mask);
  55. }
  56. int _starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task)
  57. {
  58. /* TODO: check that the task operand sizes will fit on that device */
  59. /* TODO: call application-provided function for various cases like
  60. * double support, shared memory size limit, etc. */
  61. return !!(task->cl->where & config.workers[workerid].worker_mask);
  62. }
  /*
 * Runtime initialization methods
 */

#ifdef STARPU_USE_GORDON
/* Becomes 1 once the single Gordon driver thread has been launched. */
static unsigned gordon_inited;
/* Worker set served by that one Gordon thread (it handles every SPU worker). */
static struct starpu_worker_set_s gordon_worker_set;
#endif
  70. static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
  71. {
  72. pthread_cond_t *cond = workerarg->sched_cond;
  73. pthread_mutex_t *mutex = workerarg->sched_mutex;
  74. unsigned memory_node = workerarg->memory_node;
  75. _starpu_memory_node_register_condition(cond, mutex, memory_node);
  76. }
  77. static void _starpu_init_workers(struct starpu_machine_config_s *config)
  78. {
  79. config->running = 1;
  80. pthread_key_create(&worker_key, NULL);
  81. unsigned nworkers = config->topology.nworkers;
  82. /* Launch workers asynchronously (except for SPUs) */
  83. unsigned worker;
  84. for (worker = 0; worker < nworkers; worker++)
  85. {
  86. struct starpu_worker_s *workerarg = &config->workers[worker];
  87. workerarg->config = config;
  88. PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
  89. PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
  90. workerarg->workerid = (int)worker;
  91. /* if some codelet's termination cannot be handled directly :
  92. * for instance in the Gordon driver, Gordon tasks' callbacks
  93. * may be executed by another thread than that of the Gordon
  94. * driver so that we cannot call the push_codelet_output method
  95. * directly */
  96. workerarg->terminated_jobs = starpu_job_list_new();
  97. workerarg->local_jobs = starpu_job_list_new();
  98. PTHREAD_MUTEX_INIT(&workerarg->local_jobs_mutex, NULL);
  99. workerarg->status = STATUS_INITIALIZING;
  100. _starpu_init_worker_queue(workerarg);
  101. switch (workerarg->arch) {
  102. #ifdef STARPU_USE_CPU
  103. case STARPU_CPU_WORKER:
  104. workerarg->set = NULL;
  105. workerarg->worker_is_initialized = 0;
  106. pthread_create(&workerarg->worker_thread,
  107. NULL, _starpu_cpu_worker, workerarg);
  108. break;
  109. #endif
  110. #ifdef STARPU_USE_CUDA
  111. case STARPU_CUDA_WORKER:
  112. workerarg->set = NULL;
  113. workerarg->worker_is_initialized = 0;
  114. pthread_create(&workerarg->worker_thread,
  115. NULL, _starpu_cuda_worker, workerarg);
  116. break;
  117. #endif
  118. #ifdef STARPU_USE_OPENCL
  119. case STARPU_OPENCL_WORKER:
  120. workerarg->set = NULL;
  121. workerarg->worker_is_initialized = 0;
  122. pthread_create(&workerarg->worker_thread,
  123. NULL, _starpu_opencl_worker, workerarg);
  124. break;
  125. #endif
  126. #ifdef STARPU_USE_GORDON
  127. case STARPU_GORDON_WORKER:
  128. /* we will only launch gordon once, but it will handle
  129. * the different SPU workers */
  130. if (!gordon_inited)
  131. {
  132. gordon_worker_set.nworkers = config->ngordon_spus;
  133. gordon_worker_set.workers = &config->workers[worker];
  134. gordon_worker_set.set_is_initialized = 0;
  135. pthread_create(&gordon_worker_set.worker_thread, NULL,
  136. _starpu_gordon_worker, &gordon_worker_set);
  137. PTHREAD_MUTEX_LOCK(&gordon_worker_set.mutex);
  138. while (!gordon_worker_set.set_is_initialized)
  139. PTHREAD_COND_WAIT(&gordon_worker_set.ready_cond,
  140. &gordon_worker_set.mutex);
  141. PTHREAD_MUTEX_UNLOCK(&gordon_worker_set.mutex);
  142. gordon_inited = 1;
  143. }
  144. workerarg->set = &gordon_worker_set;
  145. gordon_worker_set.joined = 0;
  146. workerarg->worker_is_running = 1;
  147. break;
  148. #endif
  149. default:
  150. STARPU_ABORT();
  151. }
  152. }
  153. for (worker = 0; worker < nworkers; worker++)
  154. {
  155. struct starpu_worker_s *workerarg = &config->workers[worker];
  156. switch (workerarg->arch) {
  157. case STARPU_CPU_WORKER:
  158. case STARPU_CUDA_WORKER:
  159. case STARPU_OPENCL_WORKER:
  160. PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  161. while (!workerarg->worker_is_initialized)
  162. PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
  163. PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  164. break;
  165. #ifdef STARPU_USE_GORDON
  166. case STARPU_GORDON_WORKER:
  167. /* the initialization of Gordon worker is
  168. * synchronous for now */
  169. break;
  170. #endif
  171. default:
  172. STARPU_ABORT();
  173. }
  174. }
  175. }
  176. void _starpu_set_local_worker_key(struct starpu_worker_s *worker)
  177. {
  178. pthread_setspecific(worker_key, worker);
  179. }
  180. struct starpu_worker_s *_starpu_get_local_worker_key(void)
  181. {
  182. return pthread_getspecific(worker_key);
  183. }
  184. int starpu_init(struct starpu_conf *user_conf)
  185. {
  186. int ret;
  187. PTHREAD_MUTEX_LOCK(&init_mutex);
  188. while (initialized == CHANGING)
  189. /* Wait for the other one changing it */
  190. PTHREAD_COND_WAIT(&init_cond, &init_mutex);
  191. init_count++;
  192. if (initialized == INITIALIZED)
  193. /* He initialized it, don't do it again */
  194. return 0;
  195. /* initialized == UNINITIALIZED */
  196. initialized = CHANGING;
  197. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  198. #ifdef __MINGW32__
  199. WSADATA wsadata;
  200. WSAStartup(MAKEWORD(1,0), &wsadata);
  201. #endif
  202. srand(2008);
  203. #ifdef STARPU_USE_FXT
  204. _starpu_start_fxt_profiling();
  205. #endif
  206. _starpu_open_debug_logfile();
  207. _starpu_timing_init();
  208. _starpu_load_bus_performance_files();
  209. /* store the pointer to the user explicit configuration during the
  210. * initialization */
  211. config.user_conf = user_conf;
  212. ret = _starpu_build_topology(&config);
  213. if (ret) {
  214. PTHREAD_MUTEX_LOCK(&init_mutex);
  215. init_count--;
  216. initialized = UNINITIALIZED;
  217. /* Let somebody else try to do it */
  218. PTHREAD_COND_SIGNAL(&init_cond);
  219. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  220. return ret;
  221. }
  222. /* We need to store the current task handled by the different
  223. * threads */
  224. _starpu_initialize_current_task_key();
  225. /* initialize the scheduler */
  226. /* initialize the queue containing the jobs */
  227. _starpu_init_sched_policy(&config);
  228. _starpu_initialize_registered_performance_models();
  229. _starpu_init_workers(&config);
  230. PTHREAD_MUTEX_LOCK(&init_mutex);
  231. initialized = INITIALIZED;
  232. /* Tell everybody that we initialized */
  233. PTHREAD_COND_BROADCAST(&init_cond);
  234. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  235. return 0;
  236. }
  237. /*
  238. * Handle runtime termination
  239. */
  240. static void _starpu_terminate_workers(struct starpu_machine_config_s *config)
  241. {
  242. int status __attribute__((unused));
  243. unsigned workerid;
  244. for (workerid = 0; workerid < config->topology.nworkers; workerid++)
  245. {
  246. starpu_wake_all_blocked_workers();
  247. _STARPU_DEBUG("wait for worker %d\n", workerid);
  248. struct starpu_worker_set_s *set = config->workers[workerid].set;
  249. struct starpu_worker_s *worker = &config->workers[workerid];
  250. /* in case StarPU termination code is called from a callback,
  251. * we have to check if pthread_self() is the worker itself */
  252. if (set){
  253. if (!set->joined) {
  254. if (!pthread_equal(pthread_self(), set->worker_thread))
  255. {
  256. status = pthread_join(set->worker_thread, NULL);
  257. #ifdef STARPU_VERBOSE
  258. if (status) {
  259. _STARPU_DEBUG("pthread_join -> %d\n", status);
  260. }
  261. #endif
  262. }
  263. set->joined = 1;
  264. }
  265. }
  266. else {
  267. if (!pthread_equal(pthread_self(), worker->worker_thread))
  268. {
  269. status = pthread_join(worker->worker_thread, NULL);
  270. #ifdef STARPU_VERBOSE
  271. if (status) {
  272. _STARPU_DEBUG("pthread_join -> %d\n", status);
  273. }
  274. #endif
  275. }
  276. }
  277. starpu_job_list_delete(worker->local_jobs);
  278. starpu_job_list_delete(worker->terminated_jobs);
  279. }
  280. }
  281. unsigned _starpu_machine_is_running(void)
  282. {
  283. return config.running;
  284. }
  /* Tell whether a worker attached to memory node memnode may go to sleep.
 *
 * With STARPU_NON_BLOCKING_DRIVERS the answer is always 0. Otherwise the
 * worker may block (returns 1) only if no data request exists for memnode,
 * the machine is still running, and the registered progression hooks allow
 * it. All three checks are evaluated on every call, in this order, so the
 * progression hooks are always executed. */
unsigned _starpu_worker_can_block(unsigned memnode)
{
#ifdef STARPU_NON_BLOCKING_DRIVERS
	return 0;
#else
	int no_pending_request = (_starpu_check_that_no_data_request_exists(memnode) != 0);
	int still_running = (_starpu_machine_is_running() != 0);
	int hooks_allow_blocking = (_starpu_execute_registered_progression_hooks() != 0);

	return (no_pending_request && still_running && hooks_allow_blocking) ? 1 : 0;
#endif
}
  300. static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
  301. {
  302. /* set the flag which will tell workers to stop */
  303. config->running = 0;
  304. starpu_wake_all_blocked_workers();
  305. }
  306. void starpu_shutdown(void)
  307. {
  308. PTHREAD_MUTEX_LOCK(&init_mutex);
  309. init_count--;
  310. if (init_count)
  311. /* Still somebody needing StarPU, don't deinitialize */
  312. return;
  313. /* We're last */
  314. initialized = CHANGING;
  315. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  316. _starpu_display_msi_stats();
  317. _starpu_display_alloc_cache_stats();
  318. /* tell all workers to shutdown */
  319. _starpu_kill_all_workers(&config);
  320. #ifdef STARPU_DATA_STATS
  321. _starpu_display_comm_amounts();
  322. #endif
  323. _starpu_deinitialize_registered_performance_models();
  324. /* wait for their termination */
  325. _starpu_terminate_workers(&config);
  326. _starpu_deinit_sched_policy(&config);
  327. _starpu_destroy_topology(&config);
  328. #ifdef STARPU_USE_FXT
  329. _starpu_stop_fxt_profiling();
  330. #endif
  331. _starpu_close_debug_logfile();
  332. PTHREAD_MUTEX_LOCK(&init_mutex);
  333. initialized = UNINITIALIZED;
  334. /* Let someone else that wants to initialize it again do it */
  335. pthread_cond_signal(&init_cond);
  336. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  337. }
  338. unsigned starpu_worker_get_count(void)
  339. {
  340. return config.topology.nworkers;
  341. }
  342. unsigned starpu_cpu_worker_get_count(void)
  343. {
  344. return config.topology.ncpus;
  345. }
  346. unsigned starpu_cuda_worker_get_count(void)
  347. {
  348. return config.topology.ncudagpus;
  349. }
  350. unsigned starpu_opencl_worker_get_count(void)
  351. {
  352. return config.topology.nopenclgpus;
  353. }
  354. unsigned starpu_spu_worker_get_count(void)
  355. {
  356. return config.topology.ngordon_spus;
  357. }
  358. /* When analyzing performance, it is useful to see what is the processing unit
  359. * that actually performed the task. This function returns the id of the
  360. * processing unit actually executing it, therefore it makes no sense to use it
  361. * within the callbacks of SPU functions for instance. If called by some thread
  362. * that is not controlled by StarPU, starpu_worker_get_id returns -1. */
  363. int starpu_worker_get_id(void)
  364. {
  365. struct starpu_worker_s * worker;
  366. worker = _starpu_get_local_worker_key();
  367. if (worker)
  368. {
  369. return worker->workerid;
  370. }
  371. else {
  372. /* there is no worker associated to that thread, perhaps it is
  373. * a thread from the application or this is some SPU worker */
  374. return -1;
  375. }
  376. }
  377. int starpu_worker_get_devid(int id)
  378. {
  379. return config.workers[id].devid;
  380. }
  381. struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
  382. {
  383. return &config.workers[id];
  384. }
  385. enum starpu_archtype starpu_worker_get_type(int id)
  386. {
  387. return config.workers[id].arch;
  388. }
  389. void starpu_worker_get_name(int id, char *dst, size_t maxlen)
  390. {
  391. char *name = config.workers[id].name;
  392. snprintf(dst, maxlen, "%s", name);
  393. }
  394. /* Retrieve the status which indicates what the worker is currently doing. */
  395. starpu_worker_status _starpu_worker_get_status(int workerid)
  396. {
  397. return config.workers[workerid].status;
  398. }
  399. /* Change the status of the worker which indicates what the worker is currently
  400. * doing (eg. executing a callback). */
  401. void _starpu_worker_set_status(int workerid, starpu_worker_status status)
  402. {
  403. config.workers[workerid].status = status;
  404. }
  405. void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex)
  406. {
  407. config.workers[workerid].sched_cond = sched_cond;
  408. config.workers[workerid].sched_mutex = sched_mutex;
  409. }