driver_cpu.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2014 Université de Bordeaux 1
  4. * Copyright (C) 2010 Mehdi Juhoor <mjuhoor@gmail.com>
  5. * Copyright (C) 2010-2013 Centre National de la Recherche Scientifique
  6. * Copyright (C) 2011 Télécom-SudParis
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <common/config.h>
  20. #include <math.h>
  21. #include <starpu.h>
  22. #include <starpu_profiling.h>
  23. #include <drivers/driver_common/driver_common.h>
  24. #include <common/utils.h>
  25. #include <core/debug.h>
  26. #include <core/workers.h>
  27. #include "driver_cpu.h"
  28. #include <core/sched_policy.h>
  29. #include <datawizard/memory_manager.h>
  30. #include <datawizard/malloc.h>
  31. #ifdef STARPU_HAVE_HWLOC
  32. #include <hwloc.h>
  33. #ifndef HWLOC_API_VERSION
  34. #define HWLOC_OBJ_PU HWLOC_OBJ_PROC
  35. #endif
  36. #endif
  37. #ifdef STARPU_HAVE_WINDOWS
  38. #include <windows.h>
  39. #endif
  40. #ifdef STARPU_SIMGRID
  41. #include <core/simgrid.h>
  42. #endif
  43. #ifdef STARPU_SIMGRID
  44. void
  45. _starpu_cpu_discover_devices(struct _starpu_machine_config *config)
  46. {
  47. config->topology.nhwcpus = _starpu_simgrid_get_nbhosts("CPU");
  48. }
  49. #elif defined(STARPU_HAVE_HWLOC)
  50. void
  51. _starpu_cpu_discover_devices(struct _starpu_machine_config *config)
  52. {
  53. /* Discover the CPUs relying on the hwloc interface and fills CONFIG
  54. * accordingly. */
  55. struct _starpu_machine_topology *topology = &config->topology;
  56. config->cpu_depth = hwloc_get_type_depth (topology->hwtopology,
  57. HWLOC_OBJ_CORE);
  58. /* Would be very odd */
  59. STARPU_ASSERT(config->cpu_depth != HWLOC_TYPE_DEPTH_MULTIPLE);
  60. if (config->cpu_depth == HWLOC_TYPE_DEPTH_UNKNOWN) {
  61. /* unknown, using logical procesors as fallback */
  62. _STARPU_DISP("Warning: The OS did not report CPU cores. Assuming there is only one hardware thread per core.\n");
  63. config->cpu_depth = hwloc_get_type_depth(topology->hwtopology,
  64. HWLOC_OBJ_PU);
  65. }
  66. topology->nhwcpus = hwloc_get_nbobjs_by_depth (topology->hwtopology,
  67. config->cpu_depth);
  68. }
  69. #elif defined(HAVE_SYSCONF)
  70. void
  71. _starpu_cpu_discover_devices(struct _starpu_machine_config *config)
  72. {
  73. /* Discover the CPUs relying on the sysconf(3) function and fills
  74. * CONFIG accordingly. */
  75. config->topology.nhwcpus = sysconf(_SC_NPROCESSORS_ONLN);
  76. }
  77. #elif defined(__MINGW32__) || defined(__CYGWIN__)
  78. void
  79. _starpu_cpu_discover_devices(struct _starpu_machine_config *config)
  80. {
  81. /* Discover the CPUs on Cygwin and MinGW systems. */
  82. SYSTEM_INFO sysinfo;
  83. GetSystemInfo(&sysinfo);
  84. config->topology.nhwcpus = sysinfo.dwNumberOfProcessors;
  85. }
  86. #else
  87. #warning no way to know number of cores, assuming 1
  88. void
  89. _starpu_cpu_discover_devices(struct _starpu_machine_config *config)
  90. {
  91. config->topology.nhwcpus = 1;
  92. }
  93. #endif
  94. /* Actually launch the job on a cpu worker.
  95. * Handle binding CPUs on cores.
  96. * In the case of a combined worker WORKER_TASK != J->TASK */
  97. static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_task, struct _starpu_worker *cpu_args, int rank, struct starpu_perfmodel_arch* perf_arch)
  98. {
  99. int ret;
  100. int is_parallel_task = (j->task_size > 1);
  101. int profiling = starpu_profiling_status_get();
  102. struct timespec codelet_start, codelet_end;
  103. struct starpu_task *task = j->task;
  104. struct starpu_codelet *cl = task->cl;
  105. STARPU_ASSERT(cl);
  106. if (rank == 0)
  107. {
  108. ret = _starpu_fetch_task_input(j, 0);
  109. if (ret != 0)
  110. {
  111. /* there was not enough memory so the codelet cannot be executed right now ... */
  112. /* push the codelet back and try another one ... */
  113. return -EAGAIN;
  114. }
  115. }
  116. if (is_parallel_task)
  117. {
  118. STARPU_PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
  119. /* In the case of a combined worker, the scheduler needs to know
  120. * when each actual worker begins the execution */
  121. _starpu_sched_pre_exec_hook(worker_task);
  122. }
  123. /* Give profiling variable */
  124. _starpu_driver_start_job(cpu_args, j, &codelet_start, rank, profiling);
  125. /* In case this is a Fork-join parallel task, the worker does not
  126. * execute the kernel at all. */
  127. if ((rank == 0) || (cl->type != STARPU_FORKJOIN))
  128. {
  129. _starpu_cl_func_t func = _starpu_task_get_cpu_nth_implementation(cl, j->nimpl);
  130. if (is_parallel_task && cl->type == STARPU_FORKJOIN)
  131. /* bind to parallel worker */
  132. _starpu_bind_thread_on_cpus(cpu_args->config, _starpu_get_combined_worker_struct(j->combined_workerid));
  133. STARPU_ASSERT_MSG(func, "when STARPU_CPU is defined in 'where', cpu_func or cpu_funcs has to be defined");
  134. #ifdef STARPU_SIMGRID
  135. _starpu_simgrid_execute_job(j, perf_arch, NAN);
  136. #else
  137. func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
  138. #endif
  139. if (is_parallel_task && cl->type == STARPU_FORKJOIN)
  140. /* rebind to single CPU */
  141. _starpu_bind_thread_on_cpu(cpu_args->config, cpu_args->bindid);
  142. }
  143. _starpu_driver_end_job(cpu_args, j, perf_arch, &codelet_end, rank, profiling);
  144. if (is_parallel_task)
  145. STARPU_PTHREAD_BARRIER_WAIT(&j->after_work_barrier);
  146. if (rank == 0)
  147. {
  148. _starpu_driver_update_job_feedback(j, cpu_args,
  149. perf_arch, &codelet_start, &codelet_end, profiling);
  150. _starpu_push_task_output(j, 0);
  151. }
  152. return 0;
  153. }
  154. static struct _starpu_worker*
  155. _starpu_get_worker_from_driver(struct starpu_driver *d)
  156. {
  157. int n = starpu_worker_get_by_devid(STARPU_CPU_WORKER, d->id.cpu_id);
  158. if (n == -1)
  159. return NULL;
  160. return _starpu_get_worker_struct(n);
  161. }
  162. static size_t _starpu_cpu_get_global_mem_size(int nodeid STARPU_ATTRIBUTE_UNUSED, struct _starpu_machine_config *config)
  163. {
  164. size_t global_mem;
  165. starpu_ssize_t limit;
  166. limit = starpu_get_env_number("STARPU_LIMIT_CPU_MEM");
  167. #ifdef STARPU_DEVEL
  168. # warning TODO: take into account NUMA node and check STARPU_LIMIT_CPU_numanode_MEM
  169. #endif
  170. #if defined(STARPU_HAVE_HWLOC)
  171. struct _starpu_machine_topology *topology = &config->topology;
  172. #if 0
  173. /* Do not limit ourself to a single NUMA node yet, as we don't have real NUMA support for now */
  174. int depth_node = hwloc_get_type_depth(topology->hwtopology, HWLOC_OBJ_NODE);
  175. if (depth_node == HWLOC_TYPE_DEPTH_UNKNOWN)
  176. global_mem = hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
  177. else
  178. global_mem = hwloc_get_obj_by_depth(topology->hwtopology, depth_node, nodeid)->memory.local_memory;
  179. #else
  180. global_mem = hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
  181. #endif
  182. #else /* STARPU_HAVE_HWLOC */
  183. #ifdef STARPU_DEVEL
  184. # warning use sysinfo when available to get global size
  185. #endif
  186. global_mem = 0;
  187. #endif
  188. if (limit == -1)
  189. // No limit is defined, we return the global memory size
  190. return global_mem;
  191. else if (limit*1024*1024 > global_mem)
  192. // The requested limit is higher than what is available, we return the global memory size
  193. return global_mem;
  194. else
  195. // We limit the memory
  196. return limit*1024*1024;
  197. }
  198. int _starpu_cpu_driver_init(struct starpu_driver *d)
  199. {
  200. struct _starpu_worker *cpu_worker;
  201. cpu_worker = _starpu_get_worker_from_driver(d);
  202. STARPU_ASSERT(cpu_worker);
  203. int devid = cpu_worker->devid;
  204. _starpu_worker_start(cpu_worker, _STARPU_FUT_CPU_KEY);
  205. /* FIXME: when we have NUMA support, properly turn node number into NUMA node number */
  206. _starpu_memory_manager_set_global_memory_size(cpu_worker->memory_node, _starpu_cpu_get_global_mem_size(cpu_worker->memory_node, cpu_worker->config));
  207. snprintf(cpu_worker->name, sizeof(cpu_worker->name), "CPU %d", devid);
  208. snprintf(cpu_worker->short_name, sizeof(cpu_worker->short_name), "CPU %d", devid);
  209. cpu_worker->status = STATUS_UNKNOWN;
  210. _STARPU_TRACE_WORKER_INIT_END;
  211. /* tell the main thread that we are ready */
  212. STARPU_PTHREAD_MUTEX_LOCK(&cpu_worker->mutex);
  213. cpu_worker->worker_is_initialized = 1;
  214. STARPU_PTHREAD_COND_SIGNAL(&cpu_worker->ready_cond);
  215. STARPU_PTHREAD_MUTEX_UNLOCK(&cpu_worker->mutex);
  216. return 0;
  217. }
  218. int _starpu_cpu_driver_run_once(struct starpu_driver *d STARPU_ATTRIBUTE_UNUSED)
  219. {
  220. struct _starpu_worker *cpu_worker;
  221. cpu_worker = _starpu_get_local_worker_key();
  222. STARPU_ASSERT(cpu_worker);
  223. unsigned memnode = cpu_worker->memory_node;
  224. int workerid = cpu_worker->workerid;
  225. _STARPU_TRACE_START_PROGRESS(memnode);
  226. _starpu_datawizard_progress(memnode, 1);
  227. _STARPU_TRACE_END_PROGRESS(memnode);
  228. struct _starpu_job *j;
  229. struct starpu_task *task;
  230. int res;
  231. task = _starpu_get_worker_task(cpu_worker, workerid, memnode);
  232. if (!task)
  233. return 0;
  234. j = _starpu_get_job_associated_to_task(task);
  235. /* can a cpu perform that task ? */
  236. if (!_STARPU_CPU_MAY_PERFORM(j))
  237. {
  238. /* put it and the end of the queue ... XXX */
  239. _starpu_push_task_to_workers(task);
  240. return 0;
  241. }
  242. int rank = 0;
  243. int is_parallel_task = (j->task_size > 1);
  244. struct starpu_perfmodel_arch* perf_arch;
  245. /* Get the rank in case it is a parallel task */
  246. if (is_parallel_task)
  247. {
  248. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  249. rank = j->active_task_alias_count++;
  250. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  251. struct _starpu_combined_worker *combined_worker;
  252. combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
  253. cpu_worker->combined_workerid = j->combined_workerid;
  254. cpu_worker->worker_size = combined_worker->worker_size;
  255. cpu_worker->current_rank = rank;
  256. perf_arch = &combined_worker->perf_arch;
  257. }
  258. else
  259. {
  260. cpu_worker->combined_workerid = cpu_worker->workerid;
  261. cpu_worker->worker_size = 1;
  262. cpu_worker->current_rank = 0;
  263. perf_arch = &cpu_worker->perf_arch;
  264. }
  265. _starpu_set_current_task(j->task);
  266. cpu_worker->current_task = j->task;
  267. res = execute_job_on_cpu(j, task, cpu_worker, rank, perf_arch);
  268. _starpu_set_current_task(NULL);
  269. cpu_worker->current_task = NULL;
  270. if (res)
  271. {
  272. switch (res)
  273. {
  274. case -EAGAIN:
  275. _starpu_push_task_to_workers(task);
  276. return 0;
  277. default:
  278. STARPU_ABORT();
  279. }
  280. }
  281. /* In the case of combined workers, we need to inform the
  282. * scheduler each worker's execution is over.
  283. * Then we free the workers' task alias */
  284. if (is_parallel_task)
  285. {
  286. _starpu_sched_post_exec_hook(task);
  287. free(task);
  288. }
  289. if (rank == 0)
  290. _starpu_handle_job_termination(j);
  291. return 0;
  292. }
  293. int _starpu_cpu_driver_deinit(struct starpu_driver *d STARPU_ATTRIBUTE_UNUSED)
  294. {
  295. _STARPU_TRACE_WORKER_DEINIT_START;
  296. struct _starpu_worker *cpu_worker;
  297. cpu_worker = _starpu_get_local_worker_key();
  298. STARPU_ASSERT(cpu_worker);
  299. unsigned memnode = cpu_worker->memory_node;
  300. _starpu_handle_all_pending_node_data_requests(memnode);
  301. /* In case there remains some memory that was automatically
  302. * allocated by StarPU, we release it now. Note that data
  303. * coherency is not maintained anymore at that point ! */
  304. _starpu_free_all_automatically_allocated_buffers(memnode);
  305. _STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_CPU_KEY);
  306. return 0;
  307. }
  308. void *
  309. _starpu_cpu_worker(void *arg)
  310. {
  311. struct _starpu_worker *args = arg;
  312. struct starpu_driver d =
  313. {
  314. .type = STARPU_CPU_WORKER,
  315. .id.cpu_id = args->devid
  316. };
  317. _starpu_cpu_driver_init(&d);
  318. while (_starpu_machine_is_running())
  319. _starpu_cpu_driver_run_once(&d);
  320. _starpu_cpu_driver_deinit(&d);
  321. return NULL;
  322. }
  323. int _starpu_run_cpu(struct starpu_driver *d)
  324. {
  325. STARPU_ASSERT(d && d->type == STARPU_CPU_WORKER);
  326. struct _starpu_worker *worker = _starpu_get_worker_from_driver(d);
  327. STARPU_ASSERT(worker);
  328. worker->set = NULL;
  329. worker->worker_is_initialized = 0;
  330. _starpu_cpu_worker(worker);
  331. return 0;
  332. }