workers.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. /*
  2. * StarPU
  3. * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <stdio.h>
  18. #include <common/config.h>
  19. #include <core/workers.h>
  20. #include <core/debug.h>
  21. static pthread_key_t worker_key;
  22. static struct machine_config_s config;
  23. struct machine_config_s *get_machine_config(void)
  24. {
  25. return &config;
  26. }
  27. /* in case a task is submitted, we may check whether there exists a worker
  28. that may execute the task or not */
  29. inline uint32_t worker_exists(uint32_t task_mask)
  30. {
  31. return (task_mask & config.worker_mask);
  32. }
  33. inline uint32_t may_submit_cuda_task(void)
  34. {
  35. return (CUDA & config.worker_mask);
  36. }
  37. inline uint32_t may_submit_core_task(void)
  38. {
  39. return (CORE & config.worker_mask);
  40. }
  41. inline uint32_t worker_may_execute_task(unsigned workerid, uint32_t where)
  42. {
  43. return (where & config.workers[workerid].worker_mask);
  44. }
  45. /*
  46. * Runtime initialization methods
  47. */
  48. #ifdef USE_GORDON
  49. static unsigned gordon_inited = 0;
  50. static struct worker_set_s gordon_worker_set;
  51. #endif
  52. static void init_workers(struct machine_config_s *config)
  53. {
  54. config->running = 1;
  55. pthread_key_create(&worker_key, NULL);
  56. /* Launch workers asynchronously (except for SPUs) */
  57. unsigned worker;
  58. for (worker = 0; worker < config->nworkers; worker++)
  59. {
  60. struct worker_s *workerarg = &config->workers[worker];
  61. workerarg->config = config;
  62. pthread_mutex_init(&workerarg->mutex, NULL);
  63. pthread_cond_init(&workerarg->ready_cond, NULL);
  64. workerarg->workerid = (int)worker;
  65. /* if some codelet's termination cannot be handled directly :
  66. * for instance in the Gordon driver, Gordon tasks' callbacks
  67. * may be executed by another thread than that of the Gordon
  68. * driver so that we cannot call the push_codelet_output method
  69. * directly */
  70. workerarg->terminated_jobs = job_list_new();
  71. workerarg->local_jobs = job_list_new();
  72. pthread_mutex_init(&workerarg->local_jobs_mutex, NULL);
  73. workerarg->status = STATUS_INITIALIZING;
  74. switch (workerarg->arch) {
  75. #ifdef USE_CPUS
  76. case STARPU_CORE_WORKER:
  77. workerarg->set = NULL;
  78. workerarg->worker_is_initialized = 0;
  79. pthread_create(&workerarg->worker_thread,
  80. NULL, core_worker, workerarg);
  81. break;
  82. #endif
  83. #ifdef USE_CUDA
  84. case STARPU_CUDA_WORKER:
  85. workerarg->set = NULL;
  86. workerarg->worker_is_initialized = 0;
  87. pthread_create(&workerarg->worker_thread,
  88. NULL, cuda_worker, workerarg);
  89. break;
  90. #endif
  91. #ifdef USE_GORDON
  92. case STARPU_GORDON_WORKER:
  93. /* we will only launch gordon once, but it will handle
  94. * the different SPU workers */
  95. if (!gordon_inited)
  96. {
  97. gordon_worker_set.nworkers = config->ngordon_spus;
  98. gordon_worker_set.workers = &config->workers[worker];
  99. gordon_worker_set.set_is_initialized = 0;
  100. pthread_create(&gordon_worker_set.worker_thread, NULL,
  101. gordon_worker, &gordon_worker_set);
  102. pthread_mutex_lock(&gordon_worker_set.mutex);
  103. if (!gordon_worker_set.set_is_initialized)
  104. pthread_cond_wait(&gordon_worker_set.ready_cond,
  105. &gordon_worker_set.mutex);
  106. pthread_mutex_unlock(&gordon_worker_set.mutex);
  107. gordon_inited = 1;
  108. }
  109. workerarg->set = &gordon_worker_set;
  110. gordon_worker_set.joined = 0;
  111. workerarg->worker_is_running = 1;
  112. break;
  113. #endif
  114. default:
  115. STARPU_ABORT();
  116. }
  117. }
  118. for (worker = 0; worker < config->nworkers; worker++)
  119. {
  120. struct worker_s *workerarg = &config->workers[worker];
  121. switch (workerarg->arch) {
  122. case STARPU_CORE_WORKER:
  123. case STARPU_CUDA_WORKER:
  124. pthread_mutex_lock(&workerarg->mutex);
  125. if (!workerarg->worker_is_initialized)
  126. pthread_cond_wait(&workerarg->ready_cond, &workerarg->mutex);
  127. pthread_mutex_unlock(&workerarg->mutex);
  128. break;
  129. #ifdef USE_GORDON
  130. case STARPU_GORDON_WORKER:
  131. /* the initialization of Gordon worker is
  132. * synchronous for now */
  133. break;
  134. #endif
  135. default:
  136. STARPU_ABORT();
  137. }
  138. }
  139. }
  140. void set_local_worker_key(struct worker_s *worker)
  141. {
  142. pthread_setspecific(worker_key, worker);
  143. }
  144. struct worker_s *get_local_worker_key(void)
  145. {
  146. return pthread_getspecific(worker_key);
  147. }
  148. int starpu_init(struct starpu_conf *user_conf)
  149. {
  150. int ret;
  151. srand(2008);
  152. #ifdef USE_FXT
  153. start_fxt_profiling();
  154. #endif
  155. open_debug_logfile();
  156. timing_init();
  157. load_bus_performance_files();
  158. /* store the pointer to the user explicit configuration during the
  159. * initialization */
  160. config.user_conf = user_conf;
  161. ret = starpu_build_topology(&config);
  162. if (ret)
  163. return ret;
  164. /* initialize the scheduler */
  165. /* initialize the queue containing the jobs */
  166. init_sched_policy(&config);
  167. init_workers(&config);
  168. return 0;
  169. }
  170. /*
  171. * Handle runtime termination
  172. */
  173. static void terminate_workers(struct machine_config_s *config)
  174. {
  175. int status;
  176. unsigned workerid;
  177. for (workerid = 0; workerid < config->nworkers; workerid++)
  178. {
  179. wake_all_blocked_workers();
  180. #ifdef VERBOSE
  181. fprintf(stderr, "wait for worker %d\n", workerid);
  182. #endif
  183. struct worker_set_s *set = config->workers[workerid].set;
  184. struct worker_s *worker = &config->workers[workerid];
  185. /* in case StarPU termination code is called from a callback,
  186. * we have to check if pthread_self() is the worker itself */
  187. if (set){
  188. if (!set->joined) {
  189. if (pthread_self() != set->worker_thread)
  190. {
  191. status = pthread_join(set->worker_thread, NULL);
  192. #ifdef VERBOSE
  193. if (status)
  194. fprintf(stderr, "pthread_join -> %d\n", status);
  195. #endif
  196. }
  197. set->joined = 1;
  198. }
  199. }
  200. else {
  201. if (pthread_self() != worker->worker_thread)
  202. {
  203. status = pthread_join(worker->worker_thread, NULL);
  204. #ifdef VERBOSE
  205. if (status)
  206. fprintf(stderr, "pthread_join -> %d\n", status);
  207. #endif
  208. }
  209. }
  210. job_list_delete(worker->local_jobs);
  211. job_list_delete(worker->terminated_jobs);
  212. }
  213. }
  214. unsigned machine_is_running(void)
  215. {
  216. return config.running;
  217. }
  /* Operation applied to each job queue by the operate_on_all_queues*
   * helpers. */
  typedef enum {
      BROADCAST, /* broadcast the queue's activity condition */
      LOCK,      /* lock the queue's activity mutex */
      UNLOCK     /* unlock the queue's activity mutex */
  } queue_op;
  223. static void operate_on_all_queues_attached_to_node(unsigned nodeid, queue_op op)
  224. {
  225. unsigned q_id;
  226. struct jobq_s *q;
  227. mem_node_descr * const descr = get_memory_node_description();
  228. pthread_rwlock_rdlock(&descr->attached_queues_rwlock);
  229. unsigned nqueues = descr->queues_count[nodeid];
  230. for (q_id = 0; q_id < nqueues; q_id++)
  231. {
  232. q = descr->attached_queues_per_node[nodeid][q_id];
  233. switch (op) {
  234. case BROADCAST:
  235. pthread_cond_broadcast(&q->activity_cond);
  236. break;
  237. case LOCK:
  238. pthread_mutex_lock(&q->activity_mutex);
  239. break;
  240. case UNLOCK:
  241. pthread_mutex_unlock(&q->activity_mutex);
  242. break;
  243. }
  244. }
  245. pthread_rwlock_unlock(&descr->attached_queues_rwlock);
  246. }
  247. inline void lock_all_queues_attached_to_node(unsigned node)
  248. {
  249. operate_on_all_queues_attached_to_node(node, LOCK);
  250. }
  251. inline void unlock_all_queues_attached_to_node(unsigned node)
  252. {
  253. operate_on_all_queues_attached_to_node(node, UNLOCK);
  254. }
  255. inline void broadcast_all_queues_attached_to_node(unsigned node)
  256. {
  257. operate_on_all_queues_attached_to_node(node, BROADCAST);
  258. }
  259. static void operate_on_all_queues(queue_op op)
  260. {
  261. unsigned q_id;
  262. struct jobq_s *q;
  263. mem_node_descr * const descr = get_memory_node_description();
  264. pthread_rwlock_rdlock(&descr->attached_queues_rwlock);
  265. unsigned nqueues = descr->total_queues_count;
  266. for (q_id = 0; q_id < nqueues; q_id++)
  267. {
  268. q = descr->attached_queues_all[q_id];
  269. switch (op) {
  270. case BROADCAST:
  271. pthread_cond_broadcast(&q->activity_cond);
  272. break;
  273. case LOCK:
  274. pthread_mutex_lock(&q->activity_mutex);
  275. break;
  276. case UNLOCK:
  277. pthread_mutex_unlock(&q->activity_mutex);
  278. break;
  279. }
  280. }
  281. pthread_rwlock_unlock(&descr->attached_queues_rwlock);
  282. }
  283. static void kill_all_workers(struct machine_config_s *config)
  284. {
  285. /* lock all workers and the scheduler (in the proper order) to make
  286. sure everyone will notice the termination */
  287. /* WARNING: here we make the asumption that a queue is not attached to
  288. * different memory nodes ! */
  289. struct sched_policy_s *sched = get_sched_policy();
  290. operate_on_all_queues(LOCK);
  291. pthread_mutex_lock(&sched->sched_activity_mutex);
  292. /* set the flag which will tell workers to stop */
  293. config->running = 0;
  294. operate_on_all_queues(BROADCAST);
  295. pthread_cond_broadcast(&sched->sched_activity_cond);
  296. pthread_mutex_unlock(&sched->sched_activity_mutex);
  297. operate_on_all_queues(UNLOCK);
  298. }
  299. void starpu_shutdown(void)
  300. {
  301. display_msi_stats();
  302. display_alloc_cache_stats();
  303. /* tell all workers to shutdown */
  304. kill_all_workers(&config);
  305. #ifdef DATA_STATS
  306. display_comm_ammounts();
  307. #endif
  308. if (starpu_get_env_number("CALIBRATE") != -1)
  309. dump_registered_models();
  310. /* wait for their termination */
  311. terminate_workers(&config);
  312. deinit_sched_policy(&config);
  313. starpu_destroy_topology(&config);
  314. close_debug_logfile();
  315. }
  316. unsigned starpu_get_worker_count(void)
  317. {
  318. return config.nworkers;
  319. }
  320. unsigned starpu_get_core_worker_count(void)
  321. {
  322. return config.ncores;
  323. }
  324. unsigned starpu_get_cuda_worker_count(void)
  325. {
  326. return config.ncudagpus;
  327. }
  328. unsigned starpu_get_spu_worker_count(void)
  329. {
  330. return config.ngordon_spus;
  331. }
  332. /* When analyzing performance, it is useful to see what is the processing unit
  333. * that actually performed the task. This function returns the id of the
  334. * processing unit actually executing it, therefore it makes no sense to use it
  335. * within the callbacks of SPU functions for instance. If called by some thread
  336. * that is not controlled by StarPU, starpu_get_worker_id returns -1. */
  337. int starpu_get_worker_id(void)
  338. {
  339. struct worker_s * worker;
  340. worker = get_local_worker_key();
  341. if (worker)
  342. {
  343. return worker->workerid;
  344. }
  345. else {
  346. /* there is no worker associated to that thread, perhaps it is
  347. * a thread from the application or this is some SPU worker */
  348. return -1;
  349. }
  350. }
  351. struct worker_s *get_worker_struct(unsigned id)
  352. {
  353. return &config.workers[id];
  354. }
  355. enum starpu_archtype starpu_get_worker_type(int id)
  356. {
  357. return config.workers[id].arch;
  358. }
  359. void starpu_get_worker_name(int id, char *dst, size_t maxlen)
  360. {
  361. char *name = config.workers[id].name;
  362. snprintf(dst, maxlen, "%s", name);
  363. }