driver_cpu.c 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2014 Université de Bordeaux 1
  4. * Copyright (C) 2010 Mehdi Juhoor <mjuhoor@gmail.com>
  5. * Copyright (C) 2010-2014 Centre National de la Recherche Scientifique
  6. * Copyright (C) 2011 Télécom-SudParis
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <common/config.h>
  20. #include <math.h>
  21. #include <starpu.h>
  22. #include <starpu_profiling.h>
  23. #include <drivers/driver_common/driver_common.h>
  24. #include <common/utils.h>
  25. #include <core/debug.h>
  26. #include <core/workers.h>
  27. #include "driver_cpu.h"
  28. #include <core/sched_policy.h>
  29. #include <datawizard/memory_manager.h>
  30. #include <datawizard/malloc.h>
  31. #include <core/simgrid.h>
  32. #ifdef STARPU_HAVE_HWLOC
  33. #include <hwloc.h>
  34. #ifndef HWLOC_API_VERSION
  35. #define HWLOC_OBJ_PU HWLOC_OBJ_PROC
  36. #endif
  37. #endif
  38. #ifdef STARPU_HAVE_WINDOWS
  39. #include <windows.h>
  40. #endif
  41. /* Actually launch the job on a cpu worker.
  42. * Handle binding CPUs on cores.
  43. * In the case of a combined worker WORKER_TASK != J->TASK */
  44. static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_task, struct _starpu_worker *cpu_args, int rank, struct starpu_perfmodel_arch* perf_arch)
  45. {
  46. int ret;
  47. int is_parallel_task = (j->task_size > 1);
  48. int profiling = starpu_profiling_status_get();
  49. struct timespec codelet_start, codelet_end;
  50. struct starpu_task *task = j->task;
  51. struct starpu_codelet *cl = task->cl;
  52. STARPU_ASSERT(cl);
  53. if (rank == 0)
  54. {
  55. ret = _starpu_fetch_task_input(j);
  56. if (ret != 0)
  57. {
  58. /* there was not enough memory so the codelet cannot be executed right now ... */
  59. /* push the codelet back and try another one ... */
  60. return -EAGAIN;
  61. }
  62. }
  63. if (is_parallel_task)
  64. {
  65. STARPU_PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
  66. /* In the case of a combined worker, the scheduler needs to know
  67. * when each actual worker begins the execution */
  68. _starpu_sched_pre_exec_hook(worker_task);
  69. }
  70. /* Give profiling variable */
  71. _starpu_driver_start_job(cpu_args, j, perf_arch, &codelet_start, rank, profiling);
  72. /* In case this is a Fork-join parallel task, the worker does not
  73. * execute the kernel at all. */
  74. if ((rank == 0) || (cl->type != STARPU_FORKJOIN))
  75. {
  76. _starpu_cl_func_t func = _starpu_task_get_cpu_nth_implementation(cl, j->nimpl);
  77. if (is_parallel_task && cl->type == STARPU_FORKJOIN)
  78. /* bind to parallel worker */
  79. _starpu_bind_thread_on_cpus(cpu_args->config, _starpu_get_combined_worker_struct(j->combined_workerid));
  80. STARPU_ASSERT_MSG(func, "when STARPU_CPU is defined in 'where', cpu_func or cpu_funcs has to be defined");
  81. if (starpu_get_env_number("STARPU_DISABLE_KERNELS") <= 0)
  82. {
  83. _STARPU_TRACE_START_EXECUTING();
  84. #ifdef STARPU_SIMGRID
  85. _starpu_simgrid_execute_job(j, perf_arch, NAN);
  86. #else
  87. func(_STARPU_TASK_GET_INTERFACES(task), task->cl_arg);
  88. #endif
  89. _STARPU_TRACE_END_EXECUTING();
  90. }
  91. if (is_parallel_task && cl->type == STARPU_FORKJOIN)
  92. /* rebind to single CPU */
  93. _starpu_bind_thread_on_cpu(cpu_args->config, cpu_args->bindid);
  94. }
  95. _starpu_driver_end_job(cpu_args, j, perf_arch, &codelet_end, rank, profiling);
  96. if (is_parallel_task)
  97. STARPU_PTHREAD_BARRIER_WAIT(&j->after_work_barrier);
  98. if (rank == 0)
  99. {
  100. _starpu_driver_update_job_feedback(j, cpu_args,
  101. perf_arch, &codelet_start, &codelet_end, profiling);
  102. _starpu_push_task_output(j);
  103. }
  104. return 0;
  105. }
  106. static size_t _starpu_cpu_get_global_mem_size(int nodeid STARPU_ATTRIBUTE_UNUSED, struct _starpu_machine_config *config)
  107. {
  108. size_t global_mem;
  109. starpu_ssize_t limit;
  110. limit = starpu_get_env_number("STARPU_LIMIT_CPU_MEM");
  111. #ifdef STARPU_DEVEL
  112. # warning TODO: take into account NUMA node and check STARPU_LIMIT_CPU_numanode_MEM
  113. #endif
  114. #if defined(STARPU_HAVE_HWLOC)
  115. struct _starpu_machine_topology *topology = &config->topology;
  116. #if 0
  117. /* Do not limit ourself to a single NUMA node yet, as we don't have real NUMA support for now */
  118. int depth_node = hwloc_get_type_depth(topology->hwtopology, HWLOC_OBJ_NODE);
  119. if (depth_node == HWLOC_TYPE_DEPTH_UNKNOWN)
  120. global_mem = hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
  121. else
  122. global_mem = hwloc_get_obj_by_depth(topology->hwtopology, depth_node, nodeid)->memory.local_memory;
  123. #else
  124. global_mem = hwloc_get_root_obj(topology->hwtopology)->memory.total_memory;
  125. #endif
  126. #else /* STARPU_HAVE_HWLOC */
  127. #ifdef STARPU_DEVEL
  128. # warning use sysinfo when available to get global size
  129. #endif
  130. global_mem = 0;
  131. #endif
  132. if (limit < 0)
  133. // No limit is defined, we return the global memory size
  134. return global_mem;
  135. else if ((size_t)limit * 1024*1024 > global_mem)
  136. // The requested limit is higher than what is available, we return the global memory size
  137. return global_mem;
  138. else
  139. // We limit the memory
  140. return limit*1024*1024;
  141. }
  142. int _starpu_cpu_driver_init(struct _starpu_worker *cpu_worker)
  143. {
  144. int devid = cpu_worker->devid;
  145. _starpu_worker_start(cpu_worker, _STARPU_FUT_CPU_KEY);
  146. /* FIXME: when we have NUMA support, properly turn node number into NUMA node number */
  147. _starpu_memory_manager_set_global_memory_size(cpu_worker->memory_node, _starpu_cpu_get_global_mem_size(cpu_worker->memory_node, cpu_worker->config));
  148. snprintf(cpu_worker->name, sizeof(cpu_worker->name), "CPU %d", devid);
  149. snprintf(cpu_worker->short_name, sizeof(cpu_worker->short_name), "CPU %d", devid);
  150. cpu_worker->status = STATUS_UNKNOWN;
  151. _STARPU_TRACE_WORKER_INIT_END(cpu_worker->workerid);
  152. /* tell the main thread that we are ready */
  153. STARPU_PTHREAD_MUTEX_LOCK(&cpu_worker->mutex);
  154. cpu_worker->worker_is_initialized = 1;
  155. STARPU_PTHREAD_COND_SIGNAL(&cpu_worker->ready_cond);
  156. STARPU_PTHREAD_MUTEX_UNLOCK(&cpu_worker->mutex);
  157. return 0;
  158. }
  159. int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
  160. {
  161. unsigned memnode = cpu_worker->memory_node;
  162. int workerid = cpu_worker->workerid;
  163. _STARPU_TRACE_START_PROGRESS(memnode);
  164. _starpu_datawizard_progress(memnode, 1);
  165. _STARPU_TRACE_END_PROGRESS(memnode);
  166. struct _starpu_job *j;
  167. struct starpu_task *task;
  168. int res;
  169. task = _starpu_get_worker_task(cpu_worker, workerid, memnode);
  170. if (!task)
  171. return 0;
  172. j = _starpu_get_job_associated_to_task(task);
  173. /* can a cpu perform that task ? */
  174. if (!_STARPU_CPU_MAY_PERFORM(j))
  175. {
  176. /* put it and the end of the queue ... XXX */
  177. _starpu_push_task_to_workers(task);
  178. return 0;
  179. }
  180. int rank = 0;
  181. int is_parallel_task = (j->task_size > 1);
  182. struct starpu_perfmodel_arch* perf_arch;
  183. /* Get the rank in case it is a parallel task */
  184. if (is_parallel_task)
  185. {
  186. STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
  187. rank = j->active_task_alias_count++;
  188. STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
  189. struct _starpu_combined_worker *combined_worker;
  190. combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
  191. cpu_worker->combined_workerid = j->combined_workerid;
  192. cpu_worker->worker_size = combined_worker->worker_size;
  193. cpu_worker->current_rank = rank;
  194. perf_arch = &combined_worker->perf_arch;
  195. }
  196. else
  197. {
  198. cpu_worker->combined_workerid = cpu_worker->workerid;
  199. cpu_worker->worker_size = 1;
  200. cpu_worker->current_rank = 0;
  201. perf_arch = &cpu_worker->perf_arch;
  202. }
  203. _starpu_set_current_task(j->task);
  204. cpu_worker->current_task = j->task;
  205. res = execute_job_on_cpu(j, task, cpu_worker, rank, perf_arch);
  206. _starpu_set_current_task(NULL);
  207. cpu_worker->current_task = NULL;
  208. if (res)
  209. {
  210. switch (res)
  211. {
  212. case -EAGAIN:
  213. _starpu_push_task_to_workers(task);
  214. return 0;
  215. default:
  216. STARPU_ABORT();
  217. }
  218. }
  219. /* In the case of combined workers, we need to inform the
  220. * scheduler each worker's execution is over.
  221. * Then we free the workers' task alias */
  222. if (is_parallel_task)
  223. {
  224. _starpu_sched_post_exec_hook(task);
  225. free(task);
  226. }
  227. if (rank == 0)
  228. _starpu_handle_job_termination(j);
  229. return 0;
  230. }
  231. int _starpu_cpu_driver_deinit(struct _starpu_worker *cpu_worker)
  232. {
  233. _STARPU_TRACE_WORKER_DEINIT_START;
  234. unsigned memnode = cpu_worker->memory_node;
  235. _starpu_handle_all_pending_node_data_requests(memnode);
  236. /* In case there remains some memory that was automatically
  237. * allocated by StarPU, we release it now. Note that data
  238. * coherency is not maintained anymore at that point ! */
  239. _starpu_free_all_automatically_allocated_buffers(memnode);
  240. _STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_CPU_KEY);
  241. return 0;
  242. }
  243. void *
  244. _starpu_cpu_worker(void *arg)
  245. {
  246. struct _starpu_worker *args = arg;
  247. _starpu_cpu_driver_init(args);
  248. while (_starpu_machine_is_running())
  249. _starpu_cpu_driver_run_once(args);
  250. _starpu_cpu_driver_deinit(args);
  251. return NULL;
  252. }
  253. int _starpu_run_cpu(struct _starpu_worker *worker)
  254. {
  255. worker->set = NULL;
  256. worker->worker_is_initialized = 0;
  257. _starpu_cpu_worker(worker);
  258. return 0;
  259. }