workers.c 51 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2009-2014 Université de Bordeaux 1
  4. * Copyright (C) 2010, 2011, 2012, 2013, 2014 Centre National de la Recherche Scientifique
  5. * Copyright (C) 2010, 2011 Institut National de Recherche en Informatique et Automatique
  6. * Copyright (C) 2011 Télécom-SudParis
  7. * Copyright (C) 2011-2012 INRIA
  8. *
  9. * StarPU is free software; you can redistribute it and/or modify
  10. * it under the terms of the GNU Lesser General Public License as published by
  11. * the Free Software Foundation; either version 2.1 of the License, or (at
  12. * your option) any later version.
  13. *
  14. * StarPU is distributed in the hope that it will be useful, but
  15. * WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  17. *
  18. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  19. */
  20. #include <stdlib.h>
  21. #include <stdio.h>
  22. #include <common/config.h>
  23. #include <common/utils.h>
  24. #include <core/progress_hook.h>
  25. #include <core/workers.h>
  26. #include <core/debug.h>
  27. #include <core/disk.h>
  28. #include <core/task.h>
  29. #include <datawizard/malloc.h>
  30. #include <profiling/profiling.h>
  31. #include <starpu_task_list.h>
  32. #include <drivers/mp_common/sink_common.h>
  33. #include <drivers/scc/driver_scc_common.h>
  34. #include <drivers/cpu/driver_cpu.h>
  35. #include <drivers/cuda/driver_cuda.h>
  36. #include <drivers/opencl/driver_opencl.h>
  37. #ifdef STARPU_SIMGRID
  38. #include <msg/msg.h>
  39. #include <core/simgrid.h>
  40. #endif
  41. #ifdef __MINGW32__
  42. #include <windows.h>
  43. #endif
  44. /* acquire/release semantic for concurrent initialization/de-initialization */
  45. static starpu_pthread_mutex_t init_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
  46. static starpu_pthread_cond_t init_cond = STARPU_PTHREAD_COND_INITIALIZER;
  47. static int init_count = 0;
  48. static enum { UNINITIALIZED, CHANGING, INITIALIZED } initialized = UNINITIALIZED;
  49. static starpu_pthread_key_t worker_key;
  50. static struct _starpu_machine_config config;
  51. /* Pointers to argc and argv
  52. */
  53. static int *my_argc = 0;
  54. static char ***my_argv = NULL;
  55. /* Initialize value of static argc and argv, called when the process begins
  56. */
  57. void _starpu_set_argc_argv(int *argc_param, char ***argv_param)
  58. {
  59. my_argc = argc_param;
  60. my_argv = argv_param;
  61. }
  62. int *_starpu_get_argc()
  63. {
  64. return my_argc;
  65. }
  66. char ***_starpu_get_argv()
  67. {
  68. return my_argv;
  69. }
  70. int _starpu_is_initialized(void)
  71. {
  72. return initialized == INITIALIZED;
  73. }
  74. struct _starpu_machine_config *_starpu_get_machine_config(void)
  75. {
  76. return &config;
  77. }
  78. /* Makes sure that at least one of the workers of type <arch> can execute
  79. * <task>, for at least one of its implementations. */
  80. static uint32_t _starpu_worker_exists_and_can_execute(struct starpu_task *task,
  81. enum starpu_worker_archtype arch)
  82. {
  83. int i;
  84. int nworkers = starpu_worker_get_count();
  85. _starpu_codelet_check_deprecated_fields(task->cl);
  86. for (i = 0; i < nworkers; i++)
  87. {
  88. if (starpu_worker_get_type(i) != arch)
  89. continue;
  90. unsigned impl;
  91. for (impl = 0; impl < STARPU_MAXIMPLEMENTATIONS; impl++)
  92. {
  93. /* We could call task->cl->can_execute(i, task, impl)
  94. here, it would definitely work. It is probably
  95. cheaper to check whether it is necessary in order to
  96. avoid a useless function call, though. */
  97. unsigned test_implementation = 0;
  98. switch (arch)
  99. {
  100. case STARPU_CPU_WORKER:
  101. if (task->cl->cpu_funcs[impl] != NULL)
  102. test_implementation = 1;
  103. break;
  104. case STARPU_CUDA_WORKER:
  105. if (task->cl->cuda_funcs[impl] != NULL)
  106. test_implementation = 1;
  107. break;
  108. case STARPU_OPENCL_WORKER:
  109. if (task->cl->opencl_funcs[impl] != NULL)
  110. test_implementation = 1;
  111. break;
  112. case STARPU_MIC_WORKER:
  113. if (task->cl->cpu_funcs_name[impl] != NULL || task->cl->mic_funcs[impl] != NULL)
  114. test_implementation = 1;
  115. break;
  116. case STARPU_SCC_WORKER:
  117. if (task->cl->cpu_funcs_name[impl] != NULL || task->cl->scc_funcs[impl] != NULL)
  118. test_implementation = 1;
  119. break;
  120. default:
  121. STARPU_ABORT();
  122. }
  123. if (!test_implementation)
  124. break;
  125. if (task->cl->can_execute(i, task, impl))
  126. return 1;
  127. }
  128. }
  129. return 0;
  130. }
  131. /* in case a task is submitted, we may check whether there exists a worker
  132. that may execute the task or not */
  133. uint32_t _starpu_worker_exists(struct starpu_task *task)
  134. {
  135. _starpu_codelet_check_deprecated_fields(task->cl);
  136. if (!(task->cl->where & config.worker_mask))
  137. return 0;
  138. if (!task->cl->can_execute)
  139. return 1;
  140. #if defined(STARPU_USE_CPU) || defined(STARPU_SIMGRID)
  141. if ((task->cl->where & STARPU_CPU) &&
  142. _starpu_worker_exists_and_can_execute(task, STARPU_CPU_WORKER))
  143. return 1;
  144. #endif
  145. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  146. if ((task->cl->where & STARPU_CUDA) &&
  147. _starpu_worker_exists_and_can_execute(task, STARPU_CUDA_WORKER))
  148. return 1;
  149. #endif
  150. #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
  151. if ((task->cl->where & STARPU_OPENCL) &&
  152. _starpu_worker_exists_and_can_execute(task, STARPU_OPENCL_WORKER))
  153. return 1;
  154. #endif
  155. #ifdef STARPU_USE_MIC
  156. if ((task->cl->where & STARPU_MIC) &&
  157. _starpu_worker_exists_and_can_execute(task, STARPU_MIC_WORKER))
  158. return 1;
  159. #endif
  160. #ifdef STARPU_USE_SCC
  161. if ((task->cl->where & STARPU_SCC) &&
  162. _starpu_worker_exists_and_can_execute(task, STARPU_SCC_WORKER))
  163. return 1;
  164. #endif
  165. return 0;
  166. }
  167. uint32_t _starpu_can_submit_cuda_task(void)
  168. {
  169. return (STARPU_CUDA & config.worker_mask);
  170. }
  171. uint32_t _starpu_can_submit_cpu_task(void)
  172. {
  173. return (STARPU_CPU & config.worker_mask);
  174. }
  175. uint32_t _starpu_can_submit_opencl_task(void)
  176. {
  177. return (STARPU_OPENCL & config.worker_mask);
  178. }
  179. uint32_t _starpu_can_submit_scc_task(void)
  180. {
  181. return (STARPU_SCC & config.worker_mask);
  182. }
  183. static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch, struct starpu_codelet *cl, unsigned nimpl)
  184. {
  185. switch(arch)
  186. {
  187. case STARPU_ANY_WORKER:
  188. {
  189. int cpu_func_enabled=1, cuda_func_enabled=1, opencl_func_enabled=1;
  190. /* TODO: MIC/SCC */
  191. #if defined(STARPU_USE_CPU) || defined(STARPU_SIMGRID)
  192. starpu_cpu_func_t cpu_func = _starpu_task_get_cpu_nth_implementation(cl, nimpl);
  193. cpu_func_enabled = cpu_func != NULL && starpu_cpu_worker_get_count();
  194. #endif
  195. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  196. starpu_cuda_func_t cuda_func = _starpu_task_get_cuda_nth_implementation(cl, nimpl);
  197. cuda_func_enabled = cuda_func != NULL && starpu_cuda_worker_get_count();
  198. #endif
  199. #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
  200. starpu_opencl_func_t opencl_func = _starpu_task_get_opencl_nth_implementation(cl, nimpl);
  201. opencl_func_enabled = opencl_func != NULL && starpu_opencl_worker_get_count();
  202. #endif
  203. return (cpu_func_enabled && cuda_func_enabled && opencl_func_enabled);
  204. }
  205. case STARPU_CPU_WORKER:
  206. {
  207. starpu_cpu_func_t func = _starpu_task_get_cpu_nth_implementation(cl, nimpl);
  208. return func != NULL;
  209. }
  210. case STARPU_CUDA_WORKER:
  211. {
  212. starpu_cuda_func_t func = _starpu_task_get_cuda_nth_implementation(cl, nimpl);
  213. return func != NULL;
  214. }
  215. case STARPU_OPENCL_WORKER:
  216. {
  217. starpu_opencl_func_t func = _starpu_task_get_opencl_nth_implementation(cl, nimpl);
  218. return func != NULL;
  219. }
  220. case STARPU_MIC_WORKER:
  221. {
  222. starpu_mic_func_t func = _starpu_task_get_mic_nth_implementation(cl, nimpl);
  223. char *func_name = _starpu_task_get_cpu_name_nth_implementation(cl, nimpl);
  224. return func != NULL || func_name != NULL;
  225. }
  226. case STARPU_SCC_WORKER:
  227. {
  228. starpu_scc_func_t func = _starpu_task_get_scc_nth_implementation(cl, nimpl);
  229. char *func_name = _starpu_task_get_cpu_name_nth_implementation(cl, nimpl);
  230. return func != NULL || func_name != NULL;
  231. }
  232. default:
  233. STARPU_ASSERT_MSG(0, "Unknown arch type %d", arch);
  234. }
  235. return 0;
  236. }
  237. int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
  238. {
  239. struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
  240. if(sched_ctx->parallel_sect[workerid]) return 0;
  241. /* TODO: check that the task operand sizes will fit on that device */
  242. return (task->cl->where & config.workers[workerid].worker_mask) &&
  243. _starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
  244. (!task->cl->can_execute || task->cl->can_execute(workerid, task, nimpl));
  245. }
  246. int starpu_combined_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
  247. {
  248. /* TODO: check that the task operand sizes will fit on that device */
  249. struct starpu_codelet *cl = task->cl;
  250. unsigned nworkers = config.topology.nworkers;
  251. /* Is this a parallel worker ? */
  252. if (workerid < nworkers)
  253. {
  254. return !!((task->cl->where & config.workers[workerid].worker_mask) &&
  255. _starpu_can_use_nth_implementation(config.workers[workerid].arch, task->cl, nimpl) &&
  256. (!task->cl->can_execute || task->cl->can_execute(workerid, task, nimpl)));
  257. }
  258. else
  259. {
  260. if ((cl->type == STARPU_SPMD)
  261. #ifdef STARPU_HAVE_HWLOC
  262. || (cl->type == STARPU_FORKJOIN)
  263. #else
  264. #ifdef __GLIBC__
  265. || (cl->type == STARPU_FORKJOIN)
  266. #endif
  267. #endif
  268. )
  269. {
  270. /* TODO we should add other types of constraints */
  271. /* Is the worker larger than requested ? */
  272. int worker_size = (int)config.combined_workers[workerid - nworkers].worker_size;
  273. int worker0 = config.combined_workers[workerid - nworkers].combined_workerid[0];
  274. return !!((worker_size <= task->cl->max_parallelism) &&
  275. _starpu_can_use_nth_implementation(config.workers[worker0].arch, task->cl, nimpl) &&
  276. (!task->cl->can_execute || task->cl->can_execute(workerid, task, nimpl)));
  277. }
  278. else
  279. {
  280. /* We have a sequential task but a parallel worker */
  281. return 0;
  282. }
  283. }
  284. }
  285. /*
  286. * Runtime initialization methods
  287. */
  288. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  289. static struct _starpu_worker_set cuda_worker_set[STARPU_MAXCUDADEVS];
  290. #endif
  291. #ifdef STARPU_USE_MIC
  292. static struct _starpu_worker_set mic_worker_set[STARPU_MAXMICDEVS];
  293. #endif
  294. static void _starpu_init_worker_queue(struct _starpu_worker *workerarg)
  295. {
  296. starpu_pthread_cond_t *cond = &workerarg->sched_cond;
  297. starpu_pthread_mutex_t *mutex = &workerarg->sched_mutex;
  298. unsigned memory_node = workerarg->memory_node;
  299. _starpu_memory_node_register_condition(cond, mutex, memory_node);
  300. }
  301. /*
  302. * Returns 0 if the given driver is one of the drivers that must be launched by
  303. * the application itself, and not by StarPU, 1 otherwise.
  304. */
  305. static unsigned _starpu_may_launch_driver(struct starpu_conf *conf,
  306. struct starpu_driver *d)
  307. {
  308. if (conf->n_not_launched_drivers == 0 ||
  309. conf->not_launched_drivers == NULL)
  310. return 1;
  311. /* Is <d> in conf->not_launched_drivers ? */
  312. unsigned i;
  313. for (i = 0; i < conf->n_not_launched_drivers; i++)
  314. {
  315. if (d->type != conf->not_launched_drivers[i].type)
  316. continue;
  317. switch (d->type)
  318. {
  319. case STARPU_CPU_WORKER:
  320. if (d->id.cpu_id == conf->not_launched_drivers[i].id.cpu_id)
  321. return 0;
  322. case STARPU_CUDA_WORKER:
  323. if (d->id.cuda_id == conf->not_launched_drivers[i].id.cuda_id)
  324. return 0;
  325. break;
  326. #ifdef STARPU_USE_OPENCL
  327. case STARPU_OPENCL_WORKER:
  328. if (d->id.opencl_id == conf->not_launched_drivers[i].id.opencl_id)
  329. return 0;
  330. break;
  331. #endif
  332. default:
  333. STARPU_ABORT();
  334. }
  335. }
  336. return 1;
  337. }
  338. #ifdef STARPU_PERF_DEBUG
  339. struct itimerval prof_itimer;
  340. #endif
  341. static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig)
  342. {
  343. workerarg->config = pconfig;
  344. STARPU_PTHREAD_MUTEX_INIT(&workerarg->mutex, NULL);
  345. /* arch initialized by topology.c */
  346. /* worker_mask initialized by topology.c */
  347. /* perf_arch initialized by topology.c */
  348. /* worker_thread initialized by _starpu_launch_drivers */
  349. /* devid initialized by topology.c */
  350. /* subworkerid initialized by topology.c */
  351. /* bindid initialized by topology.c */
  352. /* workerid initialized by topology.c */
  353. workerarg->combined_workerid = workerarg->workerid;
  354. workerarg->current_rank = 0;
  355. workerarg->worker_size = 1;
  356. STARPU_PTHREAD_COND_INIT(&workerarg->started_cond, NULL);
  357. STARPU_PTHREAD_COND_INIT(&workerarg->ready_cond, NULL);
  358. /* memory_node initialized by topology.c */
  359. STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);
  360. STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
  361. starpu_task_list_init(&workerarg->local_tasks);
  362. workerarg->current_task = NULL;
  363. workerarg->set = NULL;
  364. /* if some codelet's termination cannot be handled directly :
  365. * for instance in the Gordon driver, Gordon tasks' callbacks
  366. * may be executed by another thread than that of the Gordon
  367. * driver so that we cannot call the push_codelet_output method
  368. * directly */
  369. workerarg->terminated_jobs = _starpu_job_list_new();
  370. workerarg->worker_is_running = 0;
  371. workerarg->worker_is_initialized = 0;
  372. workerarg->status = STATUS_INITIALIZING;
  373. /* name initialized by driver */
  374. /* short_name initialized by driver */
  375. workerarg->run_by_starpu = 1;
  376. workerarg->sched_ctx_list = NULL;
  377. workerarg->tmp_sched_ctx = -1;
  378. workerarg->nsched_ctxs = 0;
  379. _starpu_barrier_counter_init(&workerarg->tasks_barrier, 0);
  380. workerarg->has_prev_init = 0;
  381. int ctx;
  382. for(ctx = 0; ctx < STARPU_NMAX_SCHED_CTXS; ctx++)
  383. workerarg->removed_from_ctx[ctx] = 0;
  384. workerarg->spinning_backoff = 1;
  385. for(ctx = 0; ctx < STARPU_NMAX_SCHED_CTXS; ctx++)
  386. {
  387. workerarg->shares_tasks_lists[ctx] = 0;
  388. workerarg->poped_in_ctx[ctx] = 0;
  389. }
  390. workerarg->reverse_phase[0] = 0;
  391. workerarg->reverse_phase[1] = 0;
  392. workerarg->pop_ctx_priority = 1;
  393. workerarg->sched_mutex_locked = 0;
  394. /* cpu_set/hwloc_cpu_set initialized in topology.c */
  395. }
  396. void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key)
  397. {
  398. (void) fut_key;
  399. int devid = worker->devid;
  400. (void) devid;
  401. #if defined(STARPU_PERF_DEBUG) && !defined(STARPU_SIMGRID)
  402. setitimer(ITIMER_PROF, &prof_itimer, NULL);
  403. #endif
  404. #ifdef STARPU_USE_FXT
  405. _starpu_fxt_register_thread(worker->bindid);
  406. unsigned memnode = worker->memory_node;
  407. _STARPU_TRACE_WORKER_INIT_START(fut_key, worker->workerid, devid, memnode);
  408. #endif
  409. _starpu_bind_thread_on_cpu(worker->config, worker->bindid);
  410. _STARPU_DEBUG("worker %p %d for dev %d is ready on logical cpu %d\n", worker, worker->workerid, devid, worker->bindid);
  411. #ifdef STARPU_HAVE_HWLOC
  412. _STARPU_DEBUG("worker %p %d cpuset start at %d\n", worker, worker->workerid, hwloc_bitmap_first(worker->hwloc_cpu_set));
  413. #endif
  414. _starpu_memory_node_set_local_key(&worker->memory_node);
  415. _starpu_set_local_worker_key(worker);
  416. STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
  417. worker->worker_is_running = 1;
  418. STARPU_PTHREAD_COND_SIGNAL(&worker->started_cond);
  419. STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
  420. }
  421. static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
  422. {
  423. pconfig->running = 1;
  424. pconfig->pause_depth = 0;
  425. pconfig->submitting = 1;
  426. STARPU_HG_DISABLE_CHECKING(pconfig->watchdog_ok);
  427. unsigned nworkers = pconfig->topology.nworkers;
  428. /* Launch workers asynchronously */
  429. unsigned cpu = 0;
  430. unsigned worker;
  431. #if defined(STARPU_PERF_DEBUG) && !defined(STARPU_SIMGRID)
  432. /* Get itimer of the main thread, to set it for the worker threads */
  433. getitimer(ITIMER_PROF, &prof_itimer);
  434. #endif
  435. #ifdef HAVE_AYUDAME_H
  436. if (AYU_event) AYU_event(AYU_INIT, 0, NULL);
  437. #endif
  438. for (worker = 0; worker < nworkers; worker++)
  439. {
  440. struct _starpu_worker *workerarg = &pconfig->workers[worker];
  441. #if defined(STARPU_USE_MIC) || defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  442. unsigned devid = workerarg->devid;
  443. #endif
  444. _STARPU_DEBUG("initialising worker %u/%u\n", worker, nworkers);
  445. _starpu_init_worker_queue(workerarg);
  446. struct starpu_driver driver;
  447. driver.type = workerarg->arch;
  448. switch (workerarg->arch)
  449. {
  450. #if defined(STARPU_USE_CPU) || defined(STARPU_SIMGRID)
  451. case STARPU_CPU_WORKER:
  452. driver.id.cpu_id = cpu;
  453. if (_starpu_may_launch_driver(pconfig->conf, &driver))
  454. {
  455. STARPU_PTHREAD_CREATE_ON(
  456. workerarg->name,
  457. &workerarg->worker_thread,
  458. NULL,
  459. _starpu_cpu_worker,
  460. workerarg,
  461. worker+1);
  462. #ifdef STARPU_USE_FXT
  463. /* In tracing mode, make sure the
  464. * thread is really started before
  465. * starting another one, to make sure
  466. * they appear in order in the trace.
  467. */
  468. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  469. while (!workerarg->worker_is_running)
  470. STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
  471. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  472. #endif
  473. }
  474. else
  475. {
  476. workerarg->run_by_starpu = 0;
  477. }
  478. cpu++;
  479. break;
  480. #endif
  481. #if defined(STARPU_USE_CUDA) || defined(STARPU_SIMGRID)
  482. case STARPU_CUDA_WORKER:
  483. driver.id.cuda_id = workerarg->devid;
  484. if (_starpu_may_launch_driver(pconfig->conf, &driver))
  485. {
  486. /* We spawn only one thread per CUDA device,
  487. * which will control all CUDA workers of this
  488. * device. (by using a worker set). */
  489. if (cuda_worker_set[devid].started)
  490. goto worker_set_initialized;
  491. cuda_worker_set[devid].nworkers = starpu_get_env_number_default("STARPU_NWORKER_PER_CUDA", 1);
  492. cuda_worker_set[devid].workers = workerarg;
  493. cuda_worker_set[devid].set_is_initialized = 0;
  494. STARPU_PTHREAD_CREATE_ON(
  495. workerarg->name,
  496. &cuda_worker_set[devid].worker_thread,
  497. NULL,
  498. _starpu_cuda_worker,
  499. &cuda_worker_set[devid],
  500. worker+1);
  501. #ifdef STARPU_USE_FXT
  502. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  503. while (!workerarg->worker_is_running)
  504. STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
  505. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  506. #endif
  507. STARPU_PTHREAD_MUTEX_LOCK(&cuda_worker_set[devid].mutex);
  508. while (!cuda_worker_set[devid].set_is_initialized)
  509. STARPU_PTHREAD_COND_WAIT(&cuda_worker_set[devid].ready_cond,
  510. &cuda_worker_set[devid].mutex);
  511. STARPU_PTHREAD_MUTEX_UNLOCK(&cuda_worker_set[devid].mutex);
  512. cuda_worker_set[devid].started = 1;
  513. worker_set_initialized:
  514. workerarg->set = &cuda_worker_set[devid];
  515. }
  516. else
  517. {
  518. workerarg->run_by_starpu = 0;
  519. }
  520. break;
  521. #endif
  522. #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
  523. case STARPU_OPENCL_WORKER:
  524. #ifndef STARPU_SIMGRID
  525. starpu_opencl_get_device(workerarg->devid, &driver.id.opencl_id);
  526. if (!_starpu_may_launch_driver(pconfig->conf, &driver))
  527. {
  528. workerarg->run_by_starpu = 0;
  529. break;
  530. }
  531. #endif
  532. STARPU_PTHREAD_CREATE_ON(
  533. workerarg->name,
  534. &workerarg->worker_thread,
  535. NULL,
  536. _starpu_opencl_worker,
  537. workerarg,
  538. worker+1);
  539. #ifdef STARPU_USE_FXT
  540. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  541. while (!workerarg->worker_is_running)
  542. STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
  543. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  544. #endif
  545. break;
  546. #endif
  547. #ifdef STARPU_USE_MIC
  548. case STARPU_MIC_WORKER:
  549. /* We spawn only one thread
  550. * per MIC device, which will control all MIC
  551. * workers of this device. (by using a worker set). */
  552. if (mic_worker_set[devid].started)
  553. goto worker_set_initialized;
  554. mic_worker_set[devid].nworkers = pconfig->topology.nmiccores[devid];
  555. /* We assume all MIC workers of a given MIC
  556. * device are contiguous so that we can
  557. * address them with the first one only. */
  558. mic_worker_set[devid].workers = workerarg;
  559. mic_worker_set[devid].set_is_initialized = 0;
  560. STARPU_PTHREAD_CREATE_ON(
  561. workerarg->name,
  562. &mic_worker_set[devid].worker_thread,
  563. NULL,
  564. _starpu_mic_src_worker,
  565. &mic_worker_set[devid],
  566. worker+1);
  567. #ifdef STARPU_USE_FXT
  568. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  569. while (!workerarg->worker_is_running)
  570. STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
  571. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  572. #endif
  573. STARPU_PTHREAD_MUTEX_LOCK(&mic_worker_set[devid].mutex);
  574. while (!mic_worker_set[devid].set_is_initialized)
  575. STARPU_PTHREAD_COND_WAIT(&mic_worker_set[devid].ready_cond,
  576. &mic_worker_set[devid].mutex);
  577. STARPU_PTHREAD_MUTEX_UNLOCK(&mic_worker_set[devid].mutex);
  578. mic_worker_set[devid].started = 1;
  579. worker_set_initialized:
  580. workerarg->set = &mic_worker_set[devid];
  581. break;
  582. #endif /* STARPU_USE_MIC */
  583. #ifdef STARPU_USE_SCC
  584. case STARPU_SCC_WORKER:
  585. workerarg->worker_is_initialized = 0;
  586. STARPU_PTHREAD_CREATE_ON(
  587. workerarg->name,
  588. &workerarg->worker_thread,
  589. NULL,
  590. _starpu_scc_src_worker,
  591. workerarg,
  592. worker+1);
  593. #ifdef STARPU_USE_FXT
  594. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  595. while (!workerarg->worker_is_running)
  596. STARPU_PTHREAD_COND_WAIT(&workerarg->started_cond, &workerarg->mutex);
  597. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  598. #endif
  599. break;
  600. #endif
  601. default:
  602. STARPU_ABORT();
  603. }
  604. }
  605. cpu = 0;
  606. for (worker = 0; worker < nworkers; worker++)
  607. {
  608. struct _starpu_worker *workerarg = &pconfig->workers[worker];
  609. struct starpu_driver driver;
  610. driver.type = workerarg->arch;
  611. switch (workerarg->arch)
  612. {
  613. case STARPU_CPU_WORKER:
  614. driver.id.cpu_id = cpu;
  615. if (!_starpu_may_launch_driver(pconfig->conf, &driver))
  616. {
  617. cpu++;
  618. break;
  619. }
  620. _STARPU_DEBUG("waiting for worker %u initialization\n", worker);
  621. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  622. while (!workerarg->worker_is_initialized)
  623. STARPU_PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
  624. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  625. cpu++;
  626. break;
  627. case STARPU_CUDA_WORKER:
  628. /* Already waited above */
  629. break;
  630. #if defined(STARPU_USE_OPENCL) || defined(STARPU_SIMGRID)
  631. case STARPU_OPENCL_WORKER:
  632. #ifndef STARPU_SIMGRID
  633. starpu_opencl_get_device(workerarg->devid, &driver.id.opencl_id);
  634. if (!_starpu_may_launch_driver(pconfig->conf, &driver))
  635. break;
  636. #endif
  637. _STARPU_DEBUG("waiting for worker %u initialization\n", worker);
  638. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  639. while (!workerarg->worker_is_initialized)
  640. STARPU_PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
  641. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  642. break;
  643. #endif
  644. case STARPU_MIC_WORKER:
  645. /* Already waited above */
  646. break;
  647. case STARPU_SCC_WORKER:
  648. /* TODO: implement may_launch? */
  649. _STARPU_DEBUG("waiting for worker %u initialization\n", worker);
  650. STARPU_PTHREAD_MUTEX_LOCK(&workerarg->mutex);
  651. while (!workerarg->worker_is_initialized)
  652. STARPU_PTHREAD_COND_WAIT(&workerarg->ready_cond, &workerarg->mutex);
  653. STARPU_PTHREAD_MUTEX_UNLOCK(&workerarg->mutex);
  654. break;
  655. default:
  656. STARPU_ABORT();
  657. }
  658. }
  659. _STARPU_DEBUG("finished launching drivers\n");
  660. }
  661. void _starpu_set_local_worker_key(struct _starpu_worker *worker)
  662. {
  663. STARPU_PTHREAD_SETSPECIFIC(worker_key, worker);
  664. }
  665. struct _starpu_worker *_starpu_get_local_worker_key(void)
  666. {
  667. return (struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(worker_key);
  668. }
  669. /* Initialize the starpu_conf with default values */
  670. int starpu_conf_init(struct starpu_conf *conf)
  671. {
  672. if (!conf)
  673. return -EINVAL;
  674. memset(conf, 0, sizeof(*conf));
  675. conf->magic = 42;
  676. conf->sched_policy_name = getenv("STARPU_SCHED");
  677. conf->sched_policy = NULL;
  678. /* Note that starpu_get_env_number returns -1 in case the variable is
  679. * not defined */
  680. /* Backward compatibility: check the value of STARPU_NCPUS if
  681. * STARPU_NCPU is not set. */
  682. conf->ncpus = starpu_get_env_number("STARPU_NCPU");
  683. if (conf->ncpus == -1)
  684. conf->ncpus = starpu_get_env_number("STARPU_NCPUS");
  685. conf->ncuda = starpu_get_env_number("STARPU_NCUDA");
  686. conf->nopencl = starpu_get_env_number("STARPU_NOPENCL");
  687. conf->nmic = starpu_get_env_number("STARPU_NMIC");
  688. conf->nscc = starpu_get_env_number("STARPU_NSCC");
  689. conf->calibrate = starpu_get_env_number("STARPU_CALIBRATE");
  690. conf->bus_calibrate = starpu_get_env_number("STARPU_BUS_CALIBRATE");
  691. conf->mic_sink_program_path = getenv("STARPU_MIC_PROGRAM_PATH");
  692. if (conf->calibrate == -1)
  693. conf->calibrate = 0;
  694. if (conf->bus_calibrate == -1)
  695. conf->bus_calibrate = 0;
  696. conf->use_explicit_workers_bindid = 0; /* TODO */
  697. conf->use_explicit_workers_cuda_gpuid = 0; /* TODO */
  698. conf->use_explicit_workers_opencl_gpuid = 0; /* TODO */
  699. conf->use_explicit_workers_mic_deviceid = 0; /* TODO */
  700. conf->use_explicit_workers_scc_deviceid = 0; /* TODO */
  701. conf->single_combined_worker = starpu_get_env_number("STARPU_SINGLE_COMBINED_WORKER");
  702. if (conf->single_combined_worker == -1)
  703. conf->single_combined_worker = 0;
  704. #if defined(STARPU_DISABLE_ASYNCHRONOUS_COPY)
  705. conf->disable_asynchronous_copy = 1;
  706. #else
  707. conf->disable_asynchronous_copy = starpu_get_env_number("STARPU_DISABLE_ASYNCHRONOUS_COPY");
  708. if (conf->disable_asynchronous_copy == -1)
  709. conf->disable_asynchronous_copy = 0;
  710. #endif
  711. #if defined(STARPU_DISABLE_ASYNCHRONOUS_CUDA_COPY)
  712. conf->disable_asynchronous_cuda_copy = 1;
  713. #else
  714. conf->disable_asynchronous_cuda_copy = starpu_get_env_number("STARPU_DISABLE_ASYNCHRONOUS_CUDA_COPY");
  715. if (conf->disable_asynchronous_cuda_copy == -1)
  716. conf->disable_asynchronous_cuda_copy = 0;
  717. #endif
  718. #if defined(STARPU_DISABLE_ASYNCHRONOUS_OPENCL_COPY)
  719. conf->disable_asynchronous_opencl_copy = 1;
  720. #else
  721. conf->disable_asynchronous_opencl_copy = starpu_get_env_number("STARPU_DISABLE_ASYNCHRONOUS_OPENCL_COPY");
  722. if (conf->disable_asynchronous_opencl_copy == -1)
  723. conf->disable_asynchronous_opencl_copy = 0;
  724. #endif
  725. #if defined(STARPU_DISABLE_ASYNCHRONOUS_MIC_COPY)
  726. conf->disable_asynchronous_mic_copy = 1;
  727. #else
  728. conf->disable_asynchronous_mic_copy = starpu_get_env_number("STARPU_DISABLE_ASYNCHRONOUS_MIC_COPY");
  729. if (conf->disable_asynchronous_mic_copy == -1)
  730. conf->disable_asynchronous_mic_copy = 0;
  731. #endif
  732. /* 64MiB by default */
  733. conf->trace_buffer_size = 64<<20;
  734. return 0;
  735. }
  736. static void _starpu_conf_set_value_against_environment(char *name, int *value)
  737. {
  738. int number;
  739. number = starpu_get_env_number(name);
  740. if (number != -1)
  741. {
  742. *value = number;
  743. }
  744. }
  745. void _starpu_conf_check_environment(struct starpu_conf *conf)
  746. {
  747. char *sched = getenv("STARPU_SCHED");
  748. if (sched)
  749. {
  750. conf->sched_policy_name = sched;
  751. }
  752. _starpu_conf_set_value_against_environment("STARPU_NCPUS", &conf->ncpus);
  753. _starpu_conf_set_value_against_environment("STARPU_NCPU", &conf->ncpus);
  754. _starpu_conf_set_value_against_environment("STARPU_NCUDA", &conf->ncuda);
  755. _starpu_conf_set_value_against_environment("STARPU_NOPENCL", &conf->nopencl);
  756. _starpu_conf_set_value_against_environment("STARPU_CALIBRATE", &conf->calibrate);
  757. _starpu_conf_set_value_against_environment("STARPU_BUS_CALIBRATE", &conf->bus_calibrate);
  758. _starpu_conf_set_value_against_environment("STARPU_SINGLE_COMBINED_WORKER", &conf->single_combined_worker);
  759. _starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_COPY", &conf->disable_asynchronous_copy);
  760. _starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_CUDA_COPY", &conf->disable_asynchronous_cuda_copy);
  761. _starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_OPENCL_COPY", &conf->disable_asynchronous_opencl_copy);
  762. _starpu_conf_set_value_against_environment("STARPU_DISABLE_ASYNCHRONOUS_MIC_COPY", &conf->disable_asynchronous_mic_copy);
  763. }
  764. struct starpu_tree* starpu_workers_get_tree(void)
  765. {
  766. return config.topology.tree;
  767. }
  768. #ifdef STARPU_HAVE_HWLOC
  769. static void _fill_tree(struct starpu_tree *tree, hwloc_obj_t curr_obj, unsigned depth, hwloc_topology_t topology)
  770. {
  771. unsigned i;
  772. for(i = 0; i < curr_obj->arity; i++)
  773. {
  774. starpu_tree_insert(tree->nodes[i], curr_obj->children[i]->logical_index, depth, curr_obj->children[i]->type == HWLOC_OBJ_PU, curr_obj->children[i]->arity, tree);
  775. /* char string[128]; */
  776. /* hwloc_obj_snprintf(string, sizeof(string), topology, curr_obj->children[i], "#", 0); */
  777. /* printf("%*s%s %d is_pu %d \n", 0, "", string, curr_obj->children[i]->logical_index, curr_obj->children[i]->type == HWLOC_OBJ_PU); */
  778. _fill_tree(tree->nodes[i], curr_obj->children[i], depth+1, topology);
  779. }
  780. }
  781. #endif
  /* Allocate config.topology.tree and populate it from the hwloc topology
   * held in config.topology.hwtopology.  Does nothing when StarPU is built
   * without hwloc support. */
  static void _starpu_build_tree(void)
  {
  #ifdef STARPU_HAVE_HWLOC
      hwloc_obj_t hw_root = hwloc_get_root_obj(config.topology.hwtopology);
      struct starpu_tree *root_node = malloc(sizeof(struct starpu_tree));

      config.topology.tree = root_node;

      /* The root is inserted at level 0 and has no father */
      starpu_tree_insert(root_node, hw_root->logical_index, 0, hw_root->type == HWLOC_OBJ_PU, hw_root->arity, NULL);

      /* Its descendants start at level 1 */
      _fill_tree(root_node, hw_root, 1, config.topology.hwtopology);
  #endif
  }
  /* Initialize StarPU without command-line arguments: this simply forwards
   * user_conf to starpu_initialize() with NULL argc and argv, and returns
   * its result. */
  int starpu_init(struct starpu_conf *user_conf)
  {
      int status = starpu_initialize(user_conf, NULL, NULL);

      return status;
  }
  800. int starpu_initialize(struct starpu_conf *user_conf, int *argc, char ***argv)
  801. {
  802. int is_a_sink = 0; /* Always defined. If the MP infrastructure is not
  803. * used, we cannot be a sink. */
  804. unsigned worker;
  805. #ifdef STARPU_USE_MP
  806. _starpu_set_argc_argv(argc, argv);
  807. # ifdef STARPU_USE_SCC
  808. /* In SCC case we look at the rank to know if we are a sink */
  809. if (_starpu_scc_common_mp_init() && !_starpu_scc_common_is_src_node())
  810. setenv("STARPU_SINK", "STARPU_SCC", 1);
  811. # endif
  812. /* If StarPU was configured to use MP sinks, we have to control the
  813. * kind on node we are running on : host or sink ? */
  814. if (getenv("STARPU_SINK"))
  815. is_a_sink = 1;
  816. #else
  817. (void)argc;
  818. (void)argv;
  819. #endif /* STARPU_USE_MP */
  820. int ret;
  821. #ifdef STARPU_SIMGRID
  822. _starpu_simgrid_init();
  823. #else
  824. #ifdef __GNUC__
  825. #ifndef __OPTIMIZE__
  826. _STARPU_DISP("Warning: StarPU was configured with --enable-debug (-O0), and is thus not optimized\n");
  827. #endif
  828. #endif
  829. #ifdef STARPU_SPINLOCK_CHECK
  830. _STARPU_DISP("Warning: StarPU was configured with --enable-spinlock-check, which slows down a bit\n");
  831. #endif
  832. #if 0
  833. #ifndef STARPU_NO_ASSERT
  834. _STARPU_DISP("Warning: StarPU was configured without --enable-fast\n");
  835. #endif
  836. #endif
  837. #ifdef STARPU_MEMORY_STATS
  838. _STARPU_DISP("Warning: StarPU was configured with --enable-memory-stats, which slows down a bit\n");
  839. #endif
  840. #ifdef STARPU_VERBOSE
  841. _STARPU_DISP("Warning: StarPU was configured with --enable-verbose, which slows down a bit\n");
  842. #endif
  843. #ifdef STARPU_USE_FXT
  844. _STARPU_DISP("Warning: StarPU was configured with --with-fxt, which slows down a bit\n");
  845. #endif
  846. #ifdef STARPU_PERF_DEBUG
  847. _STARPU_DISP("Warning: StarPU was configured with --enable-perf-debug, which slows down a bit\n");
  848. #endif
  849. #ifdef STARPU_MODEL_DEBUG
  850. _STARPU_DISP("Warning: StarPU was configured with --enable-model-debug, which slows down a bit\n");
  851. #endif
  852. #ifdef STARPU_ENABLE_STATS
  853. _STARPU_DISP("Warning: StarPU was configured with --enable-stats, which slows down a bit\n");
  854. #endif
  855. #endif
  856. STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
  857. while (initialized == CHANGING)
  858. /* Wait for the other one changing it */
  859. STARPU_PTHREAD_COND_WAIT(&init_cond, &init_mutex);
  860. init_count++;
  861. if (initialized == INITIALIZED)
  862. {
  863. /* He initialized it, don't do it again, and let the others get the mutex */
  864. STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
  865. return 0;
  866. }
  867. /* initialized == UNINITIALIZED */
  868. initialized = CHANGING;
  869. STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
  870. #ifdef __MINGW32__
  871. WSADATA wsadata;
  872. WSAStartup(MAKEWORD(1,0), &wsadata);
  873. #endif
  874. srand(2008);
  875. #ifdef HAVE_AYUDAME_H
  876. #ifndef AYU_RT_STARPU
  877. /* Dumb value for now */
  878. #define AYU_RT_STARPU 32
  879. #endif
  880. if (AYU_event)
  881. {
  882. enum ayu_runtime_t ayu_rt = AYU_RT_STARPU;
  883. AYU_event(AYU_PREINIT, 0, (void*) &ayu_rt);
  884. }
  885. #endif
  886. /* store the pointer to the user explicit configuration during the
  887. * initialization */
  888. if (user_conf == NULL)
  889. {
  890. struct starpu_conf *conf = malloc(sizeof(struct starpu_conf));
  891. starpu_conf_init(conf);
  892. config.conf = conf;
  893. config.default_conf = 1;
  894. }
  895. else
  896. {
  897. if (user_conf->magic != 42)
  898. {
  899. _STARPU_DISP("starpu_conf structure needs to be initialized with starpu_conf_init\n");
  900. return -EINVAL;
  901. }
  902. config.conf = user_conf;
  903. config.default_conf = 0;
  904. }
  905. _starpu_conf_check_environment(config.conf);
  906. _starpu_init_all_sched_ctxs(&config);
  907. _starpu_init_progression_hooks();
  908. _starpu_init_tags();
  909. #ifdef STARPU_USE_FXT
  910. _starpu_init_fxt_profiling(config.conf->trace_buffer_size);
  911. #endif
  912. _starpu_open_debug_logfile();
  913. _starpu_data_interface_init();
  914. _starpu_timing_init();
  915. _starpu_profiling_init();
  916. _starpu_load_bus_performance_files();
  917. /* Depending on whether we are a MP sink or not, we must build the
  918. * topology with MP nodes or not. */
  919. ret = _starpu_build_topology(&config, is_a_sink);
  920. if (ret)
  921. {
  922. STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
  923. init_count--;
  924. #ifdef STARPU_USE_SCC
  925. if (_starpu_scc_common_is_mp_initialized())
  926. _starpu_scc_src_mp_deinit();
  927. #endif
  928. initialized = UNINITIALIZED;
  929. /* Let somebody else try to do it */
  930. STARPU_PTHREAD_COND_SIGNAL(&init_cond);
  931. STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
  932. return ret;
  933. }
  934. /* We need to store the current task handled by the different
  935. * threads */
  936. _starpu_initialize_current_task_key();
  937. for (worker = 0; worker < config.topology.nworkers; worker++)
  938. _starpu_worker_init(&config.workers[worker], &config);
  939. STARPU_PTHREAD_KEY_CREATE(&worker_key, NULL);
  940. _starpu_build_tree();
  941. if (!is_a_sink)
  942. {
  943. struct starpu_sched_policy *selected_policy = _starpu_select_sched_policy(&config, config.conf->sched_policy_name);
  944. _starpu_create_sched_ctx(selected_policy, NULL, -1, 1, "init", 0, 0, 0, 0);
  945. }
  946. _starpu_initialize_registered_performance_models();
  947. /* Launch "basic" workers (ie. non-combined workers) */
  948. if (!is_a_sink)
  949. _starpu_launch_drivers(&config);
  950. _starpu_watchdog_init();
  951. STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
  952. initialized = INITIALIZED;
  953. /* Tell everybody that we initialized */
  954. STARPU_PTHREAD_COND_BROADCAST(&init_cond);
  955. STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
  956. _STARPU_DEBUG("Initialisation finished\n");
  957. #ifdef STARPU_USE_MP
  958. /* Finally, if we are a MP sink, we never leave this function. Else,
  959. * we enter an infinite event loop which listen for MP commands from
  960. * the source. */
  961. if (is_a_sink) {
  962. _starpu_sink_common_worker();
  963. /* We should normally never leave the loop as we don't want to
  964. * really initialize STARPU */
  965. STARPU_ASSERT(0);
  966. }
  967. #endif
  968. return 0;
  969. }
  970. /*
  971. * Handle runtime termination
  972. */
  973. static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
  974. {
  975. int status = 0;
  976. unsigned workerid;
  977. for (workerid = 0; workerid < pconfig->topology.nworkers; workerid++)
  978. {
  979. starpu_wake_all_blocked_workers();
  980. _STARPU_DEBUG("wait for worker %u\n", workerid);
  981. struct _starpu_worker_set *set = pconfig->workers[workerid].set;
  982. struct _starpu_worker *worker = &pconfig->workers[workerid];
  983. /* in case StarPU termination code is called from a callback,
  984. * we have to check if pthread_self() is the worker itself */
  985. if (set)
  986. {
  987. if (set->started)
  988. {
  989. #ifdef STARPU_SIMGRID
  990. status = starpu_pthread_join(set->worker_thread, NULL);
  991. #else
  992. if (!pthread_equal(pthread_self(), set->worker_thread))
  993. status = starpu_pthread_join(set->worker_thread, NULL);
  994. #endif
  995. if (status)
  996. {
  997. #ifdef STARPU_VERBOSE
  998. _STARPU_DEBUG("starpu_pthread_join -> %d\n", status);
  999. #endif
  1000. }
  1001. set->started = 0;
  1002. }
  1003. }
  1004. else
  1005. {
  1006. if (!worker->run_by_starpu)
  1007. goto out;
  1008. #ifdef STARPU_SIMGRID
  1009. status = starpu_pthread_join(worker->worker_thread, NULL);
  1010. #else
  1011. if (!pthread_equal(pthread_self(), worker->worker_thread))
  1012. status = starpu_pthread_join(worker->worker_thread, NULL);
  1013. #endif
  1014. if (status)
  1015. {
  1016. #ifdef STARPU_VERBOSE
  1017. _STARPU_DEBUG("starpu_pthread_join -> %d\n", status);
  1018. #endif
  1019. }
  1020. }
  1021. out:
  1022. STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
  1023. _starpu_sched_ctx_list_delete(&worker->sched_ctx_list);
  1024. _starpu_job_list_delete(worker->terminated_jobs);
  1025. }
  1026. }
  1027. /* Condition variable and mutex used to pause/resume. */
  1028. static starpu_pthread_cond_t pause_cond = STARPU_PTHREAD_COND_INITIALIZER;
  1029. static starpu_pthread_mutex_t pause_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
  1030. unsigned _starpu_machine_is_running(void)
  1031. {
  1032. unsigned ret;
  1033. /* running and pause_depth are just protected by a memory barrier */
  1034. STARPU_RMB();
  1035. if (STARPU_UNLIKELY(config.pause_depth > 0)) {
  1036. STARPU_PTHREAD_MUTEX_LOCK(&pause_mutex);
  1037. if (config.pause_depth > 0) {
  1038. STARPU_PTHREAD_COND_WAIT(&pause_cond, &pause_mutex);
  1039. }
  1040. STARPU_PTHREAD_MUTEX_UNLOCK(&pause_mutex);
  1041. }
  1042. ANNOTATE_HAPPENS_AFTER(&config.running);
  1043. ret = config.running;
  1044. ANNOTATE_HAPPENS_BEFORE(&config.running);
  1045. return ret;
  1046. }
  1047. void starpu_pause()
  1048. {
  1049. STARPU_HG_DISABLE_CHECKING(config.pause_depth);
  1050. config.pause_depth += 1;
  1051. }
  1052. void starpu_resume()
  1053. {
  1054. STARPU_PTHREAD_MUTEX_LOCK(&pause_mutex);
  1055. config.pause_depth -= 1;
  1056. if (!config.pause_depth) {
  1057. STARPU_PTHREAD_COND_BROADCAST(&pause_cond);
  1058. }
  1059. STARPU_PTHREAD_MUTEX_UNLOCK(&pause_mutex);
  1060. }
  1061. unsigned _starpu_worker_can_block(unsigned memnode STARPU_ATTRIBUTE_UNUSED)
  1062. {
  1063. #ifdef STARPU_NON_BLOCKING_DRIVERS
  1064. return 0;
  1065. #else
  1066. unsigned can_block = 1;
  1067. #ifndef STARPU_SIMGRID
  1068. if (!_starpu_check_that_no_data_request_exists(memnode))
  1069. can_block = 0;
  1070. #endif
  1071. if (!_starpu_machine_is_running())
  1072. can_block = 0;
  1073. if (!_starpu_execute_registered_progression_hooks())
  1074. can_block = 0;
  1075. return can_block;
  1076. #endif
  1077. }
  1078. static void _starpu_kill_all_workers(struct _starpu_machine_config *pconfig)
  1079. {
  1080. /* set the flag which will tell workers to stop */
  1081. ANNOTATE_HAPPENS_AFTER(&config.running);
  1082. pconfig->running = 0;
  1083. /* running is just protected by a memory barrier */
  1084. ANNOTATE_HAPPENS_BEFORE(&config.running);
  1085. STARPU_WMB();
  1086. starpu_wake_all_blocked_workers();
  1087. }
  /* Display the profiling summaries: first the bus one, then the worker
   * one. */
  void starpu_display_stats()
  {
      /* Bus summary */
      starpu_profiling_bus_helper_display_summary();

      /* Per-worker summary */
      starpu_profiling_worker_helper_display_summary();
  }
  1093. void starpu_shutdown(void)
  1094. {
  1095. STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
  1096. init_count--;
  1097. if (init_count)
  1098. {
  1099. _STARPU_DEBUG("Still somebody needing StarPU, don't deinitialize\n");
  1100. STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
  1101. return;
  1102. }
  1103. /* We're last */
  1104. initialized = CHANGING;
  1105. STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
  1106. /* If the workers are frozen, no progress can be made. */
  1107. STARPU_ASSERT(config.pause_depth <= 0);
  1108. starpu_task_wait_for_no_ready();
  1109. /* tell all workers to shutdown */
  1110. _starpu_kill_all_workers(&config);
  1111. {
  1112. int stats = starpu_get_env_number("STARPU_STATS");
  1113. if (stats != 0)
  1114. {
  1115. _starpu_display_msi_stats();
  1116. _starpu_display_alloc_cache_stats();
  1117. _starpu_display_comm_amounts();
  1118. }
  1119. }
  1120. starpu_profiling_bus_helper_display_summary();
  1121. starpu_profiling_worker_helper_display_summary();
  1122. _starpu_deinitialize_registered_performance_models();
  1123. _starpu_watchdog_shutdown();
  1124. /* wait for their termination */
  1125. _starpu_terminate_workers(&config);
  1126. {
  1127. int stats = starpu_get_env_number("STARPU_MEMORY_STATS");
  1128. if (stats != 0)
  1129. {
  1130. // Display statistics on data which have not been unregistered
  1131. starpu_data_display_memory_stats();
  1132. }
  1133. }
  1134. _starpu_delete_all_sched_ctxs();
  1135. _starpu_disk_unregister();
  1136. #ifdef STARPU_HAVE_HWLOC
  1137. starpu_tree_free(config.topology.tree);
  1138. #endif
  1139. _starpu_destroy_topology(&config);
  1140. #ifdef STARPU_USE_FXT
  1141. _starpu_stop_fxt_profiling();
  1142. #endif
  1143. _starpu_data_interface_shutdown();
  1144. /* Drop all remaining tags */
  1145. _starpu_tag_clear();
  1146. _starpu_close_debug_logfile();
  1147. STARPU_PTHREAD_MUTEX_LOCK(&init_mutex);
  1148. initialized = UNINITIALIZED;
  1149. /* Let someone else that wants to initialize it again do it */
  1150. STARPU_PTHREAD_COND_SIGNAL(&init_cond);
  1151. STARPU_PTHREAD_MUTEX_UNLOCK(&init_mutex);
  1152. /* Clear memory if it was allocated by StarPU */
  1153. if (config.default_conf)
  1154. free(config.conf);
  1155. #ifdef HAVE_AYUDAME_H
  1156. if (AYU_event) AYU_event(AYU_FINISH, 0, NULL);
  1157. #endif
  1158. #ifdef STARPU_USE_SCC
  1159. if (_starpu_scc_common_is_mp_initialized())
  1160. _starpu_scc_src_mp_deinit();
  1161. #endif
  1162. _starpu_print_idle_time();
  1163. _STARPU_DEBUG("Shutdown finished\n");
  1164. }
  1165. unsigned starpu_worker_get_count(void)
  1166. {
  1167. return config.topology.nworkers;
  1168. }
  1169. int starpu_worker_get_count_by_type(enum starpu_worker_archtype type)
  1170. {
  1171. switch (type)
  1172. {
  1173. case STARPU_CPU_WORKER:
  1174. return config.topology.ncpus;
  1175. case STARPU_CUDA_WORKER:
  1176. return config.topology.ncudagpus;
  1177. case STARPU_OPENCL_WORKER:
  1178. return config.topology.nopenclgpus;
  1179. case STARPU_MIC_WORKER:
  1180. return config.topology.nmicdevices;
  1181. case STARPU_SCC_WORKER:
  1182. return config.topology.nsccdevices;
  1183. default:
  1184. return -EINVAL;
  1185. }
  1186. }
  1187. unsigned starpu_combined_worker_get_count(void)
  1188. {
  1189. return config.topology.ncombinedworkers;
  1190. }
  1191. unsigned starpu_cpu_worker_get_count(void)
  1192. {
  1193. return config.topology.ncpus;
  1194. }
  1195. unsigned starpu_cuda_worker_get_count(void)
  1196. {
  1197. return config.topology.ncudagpus;
  1198. }
  1199. unsigned starpu_opencl_worker_get_count(void)
  1200. {
  1201. return config.topology.nopenclgpus;
  1202. }
  1203. int starpu_asynchronous_copy_disabled(void)
  1204. {
  1205. return config.conf->disable_asynchronous_copy;
  1206. }
  1207. int starpu_asynchronous_cuda_copy_disabled(void)
  1208. {
  1209. return config.conf->disable_asynchronous_cuda_copy;
  1210. }
  1211. int starpu_asynchronous_opencl_copy_disabled(void)
  1212. {
  1213. return config.conf->disable_asynchronous_opencl_copy;
  1214. }
  1215. int starpu_asynchronous_mic_copy_disabled(void)
  1216. {
  1217. return config.conf->disable_asynchronous_mic_copy;
  1218. }
  1219. unsigned starpu_mic_worker_get_count(void)
  1220. {
  1221. int i = 0, count = 0;
  1222. for (i = 0; i < STARPU_MAXMICDEVS; i++)
  1223. count += config.topology.nmiccores[i];
  1224. return count;
  1225. }
  1226. unsigned starpu_scc_worker_get_count(void)
  1227. {
  1228. return config.topology.nsccdevices;
  1229. }
  1230. /* When analyzing performance, it is useful to see what is the processing unit
  1231. * that actually performed the task. This function returns the id of the
  1232. * processing unit actually executing it, therefore it makes no sense to use it
  1233. * within the callbacks of SPU functions for instance. If called by some thread
  1234. * that is not controlled by StarPU, starpu_worker_get_id returns -1. */
  1235. int starpu_worker_get_id(void)
  1236. {
  1237. struct _starpu_worker * worker;
  1238. worker = _starpu_get_local_worker_key();
  1239. if (worker)
  1240. {
  1241. return worker->workerid;
  1242. }
  1243. else
  1244. {
  1245. /* there is no worker associated to that thread, perhaps it is
  1246. * a thread from the application or this is some SPU worker */
  1247. return -1;
  1248. }
  1249. }
  1250. int starpu_combined_worker_get_id(void)
  1251. {
  1252. struct _starpu_worker *worker;
  1253. worker = _starpu_get_local_worker_key();
  1254. if (worker)
  1255. {
  1256. return worker->combined_workerid;
  1257. }
  1258. else
  1259. {
  1260. /* there is no worker associated to that thread, perhaps it is
  1261. * a thread from the application or this is some SPU worker */
  1262. return -1;
  1263. }
  1264. }
  1265. int starpu_combined_worker_get_size(void)
  1266. {
  1267. struct _starpu_worker *worker;
  1268. worker = _starpu_get_local_worker_key();
  1269. if (worker)
  1270. {
  1271. return worker->worker_size;
  1272. }
  1273. else
  1274. {
  1275. /* there is no worker associated to that thread, perhaps it is
  1276. * a thread from the application or this is some SPU worker */
  1277. return -1;
  1278. }
  1279. }
  1280. int starpu_combined_worker_get_rank(void)
  1281. {
  1282. struct _starpu_worker *worker;
  1283. worker = _starpu_get_local_worker_key();
  1284. if (worker)
  1285. {
  1286. return worker->current_rank;
  1287. }
  1288. else
  1289. {
  1290. /* there is no worker associated to that thread, perhaps it is
  1291. * a thread from the application or this is some SPU worker */
  1292. return -1;
  1293. }
  1294. }
  1295. int starpu_worker_get_subworkerid(int id)
  1296. {
  1297. return config.workers[id].subworkerid;
  1298. }
  1299. int starpu_worker_get_devid(int id)
  1300. {
  1301. return config.workers[id].devid;
  1302. }
  1303. struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
  1304. {
  1305. return &config.workers[id];
  1306. }
  1307. unsigned starpu_worker_is_combined_worker(int id)
  1308. {
  1309. return id >= (int)config.topology.nworkers;
  1310. }
  1311. struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
  1312. {
  1313. if(id == STARPU_NMAX_SCHED_CTXS) return NULL;
  1314. return &config.sched_ctxs[id];
  1315. }
  1316. struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id)
  1317. {
  1318. unsigned basic_worker_count = starpu_worker_get_count();
  1319. //_STARPU_DEBUG("basic_worker_count:%d\n",basic_worker_count);
  1320. STARPU_ASSERT(id >= basic_worker_count);
  1321. return &config.combined_workers[id - basic_worker_count];
  1322. }
  1323. enum starpu_worker_archtype starpu_worker_get_type(int id)
  1324. {
  1325. return config.workers[id].arch;
  1326. }
  1327. int starpu_worker_get_ids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize)
  1328. {
  1329. unsigned nworkers = starpu_worker_get_count();
  1330. int cnt = 0;
  1331. unsigned id;
  1332. for (id = 0; id < nworkers; id++)
  1333. {
  1334. if (starpu_worker_get_type(id) == type)
  1335. {
  1336. /* Perhaps the array is too small ? */
  1337. if (cnt >= maxsize)
  1338. return -ERANGE;
  1339. workerids[cnt++] = id;
  1340. }
  1341. }
  1342. return cnt;
  1343. }
  1344. int starpu_worker_get_by_type(enum starpu_worker_archtype type, int num)
  1345. {
  1346. unsigned nworkers = starpu_worker_get_count();
  1347. int cnt = 0;
  1348. unsigned id;
  1349. for (id = 0; id < nworkers; id++)
  1350. {
  1351. if (starpu_worker_get_type(id) == type)
  1352. {
  1353. if (num == cnt)
  1354. return id;
  1355. cnt++;
  1356. }
  1357. }
  1358. /* Not found */
  1359. return -1;
  1360. }
  1361. int starpu_worker_get_by_devid(enum starpu_worker_archtype type, int devid)
  1362. {
  1363. unsigned nworkers = starpu_worker_get_count();
  1364. unsigned id;
  1365. for (id = 0; id < nworkers; id++)
  1366. if (starpu_worker_get_type(id) == type && starpu_worker_get_devid(id) == devid)
  1367. return id;
  1368. /* Not found */
  1369. return -1;
  1370. }
  1371. void starpu_worker_get_name(int id, char *dst, size_t maxlen)
  1372. {
  1373. char *name = config.workers[id].name;
  1374. snprintf(dst, maxlen, "%s", name);
  1375. }
  1376. int starpu_worker_get_bindid(int workerid)
  1377. {
  1378. return config.workers[workerid].bindid;
  1379. }
  1380. int _starpu_worker_get_workerids(int bindid, int *workerids)
  1381. {
  1382. unsigned nworkers = starpu_worker_get_count();
  1383. int nw = 0;
  1384. unsigned id;
  1385. for (id = 0; id < nworkers; id++)
  1386. if (config.workers[id].bindid == bindid)
  1387. workerids[nw++] = id;
  1388. return nw;
  1389. }
  1390. /* Retrieve the status which indicates what the worker is currently doing. */
  1391. enum _starpu_worker_status _starpu_worker_get_status(int workerid)
  1392. {
  1393. return config.workers[workerid].status;
  1394. }
  1395. /* Change the status of the worker which indicates what the worker is currently
  1396. * doing (eg. executing a callback). */
  1397. void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
  1398. {
  1399. config.workers[workerid].status = status;
  1400. }
  1401. void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sched_mutex, starpu_pthread_cond_t **sched_cond)
  1402. {
  1403. *sched_cond = &config.workers[workerid].sched_cond;
  1404. *sched_mutex = &config.workers[workerid].sched_mutex;
  1405. }
  1406. int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex)
  1407. {
  1408. int success = 0;
  1409. STARPU_PTHREAD_MUTEX_LOCK(mutex);
  1410. if (config.workers[workerid].status == STATUS_SLEEPING)
  1411. {
  1412. config.workers[workerid].status = STATUS_WAKING_UP;
  1413. STARPU_PTHREAD_COND_SIGNAL(cond);
  1414. success = 1;
  1415. }
  1416. STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
  1417. return success;
  1418. }
  1419. int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize)
  1420. {
  1421. unsigned nworkers = starpu_worker_get_count();
  1422. int cnt = 0;
  1423. unsigned id;
  1424. for (id = 0; id < nworkers; id++)
  1425. {
  1426. if (starpu_worker_get_type(id) == type)
  1427. {
  1428. /* Perhaps the array is too small ? */
  1429. if (cnt >= maxsize)
  1430. return cnt;
  1431. workerids[cnt++] = id;
  1432. }
  1433. }
  1434. return cnt;
  1435. }
  1436. int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize)
  1437. {
  1438. unsigned nworkers = starpu_worker_get_count();
  1439. int cnt = 0;
  1440. unsigned id, worker;
  1441. unsigned found = 0;
  1442. for (id = 0; id < nworkers; id++)
  1443. {
  1444. found = 0;
  1445. if (starpu_worker_get_type(id) == type)
  1446. {
  1447. /* Perhaps the array is too small ? */
  1448. if (cnt >= maxsize)
  1449. return cnt;
  1450. int s;
  1451. for(s = 1; s < STARPU_NMAX_SCHED_CTXS; s++)
  1452. {
  1453. if(config.sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
  1454. {
  1455. struct starpu_worker_collection *workers = config.sched_ctxs[s].workers;
  1456. struct starpu_sched_ctx_iterator it;
  1457. if(workers->init_iterator)
  1458. workers->init_iterator(workers, &it);
  1459. while(workers->has_next(workers, &it))
  1460. {
  1461. worker = workers->get_next(workers, &it);
  1462. if(worker == id)
  1463. {
  1464. found = 1;
  1465. break;
  1466. }
  1467. }
  1468. if(found) break;
  1469. }
  1470. }
  1471. if(!found)
  1472. workerids[cnt++] = id;
  1473. }
  1474. }
  1475. return cnt;
  1476. }
  1477. struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void)
  1478. {
  1479. return &config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
  1480. }
  1481. int _starpu_worker_get_nsched_ctxs(int workerid)
  1482. {
  1483. return config.workers[workerid].nsched_ctxs;
  1484. }
  1485. static void *
  1486. _starpu_get_worker_from_driver(struct starpu_driver *d)
  1487. {
  1488. unsigned nworkers = starpu_worker_get_count();
  1489. unsigned workerid;
  1490. #ifdef STARPU_USE_CUDA
  1491. if (d->type == STARPU_CUDA_WORKER)
  1492. return &cuda_worker_set[d->id.cuda_id];
  1493. #endif
  1494. for (workerid = 0; workerid < nworkers; workerid++)
  1495. {
  1496. if (starpu_worker_get_type(workerid) == d->type)
  1497. {
  1498. struct _starpu_worker *worker;
  1499. worker = _starpu_get_worker_struct(workerid);
  1500. switch (d->type)
  1501. {
  1502. #ifdef STARPU_USE_CPU
  1503. case STARPU_CPU_WORKER:
  1504. if (worker->devid == d->id.cpu_id)
  1505. return worker;
  1506. break;
  1507. #endif
  1508. #ifdef STARPU_USE_OPENCL
  1509. case STARPU_OPENCL_WORKER:
  1510. {
  1511. cl_device_id device;
  1512. starpu_opencl_get_device(worker->devid, &device);
  1513. if (device == d->id.opencl_id)
  1514. return worker;
  1515. break;
  1516. }
  1517. #endif
  1518. default:
  1519. _STARPU_DEBUG("Invalid device type\n");
  1520. return NULL;
  1521. }
  1522. }
  1523. }
  1524. return NULL;
  1525. }
  1526. int
  1527. starpu_driver_run(struct starpu_driver *d)
  1528. {
  1529. if (!d)
  1530. {
  1531. _STARPU_DEBUG("Invalid argument\n");
  1532. return -EINVAL;
  1533. }
  1534. void *worker = _starpu_get_worker_from_driver(d);
  1535. switch (d->type)
  1536. {
  1537. #ifdef STARPU_USE_CPU
  1538. case STARPU_CPU_WORKER:
  1539. return _starpu_run_cpu(worker);
  1540. #endif
  1541. #ifdef STARPU_USE_CUDA
  1542. case STARPU_CUDA_WORKER:
  1543. return _starpu_run_cuda(worker);
  1544. #endif
  1545. #ifdef STARPU_USE_OPENCL
  1546. case STARPU_OPENCL_WORKER:
  1547. return _starpu_run_opencl(worker);
  1548. #endif
  1549. default:
  1550. _STARPU_DEBUG("Invalid device type\n");
  1551. return -EINVAL;
  1552. }
  1553. }
  1554. int
  1555. starpu_driver_init(struct starpu_driver *d)
  1556. {
  1557. STARPU_ASSERT(d);
  1558. void *worker = _starpu_get_worker_from_driver(d);
  1559. switch (d->type)
  1560. {
  1561. #ifdef STARPU_USE_CPU
  1562. case STARPU_CPU_WORKER:
  1563. return _starpu_cpu_driver_init(worker);
  1564. #endif
  1565. #ifdef STARPU_USE_CUDA
  1566. case STARPU_CUDA_WORKER:
  1567. return _starpu_cuda_driver_init(worker);
  1568. #endif
  1569. #ifdef STARPU_USE_OPENCL
  1570. case STARPU_OPENCL_WORKER:
  1571. return _starpu_opencl_driver_init(worker);
  1572. #endif
  1573. default:
  1574. return -EINVAL;
  1575. }
  1576. }
  1577. int
  1578. starpu_driver_run_once(struct starpu_driver *d)
  1579. {
  1580. STARPU_ASSERT(d);
  1581. void *worker = _starpu_get_worker_from_driver(d);
  1582. switch (d->type)
  1583. {
  1584. #ifdef STARPU_USE_CPU
  1585. case STARPU_CPU_WORKER:
  1586. return _starpu_cpu_driver_run_once(worker);
  1587. #endif
  1588. #ifdef STARPU_USE_CUDA
  1589. case STARPU_CUDA_WORKER:
  1590. return _starpu_cuda_driver_run_once(worker);
  1591. #endif
  1592. #ifdef STARPU_USE_OPENCL
  1593. case STARPU_OPENCL_WORKER:
  1594. return _starpu_opencl_driver_run_once(worker);
  1595. #endif
  1596. default:
  1597. return -EINVAL;
  1598. }
  1599. }
  1600. int
  1601. starpu_driver_deinit(struct starpu_driver *d)
  1602. {
  1603. STARPU_ASSERT(d);
  1604. void *worker = _starpu_get_worker_from_driver(d);
  1605. switch (d->type)
  1606. {
  1607. #ifdef STARPU_USE_CPU
  1608. case STARPU_CPU_WORKER:
  1609. return _starpu_cpu_driver_deinit(worker);
  1610. #endif
  1611. #ifdef STARPU_USE_CUDA
  1612. case STARPU_CUDA_WORKER:
  1613. return _starpu_cuda_driver_deinit(worker);
  1614. #endif
  1615. #ifdef STARPU_USE_OPENCL
  1616. case STARPU_OPENCL_WORKER:
  1617. return _starpu_opencl_driver_deinit(worker);
  1618. #endif
  1619. default:
  1620. return -EINVAL;
  1621. }
  1622. }
  1623. void starpu_get_version(int *major, int *minor, int *release)
  1624. {
  1625. *major = STARPU_MAJOR_VERSION;
  1626. *minor = STARPU_MINOR_VERSION;
  1627. *release = STARPU_RELEASE_VERSION;
  1628. }
  1629. void _starpu_unlock_mutex_if_prev_locked()
  1630. {
  1631. int workerid = starpu_worker_get_id();
  1632. if(workerid != -1)
  1633. {
  1634. struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
  1635. if(w->sched_mutex_locked)
  1636. {
  1637. STARPU_PTHREAD_MUTEX_UNLOCK(&w->sched_mutex);
  1638. _starpu_worker_set_flag_sched_mutex_locked(workerid, 1);
  1639. }
  1640. }
  1641. return;
  1642. }
  1643. void _starpu_relock_mutex_if_prev_locked()
  1644. {
  1645. int workerid = starpu_worker_get_id();
  1646. if(workerid != -1)
  1647. {
  1648. struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
  1649. if(w->sched_mutex_locked)
  1650. STARPU_PTHREAD_MUTEX_LOCK(&w->sched_mutex);
  1651. }
  1652. return;
  1653. }
  1654. void _starpu_worker_set_flag_sched_mutex_locked(int workerid, unsigned flag)
  1655. {
  1656. struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
  1657. w->sched_mutex_locked = flag;
  1658. }
  1659. unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
  1660. {
  1661. struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
  1662. return &w->sched_mutex == mutex;
  1663. }
  1664. unsigned starpu_worker_get_sched_ctx_list(int workerid, unsigned **sched_ctxs)
  1665. {
  1666. unsigned s = 0;
  1667. unsigned nsched_ctxs = _starpu_worker_get_nsched_ctxs(workerid);
  1668. *sched_ctxs = (unsigned*)malloc(nsched_ctxs*sizeof(unsigned));
  1669. struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
  1670. struct _starpu_sched_ctx_list *l = NULL;
  1671. for (l = worker->sched_ctx_list; l; l = l->next)
  1672. {
  1673. (*sched_ctxs)[s++] = l->sched_ctx;
  1674. }
  1675. return nsched_ctxs;
  1676. }