detect_combined_workers.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2010-2013 Université de Bordeaux 1
  4. * Copyright (C) 2011, 2012, 2013 Centre National de la Recherche Scientifique
  5. *
  6. * StarPU is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU Lesser General Public License as published by
  8. * the Free Software Foundation; either version 2.1 of the License, or (at
  9. * your option) any later version.
  10. *
  11. * StarPU is distributed in the hope that it will be useful, but
  12. * WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  14. *
  15. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  16. */
  17. #include <starpu.h>
  18. #include <common/utils.h>
  19. #include <core/workers.h>
  20. #include <math.h>
  21. #include <core/detect_combined_workers.h>
  22. #ifdef STARPU_HAVE_HWLOC
  23. #include <hwloc.h>
  24. static void find_workers(hwloc_obj_t obj, int cpu_workers[STARPU_NMAXWORKERS], unsigned *n)
  25. {
  26. if (!obj->userdata)
  27. /* Not something we run something on, don't care */
  28. return;
  29. if (obj->userdata == (void*) -1)
  30. {
  31. /* Intra node, recurse */
  32. unsigned i;
  33. for (i = 0; i < obj->arity; i++)
  34. find_workers(obj->children[i], cpu_workers, n);
  35. return;
  36. }
  37. /* Got to a PU leaf */
  38. struct _starpu_worker *worker = obj->userdata;
  39. /* is it a CPU worker? */
  40. if (worker->perf_arch.type == STARPU_CPU_WORKER && worker->perf_arch.ncore == 0)
  41. {
  42. _STARPU_DEBUG("worker %d is part of it\n", worker->workerid);
  43. /* Add it to the combined worker */
  44. cpu_workers[(*n)++] = worker->workerid;
  45. }
  46. }
  47. static void synthesize_intermediate_workers(hwloc_obj_t *children, unsigned min, unsigned max, unsigned arity, unsigned n, unsigned synthesize_arity)
  48. {
  49. unsigned nworkers, i, j;
  50. unsigned chunk_size = (n + synthesize_arity-1) / synthesize_arity;
  51. unsigned chunk_start;
  52. int cpu_workers[STARPU_NMAXWORKERS];
  53. int ret;
  54. if (n <= synthesize_arity)
  55. /* Not too many children, do not synthesize */
  56. return;
  57. _STARPU_DEBUG("%u children > %u, synthesizing intermediate combined workers of size %u\n", n, synthesize_arity, chunk_size);
  58. n = 0;
  59. j = 0;
  60. nworkers = 0;
  61. chunk_start = 0;
  62. for (i = 0 ; i < arity; i++)
  63. {
  64. if (children[i]->userdata)
  65. {
  66. n++;
  67. _STARPU_DEBUG("child %u\n", i);
  68. find_workers(children[i], cpu_workers, &nworkers);
  69. j++;
  70. }
  71. /* Completed a chunk, or last bit (but not if it's just 1 subobject) */
  72. if (j == chunk_size || (i == arity-1 && j > 1))
  73. {
  74. if (nworkers >= min && nworkers <= max)
  75. {
  76. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  77. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  78. sched_ctx_id = 0;
  79. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  80. _STARPU_DEBUG("Adding it\n");
  81. ret = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  82. STARPU_ASSERT(ret >= 0);
  83. workers->add(workers,ret);
  84. }
  85. /* Recurse there */
  86. synthesize_intermediate_workers(children+chunk_start, min, max, i - chunk_start, n, synthesize_arity);
  87. /* And restart another one */
  88. n = 0;
  89. j = 0;
  90. nworkers = 0;
  91. chunk_start = i+1;
  92. }
  93. }
  94. }
  95. static void find_and_assign_combinations(hwloc_obj_t obj, unsigned min, unsigned max, unsigned synthesize_arity)
  96. {
  97. char name[64];
  98. unsigned i, n, nworkers;
  99. int cpu_workers[STARPU_NMAXWORKERS];
  100. struct _starpu_machine_config *config = _starpu_get_machine_config();
  101. struct _starpu_machine_topology *topology = &config->topology;
  102. hwloc_obj_snprintf(name, sizeof(name), topology->hwtopology, obj, "#", 0);
  103. _STARPU_DEBUG("Looking at %s\n", name);
  104. for (n = 0, i = 0; i < obj->arity; i++)
  105. if (obj->children[i]->userdata)
  106. /* it has a CPU worker */
  107. n++;
  108. if (n == 1)
  109. {
  110. /* If there is only one child, we go to the next level right away */
  111. find_and_assign_combinations(obj->children[0], min, max, synthesize_arity);
  112. return;
  113. }
  114. /* Add this object */
  115. nworkers = 0;
  116. find_workers(obj, cpu_workers, &nworkers);
  117. if (nworkers >= min && nworkers <= max)
  118. {
  119. _STARPU_DEBUG("Adding it\n");
  120. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  121. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  122. sched_ctx_id = 0;
  123. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  124. int newworkerid = starpu_combined_worker_assign_workerid(nworkers, cpu_workers);
  125. STARPU_ASSERT(newworkerid >= 0);
  126. workers->add(workers,newworkerid);
  127. }
  128. /* Add artificial intermediate objects recursively */
  129. synthesize_intermediate_workers(obj->children, min, max, obj->arity, n, synthesize_arity);
  130. /* And recurse */
  131. for (i = 0; i < obj->arity; i++)
  132. if (obj->children[i]->userdata == (void*) -1)
  133. find_and_assign_combinations(obj->children[i], min, max, synthesize_arity);
  134. }
  135. static void find_and_assign_combinations_with_hwloc(int *workerids, int nworkers)
  136. {
  137. struct _starpu_machine_config *config = _starpu_get_machine_config();
  138. struct _starpu_machine_topology *topology = &config->topology;
  139. int synthesize_arity = starpu_get_env_number("STARPU_SYNTHESIZE_ARITY_COMBINED_WORKER");
  140. int min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  141. if (min < 2)
  142. min = 2;
  143. int max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  144. if (max == -1)
  145. max = INT_MAX;
  146. if (synthesize_arity == -1)
  147. synthesize_arity = 2;
  148. /* First, mark nodes which contain CPU workers, simply by setting their userdata field */
  149. int i;
  150. for (i = 0; i < nworkers; i++)
  151. {
  152. struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
  153. if (worker->perf_arch.type == STARPU_CPU_WORKER && worker->perf_arch.ncore == 0)
  154. {
  155. hwloc_obj_t obj = hwloc_get_obj_by_depth(topology->hwtopology, config->cpu_depth, worker->bindid);
  156. STARPU_ASSERT(obj->userdata == worker);
  157. obj = obj->parent;
  158. while (obj)
  159. {
  160. obj->userdata = (void*) -1;
  161. obj = obj->parent;
  162. }
  163. }
  164. }
  165. find_and_assign_combinations(hwloc_get_root_obj(topology->hwtopology), min, max, synthesize_arity);
  166. }
  167. #else /* STARPU_HAVE_HWLOC */
  168. static void assign_combinations_without_hwloc(struct starpu_worker_collection* worker_collection, int* workers, unsigned n, int min, int max)
  169. {
  170. int size,i,count =0;
  171. //if the maximun number of worker is already reached
  172. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  173. return;
  174. for (size = min; size <= max; size *= 2)
  175. {
  176. unsigned first;
  177. for (first = 0; first < n; first += size)
  178. {
  179. if (first + size <= n)
  180. {
  181. int found_workerids[size];
  182. for (i = 0; i < size; i++)
  183. found_workerids[i] = workers[first + i];
  184. /* We register this combination */
  185. int newworkerid;
  186. newworkerid = starpu_combined_worker_assign_workerid(size, found_workerids);
  187. STARPU_ASSERT(newworkerid >= 0);
  188. count++;
  189. worker_collection->add(worker_collection, newworkerid);
  190. //if the maximun number of worker is reached, then return
  191. if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
  192. return;
  193. }
  194. }
  195. }
  196. }
  197. static void find_and_assign_combinations_without_hwloc(int *workerids, int nworkers)
  198. {
  199. int i;
  200. unsigned j;
  201. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  202. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  203. sched_ctx_id = 0;
  204. int min, max, mic_min, mic_max;
  205. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  206. /* We put the id of all CPU workers in this array */
  207. int cpu_workers[STARPU_NMAXWORKERS];
  208. unsigned ncpus = 0;
  209. #ifdef STARPU_USE_MIC
  210. unsigned nb_mics = _starpu_get_machine_config()->topology.nmicdevices;
  211. unsigned * nmics_table;
  212. int * mic_id;
  213. int ** mic_workers;
  214. mic_id = malloc(sizeof(int)*nb_mics);
  215. nmics_table = malloc(sizeof(unsigned)*nb_mics);
  216. mic_workers = malloc(sizeof(int*)*nb_mics);
  217. for(j=0; j<nb_mics; j++)
  218. {
  219. mic_id[j] = -1;
  220. nmics_table[j] = 0;
  221. mic_workers[j] = malloc(sizeof(int)*STARPU_NMAXWORKERS);
  222. }
  223. #endif /* STARPU_USE_MIC */
  224. struct _starpu_worker *worker;
  225. for (i = 0; i < nworkers; i++)
  226. {
  227. worker = _starpu_get_worker_struct(workerids[i]);
  228. if (worker->arch == STARPU_CPU_WORKER)
  229. cpu_workers[ncpus++] = i;
  230. #ifdef STARPU_USE_MIC
  231. else if(worker->arch == STARPU_MIC_WORKER)
  232. {
  233. for(j=0; mic_id[j] != worker->mp_nodeid && mic_id[j] != -1 && j<nb_mics; j++);
  234. if(j<nb_mics)
  235. {
  236. if(mic_id[j] == -1)
  237. {
  238. mic_id[j] = worker->mp_nodeid;
  239. }
  240. mic_workers[j][nmics_table[j]++] = i;
  241. }
  242. }
  243. #endif /* STARPU_USE_MIC */
  244. }
  245. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  246. if (min < 2)
  247. min = 2;
  248. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  249. if (max == -1 || max > (int) ncpus)
  250. max = ncpus;
  251. assign_combinations_without_hwloc(workers,cpu_workers,ncpus,min,max);
  252. #ifdef STARPU_USE_MIC
  253. mic_min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  254. if (mic_min < 2)
  255. mic_min = 2;
  256. for(j=0; j<nb_mics; j++)
  257. {
  258. mic_max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  259. if (mic_max == -1 || mic_max > (int) nmics_table[j])
  260. mic_max = nmics_table[j];
  261. assign_combinations_without_hwloc(workers,mic_workers[j],nmics_table[j],mic_min,mic_max);
  262. free(mic_workers[j]);
  263. }
  264. free(mic_id);
  265. free(nmics_table);
  266. free(mic_workers);
  267. #endif /* STARPU_USE_MIC */
  268. }
  269. #endif /* STARPU_HAVE_HWLOC */
  270. static void combine_all_cpu_workers(int *workerids, int nworkers)
  271. {
  272. unsigned sched_ctx_id = starpu_sched_ctx_get_context();
  273. if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
  274. sched_ctx_id = 0;
  275. struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
  276. int cpu_workers[STARPU_NMAXWORKERS];
  277. int ncpus = 0;
  278. struct _starpu_worker *worker;
  279. int i;
  280. int min;
  281. int max;
  282. for (i = 0; i < nworkers; i++)
  283. {
  284. worker = _starpu_get_worker_struct(workerids[i]);
  285. if (worker->arch == STARPU_CPU_WORKER)
  286. cpu_workers[ncpus++] = workerids[i];
  287. }
  288. min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
  289. if (min < 1)
  290. min = 1;
  291. max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
  292. if (max == -1 || max > ncpus)
  293. max = ncpus;
  294. for (i = min; i <= max; i++)
  295. {
  296. int newworkerid;
  297. newworkerid = starpu_combined_worker_assign_workerid(i, cpu_workers);
  298. STARPU_ASSERT(newworkerid >= 0);
  299. workers->add(workers, newworkerid);
  300. }
  301. }
  302. void _starpu_sched_find_worker_combinations(int *workerids, int nworkers)
  303. {
  304. struct _starpu_machine_config *config = _starpu_get_machine_config();
  305. if (config->conf->single_combined_worker > 0)
  306. combine_all_cpu_workers(workerids, nworkers);
  307. else
  308. {
  309. #ifdef STARPU_HAVE_HWLOC
  310. find_and_assign_combinations_with_hwloc(workerids, nworkers);
  311. #else
  312. find_and_assign_combinations_without_hwloc(workerids, nworkers);
  313. #endif
  314. }
  315. }