detect_combined_workers.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2016 Université de Bordeaux
  4. * Copyright (C) 2011, 2012, 2013, 2016 CNRS
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <starpu.h>
  18. #include <common/utils.h>
  19. #include <core/workers.h>
  20. #include <math.h>
  21. #include <core/detect_combined_workers.h>
  22. #ifdef STARPU_HAVE_HWLOC
  23. #include <hwloc.h>
  24. static void find_workers(hwloc_obj_t obj, int cpu_workers[STARPU_NMAXWORKERS], unsigned *n)
  25. {
  26. struct _starpu_hwloc_userdata *data = obj->userdata;
  27. if (!data->worker_list)
  28. /* Not something we run something on, don't care */
  29. return;
  30. if (data->worker_list == (void*) -1)
  31. {
  32. /* Intra node, recurse */
  33. unsigned i;
  34. for (i = 0; i < obj->arity; i++)
  35. find_workers(obj->children[i], cpu_workers, n);
  36. return;
  37. }
  38. /* Got to a PU leaf */
  39. struct _starpu_worker_list *workers = data->worker_list;
  40. struct _starpu_worker *worker;
  41. for(worker = _starpu_worker_list_begin(workers); worker != _starpu_worker_list_end(workers); worker = _starpu_worker_list_next(worker))
  42. {
  43. /* is it a CPU worker? */
  44. if (worker->perf_arch.devices[0].type == STARPU_CPU_WORKER && worker->perf_arch.devices[0].ncores == 1)
  45. {
  46. _STARPU_DEBUG("worker %d is part of it\n", worker->workerid);
  47. /* Add it to the combined worker */
  48. cpu_workers[(*n)++] = worker->workerid;
  49. }
  50. }
  51. }
  52. static void synthesize_intermediate_workers(hwloc_obj_t *children, unsigned min, unsigned max, unsigned arity, unsigned n, unsigned synthesize_arity)
  53. {
  54. unsigned nworkers, i, j;
  55. unsigned chunk_size = (n + synthesize_arity-1) / synthesize_arity;
  56. unsigned chunk_start;
  57. int cpu_workers[STARPU_NMAXWORKERS];
  58. int ret;
  59. if (n <= synthesize_arity)
  60. /* Not too many children, do not synthesize */
  61. return;
  62. _STARPU_DEBUG("%u children > %u, synthesizing intermediate combined workers of size %u\n", n, synthesize_arity, chunk_size);
  63. n = 0;
  64. j = 0;
  65. nworkers = 0;
  66. chunk_start = 0;
  67. for (i = 0 ; i < arity; i++)
  68. {
  69. if (((struct _starpu_hwloc_userdata*)children[i]->userdata)->worker_list)
  70. {
  71. n++;
  72. _STARPU_DEBUG("child %u\n", i);
  73. find_workers(children[i], cpu_workers, &nworkers);
  74. j++;
  75. }
  76. /* Completed a chunk, or last bit (but not if it's just 1 subobject) */
  77. if (j == chunk_size || (i == arity-1 && j > 1))
  78. {
  79. if (nworkers >= min && nworkers <= max)
  80. {
  81. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  82. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  83. sched_ctx_id = 0;
  84. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  85. _STARPU_DEBUG("Adding it\n");
  86. ret = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  87. STARPU_ASSERT(ret >= 0);
  88. workers->add(workers,ret);
  89. }
  90. /* Recurse there */
  91. synthesize_intermediate_workers(children+chunk_start, min, max, i - chunk_start, n, synthesize_arity);
  92. /* And restart another one */
  93. n = 0;
  94. j = 0;
  95. nworkers = 0;
  96. chunk_start = i+1;
  97. }
  98. }
  99. }
  100. static void find_and_assign_combinations(hwloc_obj_t obj, unsigned min, unsigned max, unsigned synthesize_arity)
  101. {
  102. char name[64];
  103. unsigned i, n, nworkers;
  104. int cpu_workers[STARPU_NMAXWORKERS];
  105. #if HWLOC_API_VERSION >= 0x10000
  106. hwloc_obj_attr_snprintf(name, sizeof(name), obj, "#", 0);
  107. #else
  108. hwloc_obj_snprintf(name, sizeof(name), _starpu_get_machine_config()->topology.hwtopology, obj, "#", 0);
  109. #endif
  110. _STARPU_DEBUG("Looking at %s\n", name);
  111. for (n = 0, i = 0; i < obj->arity; i++)
  112. if (((struct _starpu_hwloc_userdata *)obj->children[i]->userdata)->worker_list)
  113. /* it has a CPU worker */
  114. n++;
  115. if (n == 1)
  116. {
  117. /* If there is only one child, we go to the next level right away */
  118. find_and_assign_combinations(obj->children[0], min, max, synthesize_arity);
  119. return;
  120. }
  121. /* Add this object */
  122. nworkers = 0;
  123. find_workers(obj, cpu_workers, &nworkers);
  124. if (nworkers >= min && nworkers <= max)
  125. {
  126. _STARPU_DEBUG("Adding it\n");
  127. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  128. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  129. sched_ctx_id = 0;
  130. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  131. int newworkerid = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  132. STARPU_ASSERT(newworkerid >= 0);
  133. workers->add(workers,newworkerid);
  134. }
  135. /* Add artificial intermediate objects recursively */
  136. synthesize_intermediate_workers(obj->children, min, max, obj->arity, n, synthesize_arity);
  137. /* And recurse */
  138. for (i = 0; i < obj->arity; i++)
  139. if (((struct _starpu_hwloc_userdata*) obj->children[i]->userdata)->worker_list == (void*) -1)
  140. find_and_assign_combinations(obj->children[i], min, max, synthesize_arity);
  141. }
  142. static void find_and_assign_combinations_with_hwloc(int *workerids, int nworkers)
  143. {
  144. struct _starpu_machine_config *config = _starpu_get_machine_config();
  145. struct _starpu_machine_topology *topology = &config->topology;
  146. int synthesize_arity = starpu_get_env_number("STARPU_SYNTHESIZE_ARITY_COMBINED_WORKER");
  147. int min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  148. if (min < 2)
  149. min = 2;
  150. int max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  151. if (max == -1)
  152. max = INT_MAX;
  153. if (synthesize_arity == -1)
  154. synthesize_arity = 2;
  155. STARPU_ASSERT_MSG(synthesize_arity > 0, "STARPU_SYNTHESIZE_ARITY_COMBINED_WORKER must be greater than 0");
  156. /* First, mark nodes which contain CPU workers, simply by setting their userdata field */
  157. int i;
  158. for (i = 0; i < nworkers; i++)
  159. {
  160. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  161. if (worker->perf_arch.devices[0].type == STARPU_CPU_WORKER && worker->perf_arch.devices[0].ncores == 1)
  162. {
  163. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->pu_depth, worker->bindid);
  164. obj = obj->parent;
  165. while (obj)
  166. {
  167. ((struct _starpu_hwloc_userdata*) obj->userdata)->worker_list = (void*) -1;
  168. obj = obj->parent;
  169. }
  170. }
  171. }
  172. find_and_assign_combinations(hwloc_get_root_obj(topology->hwtopology), min, max, synthesize_arity);
  173. }
  174. #else /* STARPU_HAVE_HWLOC */
  175. static void assign_combinations_without_hwloc(struct starpu_worker_collection* worker_collection, int* workers, unsigned n, int min, int max)
  176. {
  177. int size,i,count =0;
  178. //if the maximun number of worker is already reached
  179. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  180. return;
  181. for (size = min; size <= max; size *= 2)
  182. {
  183. unsigned first;
  184. for (first = 0; first < n; first += size)
  185. {
  186. if (first + size <= n)
  187. {
  188. int found_workerids[size];
  189. for (i = 0; i < size; i++)
  190. found_workerids[i] = workers[first + i];
  191. /* We register this combination */
  192. int newworkerid;
  193. newworkerid = starpu_combined_worker_assign_workerid(size, found_workerids);
  194. STARPU_ASSERT(newworkerid >= 0);
  195. count++;
  196. worker_collection->add(worker_collection, newworkerid);
  197. //if the maximun number of worker is reached, then return
  198. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  199. return;
  200. }
  201. }
  202. }
  203. }
  204. static void find_and_assign_combinations_without_hwloc(int *workerids, int nworkers)
  205. {
  206. int i;
  207. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  208. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  209. sched_ctx_id = 0;
  210. int min, max;
  211. #ifdef STARPU_USE_MIC
  212. unsigned j;
  213. int mic_min, mic_max;
  214. #endif
  215. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  216. /* We put the id of all CPU workers in this array */
  217. int cpu_workers[STARPU_NMAXWORKERS];
  218. unsigned ncpus = 0;
  219. #ifdef STARPU_USE_MIC
  220. unsigned nb_mics = _starpu_get_machine_config()->topology.nmicdevices;
  221. unsigned * nmics_table;
  222. int * mic_id;
  223. int ** mic_workers;
  224. _STARPU_MALLOC(mic_id, sizeof(int)*nb_mics);
  225. _STARPU_MALLOC(nmics_table, sizeof(unsigned)*nb_mics);
  226. _STARPU_MALLOC(mic_workers, sizeof(int*)*nb_mics);
  227. for(j=0; j<nb_mics; j++)
  228. {
  229. mic_id[j] = -1;
  230. nmics_table[j] = 0;
  231. _STARPU_MALLOC(mic_workers[j], sizeof(int)*STARPU_NMAXWORKERS);
  232. }
  233. #endif /* STARPU_USE_MIC */
  234. for (i = 0; i < nworkers; i++)
  235. {
  236. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  237. if (worker->arch == STARPU_CPU_WORKER)
  238. cpu_workers[ncpus++] = i;
  239. #ifdef STARPU_USE_MIC
  240. else if(worker->arch == STARPU_MIC_WORKER)
  241. {
  242. for(j=0; j<nb_mics && mic_id[j] != worker->devid && mic_id[j] != -1; j++);
  243. if(j<nb_mics)
  244. {
  245. if(mic_id[j] == -1)
  246. {
  247. mic_id[j] = worker->devid;
  248. }
  249. mic_workers[j][nmics_table[j]++] = i;
  250. }
  251. }
  252. #endif /* STARPU_USE_MIC */
  253. }
  254. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  255. if (min < 2)
  256. min = 2;
  257. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  258. if (max == -1 || max > (int) ncpus)
  259. max = ncpus;
  260. assign_combinations_without_hwloc(workers,cpu_workers,ncpus,min,max);
  261. #ifdef STARPU_USE_MIC
  262. mic_min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  263. mic_max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  264. if (mic_min < 2)
  265. mic_min = 2;
  266. for(j=0; j<nb_mics; j++)
  267. {
  268. int _mic_max = mic_max;
  269. if (_mic_max == -1 || _mic_max > (int) nmics_table[j])
  270. _mic_max = nmics_table[j];
  271. assign_combinations_without_hwloc(workers,mic_workers[j],nmics_table[j],mic_min,_mic_max);
  272. free(mic_workers[j]);
  273. }
  274. free(mic_id);
  275. free(nmics_table);
  276. free(mic_workers);
  277. #endif /* STARPU_USE_MIC */
  278. }
  279. #endif /* STARPU_HAVE_HWLOC */
  280. static void combine_all_cpu_workers(int *workerids, int nworkers)
  281. {
  282. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  283. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  284. sched_ctx_id = 0;
  285. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  286. int cpu_workers[STARPU_NMAXWORKERS];
  287. int ncpus = 0;
  288. int i;
  289. int min;
  290. int max;
  291. for (i = 0; i < nworkers; i++)
  292. {
  293. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  294. if (worker->arch == STARPU_CPU_WORKER)
  295. cpu_workers[ncpus++] = workerids[i];
  296. }
  297. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  298. if (min < 1)
  299. min = 1;
  300. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  301. if (max == -1 || max > ncpus)
  302. max = ncpus;
  303. for (i = min; i <= max; i++)
  304. {
  305. int newworkerid = starpu_combined_worker_assign_workerid(i, cpu_workers);
  306. STARPU_ASSERT(newworkerid >= 0);
  307. workers->add(workers, newworkerid);
  308. }
  309. }
  310. void _starpu_sched_find_worker_combinations(int *workerids, int nworkers)
  311. {
  312. struct _starpu_machine_config *config = _starpu_get_machine_config();
  313. if (config->conf.single_combined_worker > 0)
  314. combine_all_cpu_workers(workerids, nworkers);
  315. else
  316. {
  317. #ifdef STARPU_HAVE_HWLOC
  318. find_and_assign_combinations_with_hwloc(workerids, nworkers);
  319. #else
  320. find_and_assign_combinations_without_hwloc(workerids, nworkers);
  321. #endif
  322. }
  323. }