workers.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009, 2010, 2011 Université de Bordeaux 1
  4. * Copyright (C) 2010 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2010 Institut National de Recherche en Informatique et Automatique
  6. *
  7. * StarPU is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU Lesser General Public License as published by
  9. * the Free Software Foundation; either version 2.1 of the License, or (at
  10. * your option) any later version.
  11. *
  12. * StarPU is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  15. *
  16. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  17. */
  18. #include <stdlib.h>
  19. #include <stdio.h>
  20. #include <common/config.h>
  21. #include <common/utils.h>
  22. #include <core/workers.h>
  23. #include <core/debug.h>
  24. #include <core/task.h>
  25. #include <profiling/profiling.h>
  26. #include <starpu_task_list.h>
  27. #ifdef __MINGW32__
  28. #include <windows.h>
  29. #endif
  30. /* acquire/release semantic for concurrent initialization/de-initialization */
  31. static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
  32. static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
  33. static int init_count;
  34. static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
  35. static pthread_key_t worker_key;
  36. static struct starpu_machine_config_s config;
  37. struct starpu_machine_config_s *_starpu_get_machine_config(void)
  38. {
  39. return &config;
  40. }
  41. /* in case a task is submitted, we may check whether there exists a worker
  42. that may execute the task or not */
  43. inline uint32_t _starpu_worker_exists(uint32_t task_mask)
  44. {
  45. return (task_mask & config.worker_mask);
  46. }
  47. inline uint32_t _starpu_may_submit_cuda_task(void)
  48. {
  49. return (STARPU_CUDA & config.worker_mask);
  50. }
  51. inline uint32_t _starpu_may_submit_cpu_task(void)
  52. {
  53. return (STARPU_CPU & config.worker_mask);
  54. }
  55. inline uint32_t _starpu_may_submit_opencl_task(void)
  56. {
  57. return (STARPU_OPENCL & config.worker_mask);
  58. }
  59. int starpu_worker_may_execute_task(unsigned workerid, struct starpu_task *task)
  60. {
  61. /* TODO: check that the task operand sizes will fit on that device */
  62. /* TODO: call application-provided function for various cases like
  63. * double support, shared memory size limit, etc. */
  64. return !!(task->cl->where & config.workers[workerid].worker_mask);
  65. }
  66. int starpu_combined_worker_may_execute_task(unsigned workerid, struct starpu_task *task)
  67. {
  68. /* TODO: check that the task operand sizes will fit on that device */
  69. /* TODO: call application-provided function for various cases like
  70. * double support, shared memory size limit, etc. */
  71. struct starpu_codelet_t *cl = task->cl;
  72. unsigned nworkers = config.topology.nworkers;
  73. /* Is this a parallel worker ? */
  74. if (workerid < nworkers)
  75. {
  76. return !!(task->cl->where & config.workers[workerid].worker_mask);
  77. }
  78. else {
  79. if ((cl->type == STARPU_SPMD) || (cl->type == STARPU_FORKJOIN))
  80. {
  81. /* TODO we should add other types of constraints */
  82. /* Is the worker larger than requested ? */
  83. int worker_size = (int)config.combined_workers[workerid - nworkers].worker_size;
  84. return !!(worker_size <= task->cl->max_parallelism);
  85. }
  86. else
  87. {
  88. /* We have a sequential task but a parallel worker */
  89. return 0;
  90. }
  91. }
  92. }
  /*
   * Runtime initialization methods
   */
  #ifdef STARPU_USE_GORDON
  /* Non-zero once the single Gordon driver thread has been started by
   * _starpu_launch_drivers().  NOTE(review): the visible code never resets it,
   * so a starpu_init() after starpu_shutdown() would not relaunch the Gordon
   * thread -- confirm. */
  static unsigned gordon_inited;
  /* Worker set shared by every Gordon (SPU) worker; one thread drives it. */
  static struct starpu_worker_set_s gordon_worker_set;
  #endif
  100. static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
  101. {
  102. pthread_cond_t *cond = workerarg->sched_cond;
  103. pthread_mutex_t *mutex = workerarg->sched_mutex;
  104. unsigned memory_node = workerarg->memory_node;
  105. _starpu_memory_node_register_condition(cond, mutex, memory_node);
  106. }
  107. static void _starpu_launch_drivers(struct starpu_machine_config_s *config)
  108. {
  109. config->running = 1;
  110. pthread_key_create(&worker_key, NULL);
  111. unsigned nworkers = config->topology.nworkers;
  112. /* Launch workers asynchronously (except for SPUs) */
  113. unsigned worker;
  114. for (worker = 0; worker < nworkers; worker++)
  115. {
  116. struct starpu_worker_s *workerarg = &config->workers[worker];
  117. workerarg->config = config;
  118. PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
  119. PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
  120. workerarg->worker_size = 1;
  121. workerarg->combined_workerid = workerarg->workerid;
  122. workerarg->current_rank = 0;
  123. /* if some codelet's termination cannot be handled directly :
  124. * for instance in the Gordon driver, Gordon tasks' callbacks
  125. * may be executed by another thread than that of the Gordon
  126. * driver so that we cannot call the push_codelet_output method
  127. * directly */
  128. workerarg->terminated_jobs = starpu_job_list_new();
  129. starpu_task_list_init(&workerarg->local_tasks);
  130. PTHREAD_MUTEX_INIT(&workerarg->local_tasks_mutex, NULL);
  131. workerarg->status = STATUS_INITIALIZING;
  132. _STARPU_DEBUG("initialising worker %d\n", worker);
  133. _starpu_init_worker_queue(workerarg);
  134. switch (workerarg->arch) {
  135. #ifdef STARPU_USE_CPU
  136. case STARPU_CPU_WORKER:
  137. workerarg->set = NULL;
  138. workerarg->worker_is_initialized = 0;
  139. pthread_create(&workerarg->worker_thread,
  140. NULL, _starpu_cpu_worker, workerarg);
  141. break;
  142. #endif
  143. #ifdef STARPU_USE_CUDA
  144. case STARPU_CUDA_WORKER:
  145. workerarg->set = NULL;
  146. workerarg->worker_is_initialized = 0;
  147. pthread_create(&workerarg->worker_thread,
  148. NULL, _starpu_cuda_worker, workerarg);
  149. break;
  150. #endif
  151. #ifdef STARPU_USE_OPENCL
  152. case STARPU_OPENCL_WORKER:
  153. workerarg->set = NULL;
  154. workerarg->worker_is_initialized = 0;
  155. pthread_create(&workerarg->worker_thread,
  156. NULL, _starpu_opencl_worker, workerarg);
  157. break;
  158. #endif
  159. #ifdef STARPU_USE_GORDON
  160. case STARPU_GORDON_WORKER:
  161. /* we will only launch gordon once, but it will handle
  162. * the different SPU workers */
  163. if (!gordon_inited)
  164. {
  165. gordon_worker_set.nworkers = config->ngordon_spus;
  166. gordon_worker_set.workers = &config->workers[worker];
  167. gordon_worker_set.set_is_initialized = 0;
  168. pthread_create(&gordon_worker_set.worker_thread, NULL,
  169. _starpu_gordon_worker, &gordon_worker_set);
  170. PTHREAD_MUTEX_LOCK(&gordon_worker_set.mutex);
  171. while (!gordon_worker_set.set_is_initialized)
  172. PTHREAD_COND_WAIT(&gordon_worker_set.ready_cond,
  173. &gordon_worker_set.mutex);
  174. PTHREAD_MUTEX_UNLOCK(&gordon_worker_set.mutex);
  175. gordon_inited = 1;
  176. }
  177. workerarg->set = &gordon_worker_set;
  178. gordon_worker_set.joined = 0;
  179. workerarg->worker_is_running = 1;
  180. break;
  181. #endif
  182. default:
  183. STARPU_ABORT();
  184. }
  185. }
  186. for (worker = 0; worker < nworkers; worker++)
  187. {
  188. struct starpu_worker_s *workerarg = &config->workers[worker];
  189. switch (workerarg->arch) {
  190. case STARPU_CPU_WORKER:
  191. case STARPU_CUDA_WORKER:
  192. case STARPU_OPENCL_WORKER:
  193. PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  194. while (!workerarg->worker_is_initialized)
  195. PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
  196. PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  197. break;
  198. #ifdef STARPU_USE_GORDON
  199. case STARPU_GORDON_WORKER:
  200. /* the initialization of Gordon worker is
  201. * synchronous for now */
  202. break;
  203. #endif
  204. default:
  205. STARPU_ABORT();
  206. }
  207. }
  208. }
  209. void _starpu_set_local_worker_key(struct starpu_worker_s *worker)
  210. {
  211. pthread_setspecific(worker_key, worker);
  212. }
  213. struct starpu_worker_s *_starpu_get_local_worker_key(void)
  214. {
  215. return pthread_getspecific(worker_key);
  216. }
  217. /* Initialize the starpu_conf with default values */
  218. int starpu_conf_init(struct starpu_conf *conf)
  219. {
  220. if (!conf)
  221. return -EINVAL;
  222. conf->sched_policy_name = getenv("STARPU_SCHED");
  223. conf->sched_policy = NULL;
  224. /* Note that starpu_get_env_number returns -1 in case the variable is
  225. * not defined */
  226. conf->ncpus = starpu_get_env_number("STARPU_NCPUS");
  227. conf->ncuda = starpu_get_env_number("STARPU_NCUDA");
  228. conf->nopencl = starpu_get_env_number("STARPU_NOPENCL");
  229. conf->nspus = starpu_get_env_number("STARPU_NGORDON");
  230. conf->calibrate = starpu_get_env_number("STARPU_CALIBRATE");
  231. conf->use_explicit_workers_bindid = 0; /* TODO */
  232. conf->use_explicit_workers_cuda_gpuid = 0; /* TODO */
  233. conf->use_explicit_workers_opencl_gpuid = 0; /* TODO */
  234. return 0;
  235. };
  236. int starpu_init(struct starpu_conf *user_conf)
  237. {
  238. int ret;
  239. PTHREAD_MUTEX_LOCK(&init_mutex);
  240. while (initialized == CHANGING)
  241. /* Wait for the other one changing it */
  242. PTHREAD_COND_WAIT(&init_cond, &init_mutex);
  243. init_count++;
  244. if (initialized == INITIALIZED) {
  245. /* He initialized it, don't do it again, and let the others get the mutex */
  246. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  247. return 0;
  248. }
  249. /* initialized == UNINITIALIZED */
  250. initialized = CHANGING;
  251. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  252. #ifdef __MINGW32__
  253. WSADATA wsadata;
  254. WSAStartup(MAKEWORD(1,0), &wsadata);
  255. #endif
  256. srand(2008);
  257. #ifdef STARPU_USE_FXT
  258. _starpu_start_fxt_profiling();
  259. #endif
  260. _starpu_open_debug_logfile();
  261. _starpu_timing_init();
  262. _starpu_profiling_init();
  263. _starpu_load_bus_performance_files();
  264. /* store the pointer to the user explicit configuration during the
  265. * initialization */
  266. config.user_conf = user_conf;
  267. ret = _starpu_build_topology(&config);
  268. if (ret) {
  269. PTHREAD_MUTEX_LOCK(&init_mutex);
  270. init_count--;
  271. initialized = UNINITIALIZED;
  272. /* Let somebody else try to do it */
  273. PTHREAD_COND_SIGNAL(&init_cond);
  274. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  275. return ret;
  276. }
  277. /* We need to store the current task handled by the different
  278. * threads */
  279. _starpu_initialize_current_task_key();
  280. /* initialize the scheduling policy */
  281. _starpu_init_sched_policy(&config);
  282. _starpu_initialize_registered_performance_models();
  283. /* Launch "basic" workers (ie. non-combined workers) */
  284. _starpu_launch_drivers(&config);
  285. PTHREAD_MUTEX_LOCK(&init_mutex);
  286. initialized = INITIALIZED;
  287. /* Tell everybody that we initialized */
  288. PTHREAD_COND_BROADCAST(&init_cond);
  289. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  290. return 0;
  291. }
  292. /*
  293. * Handle runtime termination
  294. */
  295. static void _starpu_terminate_workers(struct starpu_machine_config_s *config)
  296. {
  297. int status __attribute__((unused));
  298. unsigned workerid;
  299. for (workerid = 0; workerid < config->topology.nworkers; workerid++)
  300. {
  301. starpu_wake_all_blocked_workers();
  302. _STARPU_DEBUG("wait for worker %u\n", workerid);
  303. struct starpu_worker_set_s *set = config->workers[workerid].set;
  304. struct starpu_worker_s *worker = &config->workers[workerid];
  305. /* in case StarPU termination code is called from a callback,
  306. * we have to check if pthread_self() is the worker itself */
  307. if (set){
  308. if (!set->joined) {
  309. if (!pthread_equal(pthread_self(), set->worker_thread))
  310. {
  311. status = pthread_join(set->worker_thread, NULL);
  312. #ifdef STARPU_VERBOSE
  313. if (status) {
  314. _STARPU_DEBUG("pthread_join -> %d\n", status);
  315. }
  316. #endif
  317. }
  318. set->joined = 1;
  319. }
  320. }
  321. else {
  322. if (!pthread_equal(pthread_self(), worker->worker_thread))
  323. {
  324. status = pthread_join(worker->worker_thread, NULL);
  325. #ifdef STARPU_VERBOSE
  326. if (status) {
  327. _STARPU_DEBUG("pthread_join -> %d\n", status);
  328. }
  329. #endif
  330. }
  331. }
  332. STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
  333. starpu_job_list_delete(worker->terminated_jobs);
  334. }
  335. }
  336. unsigned _starpu_machine_is_running(void)
  337. {
  338. return config.running;
  339. }
  /* Decide whether a worker attached to memory node `memnode` may block.
   *
   * Always 0 when built with STARPU_NON_BLOCKING_DRIVERS.  Otherwise the
   * result is 1 only if all of the following hold:
   * - no data request exists for the node;
   * - the machine is still running;
   * - the registered progression hooks return non-zero.
   * Otherwise it is 0.  All three calls are always made, in this order:
   * running the progression hooks is a side effect of the check. */
  unsigned _starpu_worker_can_block(unsigned memnode __attribute__((unused)))
  {
  #ifdef STARPU_NON_BLOCKING_DRIVERS
    return 0;
  #else
    /* Evaluate each condition separately: no short-circuiting allowed. */
    unsigned no_request = !!_starpu_check_that_no_data_request_exists(memnode);
    unsigned running = !!_starpu_machine_is_running();
    unsigned hooks_ok = !!_starpu_execute_registered_progression_hooks();

    return no_request && running && hooks_ok;
  #endif
  }
  355. static void _starpu_kill_all_workers(struct starpu_machine_config_s *config)
  356. {
  357. /* set the flag which will tell workers to stop */
  358. config->running = 0;
  359. starpu_wake_all_blocked_workers();
  360. }
  361. void starpu_shutdown(void)
  362. {
  363. const char *stats;
  364. PTHREAD_MUTEX_LOCK(&init_mutex);
  365. init_count--;
  366. if (init_count)
  367. /* Still somebody needing StarPU, don't deinitialize */
  368. return;
  369. /* We're last */
  370. initialized = CHANGING;
  371. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  372. _starpu_display_msi_stats();
  373. _starpu_display_alloc_cache_stats();
  374. /* tell all workers to shutdown */
  375. _starpu_kill_all_workers(&config);
  376. #ifdef STARPU_DATA_STATS
  377. _starpu_display_comm_amounts();
  378. #endif
  379. if ((stats = getenv("STARPU_BUS_STATS")) && atoi(stats))
  380. starpu_bus_profiling_helper_display_summary();
  381. if ((stats = getenv("STARPU_WORKER_STATS")) && atoi(stats))
  382. starpu_worker_profiling_helper_display_summary();
  383. _starpu_deinitialize_registered_performance_models();
  384. /* wait for their termination */
  385. _starpu_terminate_workers(&config);
  386. _starpu_deinit_sched_policy(&config);
  387. _starpu_destroy_topology(&config);
  388. #ifdef STARPU_USE_FXT
  389. _starpu_stop_fxt_profiling();
  390. #endif
  391. _starpu_close_debug_logfile();
  392. PTHREAD_MUTEX_LOCK(&init_mutex);
  393. initialized = UNINITIALIZED;
  394. /* Let someone else that wants to initialize it again do it */
  395. PTHREAD_COND_SIGNAL(&init_cond);
  396. PTHREAD_MUTEX_UNLOCK(&init_mutex);
  397. }
  398. unsigned starpu_worker_get_count(void)
  399. {
  400. return config.topology.nworkers;
  401. }
  402. unsigned starpu_combined_worker_get_count(void)
  403. {
  404. return config.topology.ncombinedworkers;
  405. }
  406. unsigned starpu_cpu_worker_get_count(void)
  407. {
  408. return config.topology.ncpus;
  409. }
  410. unsigned starpu_cuda_worker_get_count(void)
  411. {
  412. return config.topology.ncudagpus;
  413. }
  414. unsigned starpu_opencl_worker_get_count(void)
  415. {
  416. return config.topology.nopenclgpus;
  417. }
  418. unsigned starpu_spu_worker_get_count(void)
  419. {
  420. return config.topology.ngordon_spus;
  421. }
  422. /* When analyzing performance, it is useful to see what is the processing unit
  423. * that actually performed the task. This function returns the id of the
  424. * processing unit actually executing it, therefore it makes no sense to use it
  425. * within the callbacks of SPU functions for instance. If called by some thread
  426. * that is not controlled by StarPU, starpu_worker_get_id returns -1. */
  427. int starpu_worker_get_id(void)
  428. {
  429. struct starpu_worker_s * worker;
  430. worker = _starpu_get_local_worker_key();
  431. if (worker)
  432. {
  433. return worker->workerid;
  434. }
  435. else {
  436. /* there is no worker associated to that thread, perhaps it is
  437. * a thread from the application or this is some SPU worker */
  438. return -1;
  439. }
  440. }
  441. int starpu_combined_worker_get_id(void)
  442. {
  443. struct starpu_worker_s *worker;
  444. worker = _starpu_get_local_worker_key();
  445. if (worker)
  446. {
  447. return worker->combined_workerid;
  448. }
  449. else {
  450. /* there is no worker associated to that thread, perhaps it is
  451. * a thread from the application or this is some SPU worker */
  452. return -1;
  453. }
  454. }
  455. int starpu_combined_worker_get_size(void)
  456. {
  457. struct starpu_worker_s *worker;
  458. worker = _starpu_get_local_worker_key();
  459. if (worker)
  460. {
  461. return worker->worker_size;
  462. }
  463. else {
  464. /* there is no worker associated to that thread, perhaps it is
  465. * a thread from the application or this is some SPU worker */
  466. return -1;
  467. }
  468. }
  469. int starpu_combined_worker_get_rank(void)
  470. {
  471. struct starpu_worker_s *worker;
  472. worker = _starpu_get_local_worker_key();
  473. if (worker)
  474. {
  475. return worker->current_rank;
  476. }
  477. else {
  478. /* there is no worker associated to that thread, perhaps it is
  479. * a thread from the application or this is some SPU worker */
  480. return -1;
  481. }
  482. }
  483. int starpu_worker_get_devid(int id)
  484. {
  485. return config.workers[id].devid;
  486. }
  487. struct starpu_worker_s *_starpu_get_worker_struct(unsigned id)
  488. {
  489. return &config.workers[id];
  490. }
  491. struct starpu_combined_worker_s *_starpu_get_combined_worker_struct(unsigned id)
  492. {
  493. unsigned basic_worker_count = starpu_worker_get_count();
  494. STARPU_ASSERT(id >= basic_worker_count);
  495. return &config.combined_workers[id - basic_worker_count];
  496. }
  497. enum starpu_archtype starpu_worker_get_type(int id)
  498. {
  499. return config.workers[id].arch;
  500. }
  501. void starpu_worker_get_name(int id, char *dst, size_t maxlen)
  502. {
  503. char *name = config.workers[id].name;
  504. snprintf(dst, maxlen, "%s", name);
  505. }
  506. /* Retrieve the status which indicates what the worker is currently doing. */
  507. starpu_worker_status _starpu_worker_get_status(int workerid)
  508. {
  509. return config.workers[workerid].status;
  510. }
  511. /* Change the status of the worker which indicates what the worker is currently
  512. * doing (eg. executing a callback). */
  513. void _starpu_worker_set_status(int workerid, starpu_worker_status status)
  514. {
  515. config.workers[workerid].status = status;
  516. }
  517. void starpu_worker_set_sched_condition(int workerid, pthread_cond_t *sched_cond, pthread_mutex_t *sched_mutex)
  518. {
  519. config.workers[workerid].sched_cond = sched_cond;
  520. config.workers[workerid].sched_mutex = sched_mutex;
  521. }