detect_combined_workers.c 10 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. * Copyright (C) 2013 Thibaut Lambert
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <starpu.h>
  18. #include <common/utils.h>
  19. #include <core/workers.h>
  20. #include <math.h>
  21. #include <core/detect_combined_workers.h>
/* Set to 1 by _starpu_sched_find_worker_combinations() on its first call, so
 * that subsequent calls return immediately without detecting combinations
 * again. */
int _starpu_initialized_combined_workers;
  23. #ifdef STARPU_HAVE_HWLOC
  24. #include <hwloc.h>
  25. static void find_workers(hwloc_obj_t obj, int cpu_workers[STARPU_NMAXWORKERS], unsigned *n)
  26. {
  27. struct _starpu_hwloc_userdata *data = obj->userdata;
  28. if (!data->worker_list)
  29. /* Not something we run something on, don't care */
  30. return;
  31. if (data->worker_list == (void*) -1)
  32. {
  33. /* Intra node, recurse */
  34. unsigned i;
  35. for (i = 0; i < obj->arity; i++)
  36. find_workers(obj->children[i], cpu_workers, n);
  37. return;
  38. }
  39. /* Got to a PU leaf */
  40. struct _starpu_worker_list *workers = data->worker_list;
  41. struct _starpu_worker *worker;
  42. for(worker = _starpu_worker_list_begin(workers); worker != _starpu_worker_list_end(workers); worker = _starpu_worker_list_next(worker))
  43. {
  44. /* is it a CPU worker? */
  45. if (worker->perf_arch.devices[0].type == STARPU_CPU_WORKER && worker->perf_arch.devices[0].ncores == 1)
  46. {
  47. _STARPU_DEBUG("worker %d is part of it\n", worker->workerid);
  48. /* Add it to the combined worker */
  49. cpu_workers[(*n)++] = worker->workerid;
  50. }
  51. }
  52. }
  53. static void synthesize_intermediate_workers(hwloc_obj_t *children, unsigned min, unsigned max, unsigned arity, unsigned n, unsigned synthesize_arity)
  54. {
  55. unsigned nworkers, i, j;
  56. unsigned chunk_size = (n + synthesize_arity-1) / synthesize_arity;
  57. unsigned chunk_start;
  58. int cpu_workers[STARPU_NMAXWORKERS];
  59. int ret;
  60. if (n <= synthesize_arity)
  61. /* Not too many children, do not synthesize */
  62. return;
  63. _STARPU_DEBUG("%u children > %u, synthesizing intermediate combined workers of size %u\n", n, synthesize_arity, chunk_size);
  64. n = 0;
  65. j = 0;
  66. nworkers = 0;
  67. chunk_start = 0;
  68. for (i = 0 ; i < arity; i++)
  69. {
  70. if (((struct _starpu_hwloc_userdata*)children[i]->userdata)->worker_list)
  71. {
  72. n++;
  73. _STARPU_DEBUG("child %u\n", i);
  74. find_workers(children[i], cpu_workers, &nworkers);
  75. j++;
  76. }
  77. /* Completed a chunk, or last bit (but not if it's just 1 subobject) */
  78. if (j == chunk_size || (i == arity-1 && j > 1))
  79. {
  80. if (nworkers >= min && nworkers <= max)
  81. {
  82. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  83. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  84. sched_ctx_id = 0;
  85. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  86. _STARPU_DEBUG("Adding it\n");
  87. ret = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  88. STARPU_ASSERT(ret >= 0);
  89. workers->add(workers,ret);
  90. }
  91. /* Recurse there */
  92. synthesize_intermediate_workers(children+chunk_start, min, max, i - chunk_start, n, synthesize_arity);
  93. /* And restart another one */
  94. n = 0;
  95. j = 0;
  96. nworkers = 0;
  97. chunk_start = i+1;
  98. }
  99. }
  100. }
  101. static void find_and_assign_combinations(hwloc_obj_t obj, unsigned min, unsigned max, unsigned synthesize_arity)
  102. {
  103. char name[64];
  104. unsigned i, n, nworkers;
  105. int cpu_workers[STARPU_NMAXWORKERS];
  106. #if HWLOC_API_VERSION >= 0x10000
  107. hwloc_obj_attr_snprintf(name, sizeof(name), obj, "#", 0);
  108. #else
  109. hwloc_obj_snprintf(name, sizeof(name), _starpu_get_machine_config()->topology.hwtopology, obj, "#", 0);
  110. #endif
  111. _STARPU_DEBUG("Looking at %s\n", name);
  112. for (n = 0, i = 0; i < obj->arity; i++)
  113. if (((struct _starpu_hwloc_userdata *)obj->children[i]->userdata)->worker_list)
  114. /* it has a CPU worker */
  115. n++;
  116. if (n == 1)
  117. {
  118. /* If there is only one child, we go to the next level right away */
  119. find_and_assign_combinations(obj->children[0], min, max, synthesize_arity);
  120. return;
  121. }
  122. /* Add this object */
  123. nworkers = 0;
  124. find_workers(obj, cpu_workers, &nworkers);
  125. if (nworkers >= min && nworkers <= max)
  126. {
  127. _STARPU_DEBUG("Adding it\n");
  128. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  129. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  130. sched_ctx_id = 0;
  131. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  132. int newworkerid = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  133. STARPU_ASSERT(newworkerid >= 0);
  134. workers->add(workers,newworkerid);
  135. }
  136. /* Add artificial intermediate objects recursively */
  137. synthesize_intermediate_workers(obj->children, min, max, obj->arity, n, synthesize_arity);
  138. /* And recurse */
  139. for (i = 0; i < obj->arity; i++)
  140. if (((struct _starpu_hwloc_userdata*) obj->children[i]->userdata)->worker_list == (void*) -1)
  141. find_and_assign_combinations(obj->children[i], min, max, synthesize_arity);
  142. }
  143. static void find_and_assign_combinations_with_hwloc(int *workerids, int nworkers)
  144. {
  145. struct _starpu_machine_config *config = _starpu_get_machine_config();
  146. struct _starpu_machine_topology *topology = &config->topology;
  147. int synthesize_arity = starpu_get_env_number("STARPU_SYNTHESIZE_ARITY_COMBINED_WORKER");
  148. int min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  149. if (min < 2)
  150. min = 2;
  151. int max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  152. if (max == -1)
  153. max = INT_MAX;
  154. if (synthesize_arity == -1)
  155. synthesize_arity = 2;
  156. STARPU_ASSERT_MSG(synthesize_arity > 0, "STARPU_SYNTHESIZE_ARITY_COMBINED_WORKER must be greater than 0");
  157. /* First, mark nodes which contain CPU workers, simply by setting their userdata field */
  158. int i;
  159. for (i = 0; i < nworkers; i++)
  160. {
  161. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  162. if (worker->perf_arch.devices[0].type == STARPU_CPU_WORKER && worker->perf_arch.devices[0].ncores == 1)
  163. {
  164. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->pu_depth, worker->bindid);
  165. obj = obj->parent;
  166. while (obj)
  167. {
  168. ((struct _starpu_hwloc_userdata*) obj->userdata)->worker_list = (void*) -1;
  169. obj = obj->parent;
  170. }
  171. }
  172. }
  173. find_and_assign_combinations(hwloc_get_root_obj(topology->hwtopology), min, max, synthesize_arity);
  174. }
  175. #else /* STARPU_HAVE_HWLOC */
  176. static void assign_combinations_without_hwloc(struct starpu_worker_collection* worker_collection, int* workers, unsigned n, int min, int max)
  177. {
  178. int size,i;
  179. //if the maximun number of worker is already reached
  180. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  181. return;
  182. for (size = min; size <= max; size *= 2)
  183. {
  184. unsigned first;
  185. for (first = 0; first < n; first += size)
  186. {
  187. if (first + size <= n)
  188. {
  189. int found_workerids[size];
  190. for (i = 0; i < size; i++)
  191. found_workerids[i] = workers[first + i];
  192. /* We register this combination */
  193. int newworkerid;
  194. newworkerid = starpu_combined_worker_assign_workerid(size, found_workerids);
  195. STARPU_ASSERT(newworkerid >= 0);
  196. worker_collection->add(worker_collection, newworkerid);
  197. //if the maximun number of worker is reached, then return
  198. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  199. return;
  200. }
  201. }
  202. }
  203. }
  204. static void find_and_assign_combinations_without_hwloc(int *workerids, int nworkers)
  205. {
  206. int i;
  207. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  208. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  209. sched_ctx_id = 0;
  210. int min, max;
  211. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  212. /* We put the id of all CPU workers in this array */
  213. int cpu_workers[STARPU_NMAXWORKERS];
  214. unsigned ncpus = 0;
  215. for (i = 0; i < nworkers; i++)
  216. {
  217. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  218. if (worker->arch == STARPU_CPU_WORKER)
  219. cpu_workers[ncpus++] = i;
  220. }
  221. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  222. if (min < 2)
  223. min = 2;
  224. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  225. if (max == -1 || max > (int) ncpus)
  226. max = ncpus;
  227. assign_combinations_without_hwloc(workers,cpu_workers,ncpus,min,max);
  228. }
  229. #endif /* STARPU_HAVE_HWLOC */
  230. static void combine_all_cpu_workers(int *workerids, int nworkers)
  231. {
  232. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  233. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  234. sched_ctx_id = 0;
  235. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  236. int cpu_workers[STARPU_NMAXWORKERS];
  237. int ncpus = 0;
  238. int i;
  239. int min;
  240. int max;
  241. for (i = 0; i < nworkers; i++)
  242. {
  243. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  244. if (worker->arch == STARPU_CPU_WORKER)
  245. cpu_workers[ncpus++] = workerids[i];
  246. }
  247. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  248. if (min < 1)
  249. min = 1;
  250. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  251. if (max == -1 || max > ncpus)
  252. max = ncpus;
  253. for (i = min; i <= max; i++)
  254. {
  255. int newworkerid = starpu_combined_worker_assign_workerid(i, cpu_workers);
  256. STARPU_ASSERT(newworkerid >= 0);
  257. workers->add(workers, newworkerid);
  258. }
  259. }
  260. void _starpu_sched_find_worker_combinations(int *workerids, int nworkers)
  261. {
  262. /* FIXME: this seems to be lacking shutdown support? */
  263. if (_starpu_initialized_combined_workers)
  264. return;
  265. _starpu_initialized_combined_workers = 1;
  266. struct _starpu_machine_config *config = _starpu_get_machine_config();
  267. if (config->conf.single_combined_worker > 0)
  268. combine_all_cpu_workers(workerids, nworkers);
  269. else
  270. {
  271. #ifdef STARPU_HAVE_HWLOC
  272. find_and_assign_combinations_with_hwloc(workerids, nworkers);
  273. #else
  274. find_and_assign_combinations_without_hwloc(workerids, nworkers);
  275. #endif
  276. }
  277. }
  278. void starpu_sched_find_all_worker_combinations(void)
  279. {
  280. const unsigned nbasic_workers = starpu_worker_get_count();
  281. int basic_workerids[nbasic_workers];
  282. unsigned i;
  283. for(i = 0; i < nbasic_workers; i++)
  284. {
  285. basic_workerids[i] = i;
  286. }
  287. _starpu_sched_find_worker_combinations(basic_workerids, nbasic_workers);
  288. }