workers.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. /*
  2. * StarPU
  3. * Copyright (C) Université Bordeaux 1, CNRS 2008-2011 (see AUTHORS file)
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <stdlib.h>
  17. #include <stdio.h>
  18. #include <common/config.h>
  19. #include <common/utils.h>
  20. #include <core/workers.h>
  21. #include <core/debug.h>
  22. #include <core/task.h>
  23. #include <profiling/profiling.h>
  24. #include <starpu_task_list.h>
  25. #ifdef __MINGW32__
  26. #include <windows.h>
  27. #endif
  28. /* acquire/release semantic for concurrent initialization/de-initialization */
  29. static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
  30. static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
  31. static int init_count;
  32. static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
  33. static pthread_key_t worker_key;
  34. static struct starpu_machine_config_s config;
  35. struct starpu_machine_config_s *_starpu_get_machine_config(void)
  36. {
  37. return &config;
  38. }
  39. /* in case a task is submitted, we may check whether there exists a worker
  40. that may execute the task or not */
  41. inline uint32_t _starpu_worker_exists(uint32_t task_mask)
  42. {
  43. return (task_mask & config.worker_mask);
  44. }
  45. inline uint32_t _starpu_may_submit_cuda_task(void)
  46. {
  47. return (STARPU_CUDA & config.worker_mask);
  48. }
  49. inline uint32_t _starpu_may_submit_cpu_task(void)
  50. {
  51. return (STARPU_CPU & config.worker_mask);
  52. }
  53. inline uint32_t _starpu_may_submit_opencl_task(void)
  54. {
  55. return (STARPU_OPENCL & config.worker_mask);
  56. }
  57. int starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task)
  58. {
  59. /* TODO: check that the task operand sizes will fit on that device */
  60. /* TODO: call application-provided function for various cases like
  61. * double support, shared memory size limit, etc. */
  62. return !!(task->cl->where & config.workers[workerid].worker_mask);
  63. }
  64. int starpu_combined_worker_may_execute_task(unsigned workerid, struct starpu_task *task)
  65. {
  66. /* TODO: check that the task operand sizes will fit on that device */
  67. /* TODO: call application-provided function for various cases like
  68. * double support, shared memory size limit, etc. */
  69. struct starpu_codelet_t *cl = task->cl;
  70. unsigned nworkers = config.topology.nworkers;
  71. /* Is this a parallel worker ? */
  72. if (workerid < nworkers)
  73. {
  74. return !!(task->cl->where & config.workers[workerid].worker_mask);
  75. }
  76. else {
  77. if ((cl->type == STARPU_SPMD) || (cl->type == STARPU_FORKJOIN))
  78. {
  79. /* TODO we should add other types of constraints */
  80. /* Is the worker larger than requested ? */
  81. int worker_size = (int)config.combined_workers[workerid - nworkers].worker_size;
  82. return !!(worker_size <= task->cl->max_parallelism);
  83. }
  84. else
  85. {
  86. /* We have a sequential task but a parallel worker */
  87. return 0;
  88. }
  89. }
  90. }
  91. /*
  92. * Runtime initialization methods
  93. */
  94. #ifdef STARPU_USE_GORDON
  95. static unsigned gordon_inited = 0;
  96. static struct starpu_worker_set_s gordon_worker_set;
  97. #endif
  98. static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
  99. {
  100. pthread_cond_t *cond = workerarg->sched_cond;
  101. pthread_mutex_t *mutex = workerarg->sched_mutex;
  102. unsigned memory_node = workerarg->memory_node;
  103. _starpu_memory_node_register_condition(cond, mutex, memory_node);
  104. }
  105. static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
  106. {
  107. config->running = 1;
  108. pthread_key_create(&worker_key, NULL);
  109. unsigned nworkers = config->topology.nworkers;
  110. /* Launch workers asynchronously (except for SPUs) */
  111. unsigned worker;
  112. for (worker = 0; worker < nworkers; worker++)
  113. {
  114. struct starpu_worker_s *workerarg = &config->workers[worker];
  115. workerarg->config = config;
  116. PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
  117. PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
  118. workerarg->workerid = (int)worker;
  119. workerarg->worker_size = 1;
  120. workerarg->combined_workerid = workerarg->workerid;
  121. workerarg->current_rank = 0;
  122. /* if some codelet's termination cannot be handled directly :
  123. * for instance in the Gordon driver, Gordon tasks' callbacks
  124. * may be executed by another thread than that of the Gordon
  125. * driver so that we cannot call the push_codelet_output method
  126. * directly */
  127. workerarg->terminated_jobs = starpu_job_list_new();
  128. starpu_task_list_init(&workerarg->local_tasks);
  129. PTHREAD_MUTEX_INIT(&workerarg->local_tasks_mutex, NULL);
  130. workerarg->status = STATUS_INITIALIZING;
  131. _STARPU_DEBUG("initialising worker %d\n", worker);
  132. _starpu_init_worker_queue(workerarg);
  133. switch (workerarg->arch) {
  134. #ifdef STARPU_USE_CPU
  135. case STARPU_CPU_WORKER:
  136. workerarg->set = NULL;
  137. workerarg->worker_is_initialized = 0;
  138. pthread_create(&workerarg->worker_thread,
  139. NULL, _starpu_cpu_worker, workerarg);
  140. break;
  141. #endif
  142. #ifdef STARPU_USE_CUDA
  143. case STARPU_CUDA_WORKER:
  144. workerarg->set = NULL;
  145. workerarg->worker_is_initialized = 0;
  146. pthread_create(&workerarg->worker_thread,
  147. NULL, _starpu_cuda_worker, workerarg);
  148. break;
  149. #endif
  150. #ifdef STARPU_USE_OPENCL
  151. case STARPU_OPENCL_WORKER:
  152. workerarg->set = NULL;
  153. workerarg->worker_is_initialized = 0;
  154. pthread_create(&workerarg->worker_thread,
  155. NULL, _starpu_opencl_worker, workerarg);
  156. break;
  157. #endif
  158. #ifdef STARPU_USE_GORDON
  159. case STARPU_GORDON_WORKER:
  160. /* we will only launch gordon once, but it will handle
  161. * the different SPU workers */
  162. if (!gordon_inited)
  163. {
  164. gordon_worker_set.nworkers = config->ngordon_spus;
  165. gordon_worker_set.workers = &config->workers[worker];
  166. gordon_worker_set.set_is_initialized = 0;
  167. pthread_create(&gordon_worker_set.worker_thread, NULL,
  168. _starpu_gordon_worker, &gordon_worker_set);
  169. PTHREAD_MUTEX_LOCK(&gordon_worker_set.mutex);
  170. while (!gordon_worker_set.set_is_initialized)
  171. PTHREAD_COND_WAIT(&gordon_worker_set.ready_cond,
  172. &gordon_worker_set.mutex);
  173. PTHREAD_MUTEX_UNLOCK(&gordon_worker_set.mutex);
  174. gordon_inited = 1;
  175. }
  176. workerarg->set = &gordon_worker_set;
  177. gordon_worker_set.joined = 0;
  178. workerarg->worker_is_running = 1;
  179. break;
  180. #endif
  181. default:
  182. STARPU_ABORT();
  183. }
  184. }
  185. for (worker = 0; worker < nworkers; worker++)
  186. {
  187. struct starpu_worker_s *workerarg = &config->workers[worker];
  188. switch (workerarg->arch) {
  189. case STARPU_CPU_WORKER:
  190. case STARPU_CUDA_WORKER:
  191. case STARPU_OPENCL_WORKER:
  192. PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  193. while (!workerarg->worker_is_initialized)
  194. PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
  195. PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  196. break;
  197. #ifdef STARPU_USE_GORDON
  198. case STARPU_GORDON_WORKER:
  199. /* the initialization of Gordon worker is
  200. * synchronous for now */
  201. break;
  202. #endif
  203. default:
  204. STARPU_ABORT();
  205. }
  206. }
  207. }
  208. void _starpu_set_local_worker_key(struct starpu_worker_s *worker)
  209. {
  210. pthread_setspecific(worker_key, worker);
  211. }
  212. struct starpu_worker_s *_starpu_get_local_worker_key(void)
  213. {
  214. return pthread_getspecific(worker_key);
  215. }
  216. /* Initialize the starpu_conf with default values */
  217. int starpu_conf_init(struct starpu_conf *conf)
  218. {
  219. if (!conf)
  220. return -EINVAL;
  221. conf->sched_policy_name = NULL;
  222. conf->sched_policy = NULL;
  223. conf->ncpus = -1;
  224. conf->ncuda = -1;
  225. conf->nopencl = -1;
  226. conf->nspus = -1;
  227. conf->use_explicit_workers_bindid = 0;
  228. conf->use_explicit_workers_cuda_gpuid = 0;
  229. conf->use_explicit_workers_opencl_gpuid = 0;
  230. conf->calibrate = -1;
  231. return 0;
  232. };
  233. int starpu_init(struct starpu_conf *user_conf)
  234. {
  235. int ret;
  236. PTHREAD_MUTEX_LOCK(&init_mutex);
  237. while (initialized == CHANGING)
  238. /* Wait for the other one changing it */
  239. PTHREAD_COND_WAIT(&init_cond, &init_mutex);
  240. init_count++;
  241. if (initialized == INITIALIZED) {
  242. /* He initialized it, don't do it again, and let the others get the mutex */
  243. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  244. return 0;
  245. }
  246. /* initialized == UNINITIALIZED */
  247. initialized = CHANGING;
  248. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  249. #ifdef __MINGW32__
  250. WSADATA wsadata;
  251. WSAStartup(MAKEWORD(1,0), &wsadata);
  252. #endif
  253. srand(2008);
  254. #ifdef STARPU_USE_FXT
  255. _starpu_start_fxt_profiling();
  256. #endif
  257. _starpu_open_debug_logfile();
  258. _starpu_timing_init();
  259. _starpu_profiling_init();
  260. _starpu_load_bus_performance_files();
  261. /* store the pointer to the user explicit configuration during the
  262. * initialization */
  263. config.user_conf = user_conf;
  264. ret = _starpu_build_topology(&config);
  265. if (ret) {
  266. PTHREAD_MUTEX_LOCK(&init_mutex);
  267. init_count--;
  268. initialized = UNINITIALIZED;
  269. /* Let somebody else try to do it */
  270. PTHREAD_COND_SIGNAL(&init_cond);
  271. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  272. return ret;
  273. }
  274. /* We need to store the current task handled by the different
  275. * threads */
  276. _starpu_initialize_current_task_key();
  277. /* initialize the scheduling policy */
  278. _starpu_init_sched_policy(&config);
  279. _starpu_initialize_registered_performance_models();
  280. /* Launch "basic" workers (ie. non-combined workers) */
  281. _starpu_launch_drivers(&config);
  282. PTHREAD_MUTEX_LOCK(&init_mutex);
  283. initialized = INITIALIZED;
  284. /* Tell everybody that we initialized */
  285. PTHREAD_COND_BROADCAST(&init_cond);
  286. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  287. return 0;
  288. }
  289. /*
  290. * Handle runtime termination
  291. */
  292. static void _starpu_terminate_workers(struct starpu_machine_config_s *config)
  293. {
  294. int status __attribute__((unused));
  295. unsigned workerid;
  296. for (workerid = 0; workerid < config->topology.nworkers; workerid++)
  297. {
  298. starpu_wake_all_blocked_workers();
  299. _STARPU_DEBUG("wait for worker %u\n", workerid);
  300. struct starpu_worker_set_s *set = config->workers[workerid].set;
  301. struct starpu_worker_s *worker = &config->workers[workerid];
  302. /* in case StarPU termination code is called from a callback,
  303. * we have to check if pthread_self() is the worker itself */
  304. if (set){
  305. if (!set->joined) {
  306. if (!pthread_equal(pthread_self(), set->worker_thread))
  307. {
  308. status = pthread_join(set->worker_thread, NULL);
  309. #ifdef STARPU_VERBOSE
  310. if (status) {
  311. _STARPU_DEBUG("pthread_join -> %d\n", status);
  312. }
  313. #endif
  314. }
  315. set->joined = 1;
  316. }
  317. }
  318. else {
  319. if (!pthread_equal(pthread_self(), worker->worker_thread))
  320. {
  321. status = pthread_join(worker->worker_thread, NULL);
  322. #ifdef STARPU_VERBOSE
  323. if (status) {
  324. _STARPU_DEBUG("pthread_join -> %d\n", status);
  325. }
  326. #endif
  327. }
  328. }
  329. STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
  330. starpu_job_list_delete(worker->terminated_jobs);
  331. }
  332. }
  333. unsigned _starpu_machine_is_running(void)
  334. {
  335. return config.running;
  336. }
  337. unsigned _starpu_worker_can_block(unsigned memnode __attribute__((unused)))
  338. {
  339. #ifdef STARPU_NON_BLOCKING_DRIVERS
  340. return 0;
  341. #else
  342. unsigned can_block = 1;
  343. if (!_starpu_check_that_no_data_request_exists(memnode))
  344. can_block = 0;
  345. if (!_starpu_machine_is_running())
  346. can_block = 0;
  347. if (!_starpu_execute_registered_progression_hooks())
  348. can_block = 0;
  349. return can_block;
  350. #endif
  351. }
  352. static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
  353. {
  354. /* set the flag which will tell workers to stop */
  355. config->running = 0;
  356. starpu_wake_all_blocked_workers();
  357. }
  358. void starpu_shutdown(void)
  359. {
  360. const char *stats;
  361. PTHREAD_MUTEX_LOCK(&init_mutex);
  362. init_count--;
  363. if (init_count)
  364. /* Still somebody needing StarPU, don't deinitialize */
  365. return;
  366. /* We're last */
  367. initialized = CHANGING;
  368. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  369. _starpu_display_msi_stats();
  370. _starpu_display_alloc_cache_stats();
  371. /* tell all workers to shutdown */
  372. _starpu_kill_all_workers(&config);
  373. #ifdef STARPU_DATA_STATS
  374. _starpu_display_comm_amounts();
  375. #endif
  376. if ((stats = getenv("STARPU_BUS_STATS")) && atoi(stats))
  377. starpu_bus_profiling_helper_display_summary();
  378. if ((stats = getenv("STARPU_WORKER_STATS")) && atoi(stats))
  379. starpu_worker_profiling_helper_display_summary();
  380. _starpu_deinitialize_registered_performance_models();
  381. /* wait for their termination */
  382. _starpu_terminate_workers(&config);
  383. _starpu_deinit_sched_policy(&config);
  384. _starpu_destroy_topology(&config);
  385. #ifdef STARPU_USE_FXT
  386. _starpu_stop_fxt_profiling();
  387. #endif
  388. _starpu_close_debug_logfile();
  389. PTHREAD_MUTEX_LOCK(&init_mutex);
  390. initialized = UNINITIALIZED;
  391. /* Let someone else that wants to initialize it again do it */
  392. PTHREAD_COND_SIGNAL(&init_cond);
  393. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  394. }
  395. unsigned starpu_worker_get_count(void)
  396. {
  397. return config.topology.nworkers;
  398. }
  399. unsigned starpu_combined_worker_get_count(void)
  400. {
  401. return config.topology.ncombinedworkers;
  402. }
  403. unsigned starpu_cpu_worker_get_count(void)
  404. {
  405. return config.topology.ncpus;
  406. }
  407. unsigned starpu_cuda_worker_get_count(void)
  408. {
  409. return config.topology.ncudagpus;
  410. }
  411. unsigned starpu_opencl_worker_get_count(void)
  412. {
  413. return config.topology.nopenclgpus;
  414. }
  415. unsigned starpu_spu_worker_get_count(void)
  416. {
  417. return config.topology.ngordon_spus;
  418. }
  419. /* When analyzing performance, it is useful to see what is the processing unit
  420. * that actually performed the task. This function returns the id of the
  421. * processing unit actually executing it, therefore it makes no sense to use it
  422. * within the callbacks of SPU functions for instance. If called by some thread
  423. * that is not controlled by StarPU, starpu_worker_get_id returns -1. */
  424. int starpu_worker_get_id(void)
  425. {
  426. struct starpu_worker_s * worker;
  427. worker = _starpu_get_local_worker_key();
  428. if (worker)
  429. {
  430. return worker->workerid;
  431. }
  432. else {
  433. /* there is no worker associated to that thread, perhaps it is
  434. * a thread from the application or this is some SPU worker */
  435. return -1;
  436. }
  437. }
  438. int starpu_combined_worker_get_id(void)
  439. {
  440. struct starpu_worker_s *worker;
  441. worker = _starpu_get_local_worker_key();
  442. if (worker)
  443. {
  444. return worker->combined_workerid;
  445. }
  446. else {
  447. /* there is no worker associated to that thread, perhaps it is
  448. * a thread from the application or this is some SPU worker */
  449. return -1;
  450. }
  451. }
  452. int starpu_combined_worker_get_size(void)
  453. {
  454. struct starpu_worker_s *worker;
  455. worker = _starpu_get_local_worker_key();
  456. if (worker)
  457. {
  458. return worker->worker_size;
  459. }
  460. else {
  461. /* there is no worker associated to that thread, perhaps it is
  462. * a thread from the application or this is some SPU worker */
  463. return -1;
  464. }
  465. }
  466. int starpu_combined_worker_get_rank(void)
  467. {
  468. struct starpu_worker_s *worker;
  469. worker = _starpu_get_local_worker_key();
  470. if (worker)
  471. {
  472. return worker->current_rank;
  473. }
  474. else {
  475. /* there is no worker associated to that thread, perhaps it is
  476. * a thread from the application or this is some SPU worker */
  477. return -1;
  478. }
  479. }
  480. int starpu_worker_get_devid(int id)
  481. {
  482. return config.workers[id].devid;
  483. }
  484. struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
  485. {
  486. return &config.workers[id];
  487. }
  488. struct starpu_combined_worker_s *_starpu_get_combined_worker_struct(unsigned id)
  489. {
  490. unsigned basic_worker_count = starpu_worker_get_count();
  491. STARPU_ASSERT(id >= basic_worker_count);
  492. return &config.combined_workers[id - basic_worker_count];
  493. }
  494. enum starpu_archtype starpu_worker_get_type(int id)
  495. {
  496. return config.workers[id].arch;
  497. }
  498. void starpu_worker_get_name(int id, char *dst, size_t maxlen)
  499. {
  500. char *name = config.workers[id].name;
  501. snprintf(dst, maxlen, "%s", name);
  502. }
  503. /* Retrieve the status which indicates what the worker is currently doing. */
  504. starpu_worker_status _starpu_worker_get_status(int workerid)
  505. {
  506. return config.workers[workerid].status;
  507. }
  508. /* Change the status of the worker which indicates what the worker is currently
  509. * doing (eg. executing a callback). */
  510. void _starpu_worker_set_status(int workerid, starpu_worker_status status)
  511. {
  512. config.workers[workerid].status = status;
  513. }
  514. void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex)
  515. {
  516. config.workers[workerid].sched_cond = sched_cond;
  517. config.workers[workerid].sched_mutex = sched_mutex;
  518. }