detect_combined_workers.c 12 KB


  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2011-2014 Inria
  4. * Copyright (C) 2011-2017, 2019 CNRS
  5. * Copyright (C) 2010-2016,2019 Université de Bordeaux
  6. * Copyright (C) 2013 Thibaut Lambert
  7. *
  8. * StarPU is free software; you can redistribute it and/or modify
  9. * it under the terms of the GNU Lesser General Public License as published by
  10. * the Free Software Foundation; either version 2.1 of the License, or (at
  11. * your option) any later version.
  12. *
  13. * StarPU is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  16. *
  17. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  18. */
  19. #include <starpu.h>
  20. #include <common/utils.h>
  21. #include <core/workers.h>
  22. #include <math.h>
  23. #include <core/detect_combined_workers.h>
  24. int _starpu_initialized_combined_workers;
  25. #ifdef STARPU_HAVE_HWLOC
  26. #include <hwloc.h>
  27. static void find_workers(hwloc_obj_t obj, int cpu_workers[STARPU_NMAXWORKERS], unsigned *n)
  28. {
  29. struct _starpu_hwloc_userdata *data = obj->userdata;
  30. if (!data->worker_list)
  31. /* Not something we run something on, don't care */
  32. return;
  33. if (data->worker_list == (void*) -1)
  34. {
  35. /* Intra node, recurse */
  36. unsigned i;
  37. for (i = 0; i < obj->arity; i++)
  38. find_workers(obj->children[i], cpu_workers, n);
  39. return;
  40. }
  41. /* Got to a PU leaf */
  42. struct _starpu_worker_list *workers = data->worker_list;
  43. struct _starpu_worker *worker;
  44. for(worker = _starpu_worker_list_begin(workers); worker != _starpu_worker_list_end(workers); worker = _starpu_worker_list_next(worker))
  45. {
  46. /* is it a CPU worker? */
  47. if (worker->perf_arch.devices[0].type == STARPU_CPU_WORKER && worker->perf_arch.devices[0].ncores == 1)
  48. {
  49. _STARPU_DEBUG("worker %d is part of it\n", worker->workerid);
  50. /* Add it to the combined worker */
  51. cpu_workers[(*n)++] = worker->workerid;
  52. }
  53. }
  54. }
  55. static void synthesize_intermediate_workers(hwloc_obj_t *children, unsigned min, unsigned max, unsigned arity, unsigned n, unsigned synthesize_arity)
  56. {
  57. unsigned nworkers, i, j;
  58. unsigned chunk_size = (n + synthesize_arity-1) / synthesize_arity;
  59. unsigned chunk_start;
  60. int cpu_workers[STARPU_NMAXWORKERS];
  61. int ret;
  62. if (n <= synthesize_arity)
  63. /* Not too many children, do not synthesize */
  64. return;
  65. _STARPU_DEBUG("%u children > %u, synthesizing intermediate combined workers of size %u\n", n, synthesize_arity, chunk_size);
  66. n = 0;
  67. j = 0;
  68. nworkers = 0;
  69. chunk_start = 0;
  70. for (i = 0 ; i < arity; i++)
  71. {
  72. if (((struct _starpu_hwloc_userdata*)children[i]->userdata)->worker_list)
  73. {
  74. n++;
  75. _STARPU_DEBUG("child %u\n", i);
  76. find_workers(children[i], cpu_workers, &nworkers);
  77. j++;
  78. }
  79. /* Completed a chunk, or last bit (but not if it's just 1 subobject) */
  80. if (j == chunk_size || (i == arity-1 && j > 1))
  81. {
  82. if (nworkers >= min && nworkers <= max)
  83. {
  84. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  85. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  86. sched_ctx_id = 0;
  87. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  88. _STARPU_DEBUG("Adding it\n");
  89. ret = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  90. STARPU_ASSERT(ret >= 0);
  91. workers->add(workers,ret);
  92. }
  93. /* Recurse there */
  94. synthesize_intermediate_workers(children+chunk_start, min, max, i - chunk_start, n, synthesize_arity);
  95. /* And restart another one */
  96. n = 0;
  97. j = 0;
  98. nworkers = 0;
  99. chunk_start = i+1;
  100. }
  101. }
  102. }
  103. static void find_and_assign_combinations(hwloc_obj_t obj, unsigned min, unsigned max, unsigned synthesize_arity)
  104. {
  105. char name[64];
  106. unsigned i, n, nworkers;
  107. int cpu_workers[STARPU_NMAXWORKERS];
  108. #if HWLOC_API_VERSION >= 0x10000
  109. hwloc_obj_attr_snprintf(name, sizeof(name), obj, "#", 0);
  110. #else
  111. hwloc_obj_snprintf(name, sizeof(name), _starpu_get_machine_config()->topology.hwtopology, obj, "#", 0);
  112. #endif
  113. _STARPU_DEBUG("Looking at %s\n", name);
  114. for (n = 0, i = 0; i < obj->arity; i++)
  115. if (((struct _starpu_hwloc_userdata *)obj->children[i]->userdata)->worker_list)
  116. /* it has a CPU worker */
  117. n++;
  118. if (n == 1)
  119. {
  120. /* If there is only one child, we go to the next level right away */
  121. find_and_assign_combinations(obj->children[0], min, max, synthesize_arity);
  122. return;
  123. }
  124. /* Add this object */
  125. nworkers = 0;
  126. find_workers(obj, cpu_workers, &nworkers);
  127. if (nworkers >= min && nworkers <= max)
  128. {
  129. _STARPU_DEBUG("Adding it\n");
  130. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  131. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  132. sched_ctx_id = 0;
  133. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  134. int newworkerid = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  135. STARPU_ASSERT(newworkerid >= 0);
  136. workers->add(workers,newworkerid);
  137. }
  138. /* Add artificial intermediate objects recursively */
  139. synthesize_intermediate_workers(obj->children, min, max, obj->arity, n, synthesize_arity);
  140. /* And recurse */
  141. for (i = 0; i < obj->arity; i++)
  142. if (((struct _starpu_hwloc_userdata*) obj->children[i]->userdata)->worker_list == (void*) -1)
  143. find_and_assign_combinations(obj->children[i], min, max, synthesize_arity);
  144. }
  145. static void find_and_assign_combinations_with_hwloc(int *workerids, int nworkers)
  146. {
  147. struct _starpu_machine_config *config = _starpu_get_machine_config();
  148. struct _starpu_machine_topology *topology = &config->topology;
  149. int synthesize_arity = starpu_get_env_number("STARPU_SYNTHESIZE_ARITY_COMBINED_WORKER");
  150. int min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  151. if (min < 2)
  152. min = 2;
  153. int max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  154. if (max == -1)
  155. max = INT_MAX;
  156. if (synthesize_arity == -1)
  157. synthesize_arity = 2;
  158. STARPU_ASSERT_MSG(synthesize_arity > 0, "STARPU_SYNTHESIZE_ARITY_COMBINED_WORKER must be greater than 0");
  159. /* First, mark nodes which contain CPU workers, simply by setting their userdata field */
  160. int i;
  161. for (i = 0; i < nworkers; i++)
  162. {
  163. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  164. if (worker->perf_arch.devices[0].type == STARPU_CPU_WORKER && worker->perf_arch.devices[0].ncores == 1)
  165. {
  166. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->pu_depth, worker->bindid);
  167. obj = obj->parent;
  168. while (obj)
  169. {
  170. ((struct _starpu_hwloc_userdata*) obj->userdata)->worker_list = (void*) -1;
  171. obj = obj->parent;
  172. }
  173. }
  174. }
  175. find_and_assign_combinations(hwloc_get_root_obj(topology->hwtopology), min, max, synthesize_arity);
  176. }
  177. #else /* STARPU_HAVE_HWLOC */
  178. static void assign_combinations_without_hwloc(struct starpu_worker_collection* worker_collection, int* workers, unsigned n, int min, int max)
  179. {
  180. int size,i;
  181. //if the maximun number of worker is already reached
  182. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  183. return;
  184. for (size = min; size <= max; size *= 2)
  185. {
  186. unsigned first;
  187. for (first = 0; first < n; first += size)
  188. {
  189. if (first + size <= n)
  190. {
  191. int found_workerids[size];
  192. for (i = 0; i < size; i++)
  193. found_workerids[i] = workers[first + i];
  194. /* We register this combination */
  195. int newworkerid;
  196. newworkerid = starpu_combined_worker_assign_workerid(size, found_workerids);
  197. STARPU_ASSERT(newworkerid >= 0);
  198. worker_collection->add(worker_collection, newworkerid);
  199. //if the maximun number of worker is reached, then return
  200. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  201. return;
  202. }
  203. }
  204. }
  205. }
  206. static void find_and_assign_combinations_without_hwloc(int *workerids, int nworkers)
  207. {
  208. int i;
  209. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  210. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  211. sched_ctx_id = 0;
  212. int min, max;
  213. #ifdef STARPU_USE_MIC
  214. unsigned j;
  215. int mic_min, mic_max;
  216. #endif
  217. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  218. /* We put the id of all CPU workers in this array */
  219. int cpu_workers[STARPU_NMAXWORKERS];
  220. unsigned ncpus = 0;
  221. #ifdef STARPU_USE_MIC
  222. unsigned nb_mics = _starpu_get_machine_config()->topology.nmicdevices;
  223. unsigned * nmics_table;
  224. int * mic_id;
  225. int ** mic_workers;
  226. _STARPU_MALLOC(mic_id, sizeof(int)*nb_mics);
  227. _STARPU_MALLOC(nmics_table, sizeof(unsigned)*nb_mics);
  228. _STARPU_MALLOC(mic_workers, sizeof(int*)*nb_mics);
  229. for(j=0; j<nb_mics; j++)
  230. {
  231. mic_id[j] = -1;
  232. nmics_table[j] = 0;
  233. _STARPU_MALLOC(mic_workers[j], sizeof(int)*STARPU_NMAXWORKERS);
  234. }
  235. #endif /* STARPU_USE_MIC */
  236. for (i = 0; i < nworkers; i++)
  237. {
  238. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  239. if (worker->arch == STARPU_CPU_WORKER)
  240. cpu_workers[ncpus++] = i;
  241. #ifdef STARPU_USE_MIC
  242. else if(worker->arch == STARPU_MIC_WORKER)
  243. {
  244. for(j=0; j<nb_mics && mic_id[j] != worker->devid && mic_id[j] != -1; j++);
  245. if(j<nb_mics)
  246. {
  247. if(mic_id[j] == -1)
  248. {
  249. mic_id[j] = worker->devid;
  250. }
  251. mic_workers[j][nmics_table[j]++] = i;
  252. }
  253. }
  254. #endif /* STARPU_USE_MIC */
  255. }
  256. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  257. if (min < 2)
  258. min = 2;
  259. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  260. if (max == -1 || max > (int) ncpus)
  261. max = ncpus;
  262. assign_combinations_without_hwloc(workers,cpu_workers,ncpus,min,max);
  263. #ifdef STARPU_USE_MIC
  264. mic_min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  265. mic_max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  266. if (mic_min < 2)
  267. mic_min = 2;
  268. for(j=0; j<nb_mics; j++)
  269. {
  270. int _mic_max = mic_max;
  271. if (_mic_max == -1 || _mic_max > (int) nmics_table[j])
  272. _mic_max = nmics_table[j];
  273. assign_combinations_without_hwloc(workers,mic_workers[j],nmics_table[j],mic_min,_mic_max);
  274. free(mic_workers[j]);
  275. }
  276. free(mic_id);
  277. free(nmics_table);
  278. free(mic_workers);
  279. #endif /* STARPU_USE_MIC */
  280. }
  281. #endif /* STARPU_HAVE_HWLOC */
  282. static void combine_all_cpu_workers(int *workerids, int nworkers)
  283. {
  284. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  285. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  286. sched_ctx_id = 0;
  287. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  288. int cpu_workers[STARPU_NMAXWORKERS];
  289. int ncpus = 0;
  290. int i;
  291. int min;
  292. int max;
  293. for (i = 0; i < nworkers; i++)
  294. {
  295. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  296. if (worker->arch == STARPU_CPU_WORKER)
  297. cpu_workers[ncpus++] = workerids[i];
  298. }
  299. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  300. if (min < 1)
  301. min = 1;
  302. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  303. if (max == -1 || max > ncpus)
  304. max = ncpus;
  305. for (i = min; i <= max; i++)
  306. {
  307. int newworkerid = starpu_combined_worker_assign_workerid(i, cpu_workers);
  308. STARPU_ASSERT(newworkerid >= 0);
  309. workers->add(workers, newworkerid);
  310. }
  311. }
  312. void _starpu_sched_find_worker_combinations(int *workerids, int nworkers)
  313. {
  314. /* FIXME: this seems to be lacking shutdown support? */
  315. if (_starpu_initialized_combined_workers)
  316. return;
  317. _starpu_initialized_combined_workers = 1;
  318. struct _starpu_machine_config *config = _starpu_get_machine_config();
  319. if (config->conf.single_combined_worker > 0)
  320. combine_all_cpu_workers(workerids, nworkers);
  321. else
  322. {
  323. #ifdef STARPU_HAVE_HWLOC
  324. find_and_assign_combinations_with_hwloc(workerids, nworkers);
  325. #else
  326. find_and_assign_combinations_without_hwloc(workerids, nworkers);
  327. #endif
  328. }
  329. }
  330. void starpu_sched_find_all_worker_combinations(void)
  331. {
  332. const unsigned nbasic_workers = starpu_worker_get_count();
  333. int basic_workerids[nbasic_workers];
  334. unsigned i;
  335. for(i = 0; i < nbasic_workers; i++)
  336. {
  337. basic_workerids[i] = i;
  338. }
  339. _starpu_sched_find_worker_combinations(basic_workerids, nbasic_workers);
  340. }