driver_cpu.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2016 Université de Bordeaux
  4. * Copyright (C) 2010 Mehdi Juhoor <mjuhoor@gmail.com>
  5. * Copyright (C) 2010-2016 CNRS
  6. * Copyright (C) 2011 Télécom-SudParis
  7. * Copyright (C) 2014 INRIA
  8. *
  9. * StarPU is free software; you can redistribute it and/or modify
  10. * it under the terms of the GNU Lesser General Public License as published by
  11. * the Free Software Foundation; either version 2.1 of the License, or (at
  12. * your option) any later version.
  13. *
  14. * StarPU is distributed in the hope that it will be useful, but
  15. * WITHOUT ANY WARRANTY; without even the implied warranty of
  16. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  17. *
  18. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  19. */
  20. #include <common/config.h>
  21. #include <math.h>
  22. #include <starpu.h>
  23. #include <starpu_profiling.h>
  24. #include <drivers/driver_common/driver_common.h>
  25. #include <common/utils.h>
  26. #include <core/debug.h>
  27. #include <core/workers.h>
  28. #include "driver_cpu.h"
  29. #include <core/sched_policy.h>
  30. #include <datawizard/memory_manager.h>
  31. #include <datawizard/malloc.h>
  32. #include <core/simgrid.h>
  33. #ifdef STARPU_HAVE_HWLOC
  34. #include <hwloc.h>
  35. #ifndef HWLOC_API_VERSION
  36. #define HWLOC_OBJ_PU HWLOC_OBJ_PROC
  37. #endif
  38. #endif
  39. #ifdef STARPU_HAVE_WINDOWS
  40. #include <windows.h>
  41. #endif
  42. /* Actually launch the job on a cpu worker.
  43. * Handle binding CPUs on cores.
  44. * In the case of a combined worker WORKER_TASK != J->TASK */
  45. static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_task, struct _starpu_worker *cpu_args, int rank, struct starpu_perfmodel_arch* perf_arch)
  46. {
  47. int is_parallel_task = (j->task_size > 1);
  48. int profiling = starpu_profiling_status_get();
  49. struct timespec codelet_start, codelet_end;
  50. struct starpu_task *task = j->task;
  51. struct starpu_codelet *cl = task->cl;
  52. #ifdef STARPU_OPENMP
  53. /* At this point, j->continuation as been cleared as the task is being
  54. * woken up, thus we use j->discontinuous instead for the check */
  55. const unsigned continuation_wake_up = j->discontinuous;
  56. #else
  57. const unsigned continuation_wake_up = 0;
  58. #endif
  59. STARPU_ASSERT(cl);
  60. if (rank == 0 && !continuation_wake_up)
  61. {
  62. int ret = _starpu_fetch_task_input(task, j, 0);
  63. if (ret != 0)
  64. {
  65. /* there was not enough memory so the codelet cannot be executed right now ... */
  66. /* push the codelet back and try another one ... */
  67. return -EAGAIN;
  68. }
  69. }
  70. if (is_parallel_task)
  71. {
  72. STARPU_PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
  73. /* In the case of a combined worker, the scheduler needs to know
  74. * when each actual worker begins the execution */
  75. _starpu_sched_pre_exec_hook(worker_task);
  76. }
  77. /* Give profiling variable */
  78. _starpu_driver_start_job(cpu_args, j, perf_arch, &codelet_start, rank, profiling);
  79. /* In case this is a Fork-join parallel task, the worker does not
  80. * execute the kernel at all. */
  81. if ((rank == 0) || (cl->type != STARPU_FORKJOIN))
  82. {
  83. _starpu_cl_func_t func = _starpu_task_get_cpu_nth_implementation(cl, j->nimpl);
  84. if (is_parallel_task && cl->type == STARPU_FORKJOIN)
  85. /* bind to parallel worker */
  86. _starpu_bind_thread_on_cpus(cpu_args->config, _starpu_get_combined_worker_struct(j->combined_workerid));
  87. STARPU_ASSERT_MSG(func, "when STARPU_CPU is defined in 'where', cpu_func or cpu_funcs has to be defined");
  88. if (_starpu_get_disable_kernels() <= 0)
  89. {
  90. _STARPU_TRACE_START_EXECUTING();
  91. #ifdef STARPU_SIMGRID
  92. if (cl->flags & STARPU_CODELET_SIMGRID_EXECUTE)
  93. func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
  94. else
  95. _starpu_simgrid_submit_job(cpu_args->workerid, j, perf_arch, NAN, NULL, NULL, NULL);
  96. #else
  97. func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
  98. #endif
  99. _STARPU_TRACE_END_EXECUTING();
  100. }
  101. if (is_parallel_task && cl->type == STARPU_FORKJOIN)
  102. /* rebind to single CPU */
  103. _starpu_bind_thread_on_cpu(cpu_args->config, cpu_args->bindid, cpu_args->workerid);
  104. }
  105. _starpu_driver_end_job(cpu_args, j, perf_arch, &codelet_end, rank, profiling);
  106. if (is_parallel_task)
  107. {
  108. STARPU_PTHREAD_BARRIER_WAIT(&j->after_work_barrier);
  109. #ifdef STARPU_SIMGRID
  110. if (rank == 0)
  111. {
  112. /* Wait for other threads to exit barrier_wait so we
  113. * can safely drop the job structure */
  114. MSG_process_sleep(0.0000001);
  115. j->after_work_busy_barrier = 0;
  116. }
  117. #else
  118. ANNOTATE_HAPPENS_BEFORE(&j->after_work_busy_barrier);
  119. (void) STARPU_ATOMIC_ADD(&j->after_work_busy_barrier, -1);
  120. if (rank == 0)
  121. {
  122. /* Wait with a busy barrier for other workers to have
  123. * finished with the blocking barrier before we can
  124. * safely drop the job structure */
  125. while (j->after_work_busy_barrier > 0)
  126. {
  127. STARPU_UYIELD();
  128. STARPU_SYNCHRONIZE();
  129. }
  130. ANNOTATE_HAPPENS_AFTER(&j->after_work_busy_barrier);
  131. }
  132. #endif
  133. }
  134. if (rank == 0)
  135. {
  136. _starpu_driver_update_job_feedback(j, cpu_args,
  137. perf_arch, &codelet_start, &codelet_end, profiling);
  138. #ifdef STARPU_OPENMP
  139. if (!j->continuation)
  140. #endif
  141. {
  142. _starpu_push_task_output(j);
  143. }
  144. }
  145. return 0;
  146. }
  147. static size_t _starpu_cpu_get_global_mem_size(int nodeid STARPU_ATTRIBUTE_UNUSED, struct _starpu_machine_config *config STARPU_ATTRIBUTE_UNUSED)
  148. {
  149. size_t global_mem;
  150. starpu_ssize_t limit;
  151. limit = starpu_get_env_number("STARPU_LIMIT_CPU_MEM");
  152. #ifdef STARPU_DEVEL
  153. # warning TODO: take into account NUMA node and check STARPU_LIMIT_CPU_numanode_MEM
  154. #endif
  155. #if defined(STARPU_HAVE_HWLOC)
  156. struct _starpu_machine_topology *topology = &config->topology;
  157. #if 0
  158. /* Do not limit ourself to a single NUMA node yet, as we don't have real NUMA support for now */
  159. int depth_node = hwloc_get_type_depth(topology->hwtopology, HWLOC_OBJ_NODE);
  160. if (depth_node == HWLOC_TYPE_DEPTH_UNKNOWN)
  161. global_mem = hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
  162. else
  163. global_mem = hwloc_get_obj_by_depth(topology->hwtopology, depth_node, nodeid)->memory.local_memory;
  164. #else
  165. global_mem = hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
  166. #endif
  167. #else /* STARPU_HAVE_HWLOC */
  168. #ifdef STARPU_DEVEL
  169. # warning use sysinfo when available to get global size
  170. #endif
  171. global_mem = 0;
  172. #endif
  173. if (limit < 0)
  174. // No limit is defined, we return the global memory size
  175. return global_mem;
  176. else if (global_mem && (size_t)limit * 1024*1024 > global_mem)
  177. // The requested limit is higher than what is available, we return the global memory size
  178. return global_mem;
  179. else
  180. // We limit the memory
  181. return limit*1024*1024;
  182. }
  183. int _starpu_cpu_driver_init(struct _starpu_worker *cpu_worker)
  184. {
  185. int devid = cpu_worker->devid;
  186. _starpu_driver_start(cpu_worker, _STARPU_FUT_CPU_KEY, 1);
  187. /* FIXME: when we have NUMA support, properly turn node number into NUMA node number */
  188. _starpu_memory_manager_set_global_memory_size(cpu_worker->memory_node, _starpu_cpu_get_global_mem_size(cpu_worker->memory_node, cpu_worker->config));
  189. snprintf(cpu_worker->name, sizeof(cpu_worker->name), "CPU %d", devid);
  190. snprintf(cpu_worker->short_name, sizeof(cpu_worker->short_name), "CPU %d", devid);
  191. starpu_pthread_setname(cpu_worker->short_name);
  192. _STARPU_TRACE_WORKER_INIT_END(cpu_worker->workerid);
  193. /* tell the main thread that we are ready */
  194. STARPU_PTHREAD_MUTEX_LOCK(&cpu_worker->mutex);
  195. cpu_worker->status = STATUS_UNKNOWN;
  196. cpu_worker->worker_is_initialized = 1;
  197. STARPU_PTHREAD_COND_SIGNAL(&cpu_worker->ready_cond);
  198. STARPU_PTHREAD_MUTEX_UNLOCK(&cpu_worker->mutex);
  199. return 0;
  200. }
  201. static int _starpu_cpu_driver_execute_task(struct _starpu_worker *cpu_worker, struct starpu_task *task, struct _starpu_job *j)
  202. {
  203. int res;
  204. int rank;
  205. int is_parallel_task = (j->task_size > 1);
  206. struct starpu_perfmodel_arch* perf_arch;
  207. rank = cpu_worker->current_rank;
  208. /* Get the rank in case it is a parallel task */
  209. if (is_parallel_task)
  210. {
  211. if(j->combined_workerid != -1)
  212. {
  213. struct _starpu_combined_worker *combined_worker;
  214. combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
  215. cpu_worker->combined_workerid = j->combined_workerid;
  216. cpu_worker->worker_size = combined_worker->worker_size;
  217. perf_arch = &combined_worker->perf_arch;
  218. }
  219. else
  220. {
  221. struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(cpu_worker, j);
  222. STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", cpu_worker->workerid);
  223. perf_arch = &sched_ctx->perf_arch;
  224. }
  225. }
  226. else
  227. {
  228. cpu_worker->combined_workerid = cpu_worker->workerid;
  229. cpu_worker->worker_size = 1;
  230. struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(cpu_worker, j);
  231. if (sched_ctx && !sched_ctx->sched_policy && !sched_ctx->awake_workers && sched_ctx->main_master == cpu_worker->workerid)
  232. perf_arch = &sched_ctx->perf_arch;
  233. else
  234. perf_arch = &cpu_worker->perf_arch;
  235. }
  236. _starpu_set_current_task(j->task);
  237. cpu_worker->current_task = j->task;
  238. res = execute_job_on_cpu(j, task, cpu_worker, rank, perf_arch);
  239. _starpu_set_current_task(NULL);
  240. cpu_worker->current_task = NULL;
  241. if (res)
  242. {
  243. switch (res)
  244. {
  245. case -EAGAIN:
  246. _starpu_push_task_to_workers(task);
  247. return 0;
  248. default:
  249. STARPU_ABORT();
  250. }
  251. }
  252. /* In the case of combined workers, we need to inform the
  253. * scheduler each worker's execution is over.
  254. * Then we free the workers' task alias */
  255. if (is_parallel_task)
  256. {
  257. _starpu_sched_post_exec_hook(task);
  258. free(task);
  259. }
  260. if (rank == 0)
  261. _starpu_handle_job_termination(j);
  262. return 0;
  263. }
  264. int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
  265. {
  266. unsigned memnode = cpu_worker->memory_node;
  267. int workerid = cpu_worker->workerid;
  268. int res;
  269. struct _starpu_job *j;
  270. struct starpu_task *task = NULL, *pending_task;
  271. int rank = 0;
  272. #ifdef STARPU_SIMGRID
  273. starpu_pthread_wait_reset(&cpu_worker->wait);
  274. #endif
  275. /* Test if async transfers are completed */
  276. pending_task = cpu_worker->task_transferring;
  277. if (pending_task != NULL && cpu_worker->nb_buffers_transferred == cpu_worker->nb_buffers_totransfer)
  278. {
  279. struct _starpu_job *j = _starpu_get_job_associated_to_task(pending_task);
  280. _starpu_release_fetch_task_input_async(j, workerid, cpu_worker->nb_buffers_totransfer);
  281. /* Reset it */
  282. cpu_worker->task_transferring = NULL;
  283. return _starpu_cpu_driver_execute_task(cpu_worker, pending_task, j);
  284. }
  285. _STARPU_TRACE_START_PROGRESS(memnode);
  286. res = __starpu_datawizard_progress(1, 1);
  287. _STARPU_TRACE_END_PROGRESS(memnode);
  288. if (!pending_task)
  289. task = _starpu_get_worker_task(cpu_worker, workerid, memnode);
  290. #ifdef STARPU_SIMGRID
  291. if (!res && !task)
  292. /* No progress, wait */
  293. starpu_pthread_wait_wait(&cpu_worker->wait);
  294. #endif
  295. if (!task)
  296. /* No task or task still pending transfers */
  297. return 0;
  298. j = _starpu_get_job_associated_to_task(task);
  299. /* NOTE: j->task is != task for parallel tasks, which share the same
  300. * job. */
  301. /* can a cpu perform that task ? */
  302. if (!_STARPU_CPU_MAY_PERFORM(j))
  303. {
  304. /* put it and the end of the queue ... XXX */
  305. _starpu_push_task_to_workers(task);
  306. return 0;
  307. }
  308. /* Get the rank in case it is a parallel task */
  309. if (j->task_size > 1)
  310. {
  311. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  312. rank = j->active_task_alias_count++;
  313. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  314. }
  315. else
  316. {
  317. rank = 0;
  318. }
  319. cpu_worker->current_rank = rank;
  320. if (rank == 0)
  321. _starpu_fetch_task_input(task, j, 1);
  322. else
  323. return _starpu_cpu_driver_execute_task(cpu_worker, task, j);
  324. return 0;
  325. }
  326. int _starpu_cpu_driver_deinit(struct _starpu_worker *cpu_worker)
  327. {
  328. _STARPU_TRACE_WORKER_DEINIT_START;
  329. unsigned memnode = cpu_worker->memory_node;
  330. _starpu_handle_all_pending_node_data_requests(memnode);
  331. /* In case there remains some memory that was automatically
  332. * allocated by StarPU, we release it now. Note that data
  333. * coherency is not maintained anymore at that point ! */
  334. _starpu_free_all_automatically_allocated_buffers(memnode);
  335. cpu_worker->worker_is_initialized = 0;
  336. _STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_CPU_KEY);
  337. return 0;
  338. }
  /* Entry point of a CPU worker.  ARG is the worker's struct _starpu_worker.
   * Initialises the driver, runs driver iterations for as long as the machine
   * is running, then deinitialises the driver.  Always returns NULL. */
  void *
  _starpu_cpu_worker(void *arg)
  {
      struct _starpu_worker *worker = arg;

      _starpu_cpu_driver_init(worker);

      for (;;)
      {
          if (!_starpu_machine_is_running())
              break;

          _starpu_may_pause();
          _starpu_cpu_driver_run_once(worker);
      }

      _starpu_cpu_driver_deinit(worker);

      return NULL;
  }
  352. int _starpu_run_cpu(struct _starpu_worker *worker)
  353. {
  354. worker->set = NULL;
  355. worker->worker_is_initialized = 0;
  356. _starpu_cpu_worker(worker);
  357. return 0;
  358. }