07_async_spawn.c 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. /* StarPURM --- StarPU Resource Management Layer.
  2. *
  3. * Copyright (C) 2017 Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
/* This example shows a basic StarPU vector scale app on top of StarPURM,
 * making use of both the main RM API and the asynchronous
 * starpurm_spawn_kernel_on_cpus_callback API function. */
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <starpu.h>
#include <starpurm.h>
#include <starpurm.h>
  25. static int rm_cpu_type_id = -1;
  26. static int rm_nb_cpu_units = 0;
  27. static void usage(void);
  28. static void test1(const int N);
  29. static void test2(const int N, const int task_mult);
  30. static void init_rm_infos(void);
  31. static unsigned spawn_pending = 0;
  32. static pthread_mutex_t spawn_pending_mutex = PTHREAD_MUTEX_INITIALIZER;
  33. static pthread_cond_t spawn_pending_cond;
  34. static void _inc_spawn_pending(void)
  35. {
  36. pthread_mutex_lock(&spawn_pending_mutex);
  37. assert(spawn_pending < UINT_MAX);
  38. spawn_pending++;
  39. pthread_mutex_unlock(&spawn_pending_mutex);
  40. }
  41. static void _dec_spawn_pending(void)
  42. {
  43. pthread_mutex_lock(&spawn_pending_mutex);
  44. assert(spawn_pending > 0);
  45. spawn_pending--;
  46. if (spawn_pending == 0)
  47. pthread_cond_broadcast(&spawn_pending_cond);
  48. pthread_mutex_unlock(&spawn_pending_mutex);
  49. }
  50. static void _wait_pending_spawns(void)
  51. {
  52. pthread_mutex_lock(&spawn_pending_mutex);
  53. while (spawn_pending > 0)
  54. pthread_cond_wait(&spawn_pending_cond, &spawn_pending_mutex);
  55. pthread_mutex_unlock(&spawn_pending_mutex);
  56. }
  57. static void spawn_callback(void *_arg)
  58. {
  59. assert(42 == (uintptr_t)_arg);
  60. _dec_spawn_pending();
  61. }
  62. /* vector scale codelet */
  63. static void vector_scale_func(void *cl_buffers[], void *cl_arg)
  64. {
  65. double scalar = -1.0;
  66. int n = STARPU_VECTOR_GET_NX(cl_buffers[0]);
  67. double *vector = (double *)STARPU_VECTOR_GET_PTR(cl_buffers[0]);
  68. int i;
  69. starpu_codelet_unpack_args(cl_arg, &scalar);
  70. int workerid = starpu_worker_get_id();
  71. hwloc_cpuset_t worker_cpuset = starpu_worker_get_hwloc_cpuset(workerid);
  72. {
  73. int strl1 = hwloc_bitmap_snprintf(NULL, 0, worker_cpuset);
  74. char str1[strl1+1];
  75. hwloc_bitmap_snprintf(str1, strl1+1, worker_cpuset);
  76. printf("worker[%03d] - task: vector=%p, n=%d, scalar=%lf, worker cpuset = %s\n", workerid, vector, n, scalar, str1);
  77. }
  78. hwloc_bitmap_free(worker_cpuset);
  79. for (i = 0; i < n; i++)
  80. {
  81. vector[i] *= scalar;
  82. }
  83. }
  84. static struct starpu_codelet vector_scale_cl =
  85. {
  86. .cpu_funcs = {vector_scale_func},
  87. .nbuffers = 1
  88. };
  89. /* main routines */
  90. static void usage(void)
  91. {
  92. fprintf(stderr, "usage: 05_vector_scale [VECTOR_SIZE]\n");
  93. exit(1);
  94. }
  95. static void test1(const int N)
  96. {
  97. double *vector = NULL;
  98. const double scalar = 2.0;
  99. starpu_data_handle_t vector_handle;
  100. int ret;
  101. vector = malloc(N * sizeof(*vector));
  102. {
  103. int i;
  104. for (i = 0; i < N; i++)
  105. {
  106. vector[i] = i;
  107. }
  108. }
  109. starpu_vector_data_register(&vector_handle, STARPU_MAIN_RAM, (uintptr_t)vector, N, sizeof(*vector));
  110. ret = starpu_task_insert(&vector_scale_cl,
  111. STARPU_RW, vector_handle,
  112. STARPU_VALUE, &scalar, sizeof(scalar),
  113. 0);
  114. assert(ret == 0);
  115. starpu_task_wait_for_all();
  116. starpu_data_unregister(vector_handle);
  117. {
  118. int i;
  119. for (i = 0; i < N; i++)
  120. {
  121. double d_i = i;
  122. if (vector[i] != d_i*scalar)
  123. {
  124. fprintf(stderr, "%s: check_failed\n", __func__);
  125. exit(1);
  126. }
  127. }
  128. }
  129. free(vector);
  130. }
  131. static void test2(const int N, const int task_mult)
  132. {
  133. double *vector = NULL;
  134. const double scalar = 3.0;
  135. starpu_data_handle_t vector_handle;
  136. int ret;
  137. vector = malloc(N * sizeof(*vector));
  138. {
  139. int i;
  140. for (i = 0; i < N; i++)
  141. {
  142. vector[i] = i;
  143. }
  144. }
  145. starpu_vector_data_register(&vector_handle, STARPU_MAIN_RAM, (uintptr_t)vector, N, sizeof(*vector));
  146. struct starpu_data_filter partition_filter =
  147. {
  148. .filter_func = starpu_vector_filter_block,
  149. .nchildren = rm_nb_cpu_units * task_mult
  150. };
  151. starpu_data_partition(vector_handle, &partition_filter);
  152. {
  153. int i;
  154. for (i = 0; i < rm_nb_cpu_units*task_mult; i++)
  155. {
  156. starpu_data_handle_t sub_vector_handle = starpu_data_get_sub_data(vector_handle, 1, i);
  157. ret = starpu_task_insert(&vector_scale_cl,
  158. STARPU_RW, sub_vector_handle,
  159. STARPU_VALUE, &scalar, sizeof(scalar),
  160. 0);
  161. assert(ret == 0);
  162. }
  163. }
  164. starpu_task_wait_for_all();
  165. starpu_data_unpartition(vector_handle, STARPU_MAIN_RAM);
  166. starpu_data_unregister(vector_handle);
  167. {
  168. int i;
  169. for (i = 0; i < N; i++)
  170. {
  171. double d_i = i;
  172. if (vector[i] != d_i*scalar)
  173. {
  174. fprintf(stderr, "%s: check_failed\n", __func__);
  175. exit(1);
  176. }
  177. }
  178. }
  179. free(vector);
  180. }
  181. static void init_rm_infos(void)
  182. {
  183. int cpu_type = starpurm_get_device_type_id("cpu");
  184. int nb_cpu_units = starpurm_get_nb_devices_by_type(cpu_type);
  185. if (nb_cpu_units < 1)
  186. {
  187. /* No CPU unit available. */
  188. exit(77);
  189. }
  190. rm_cpu_type_id = cpu_type;
  191. rm_nb_cpu_units = nb_cpu_units;
  192. }
  193. static void kernel_to_spawn(void *args)
  194. {
  195. int param_N = *(int*)args;
  196. //test1(param_N);
  197. test2(param_N, 1);
  198. //test2(param_N, 10);
  199. //test2(param_N, 100);
  200. }
  201. int main(int argc, char *argv[])
  202. {
  203. pthread_cond_init(&spawn_pending_cond, NULL);
  204. int param_N = 1000000;
  205. int drs_enabled;
  206. if (argc > 1)
  207. {
  208. param_N = atoi(argv[1]);
  209. if (param_N < 1)
  210. {
  211. usage();
  212. }
  213. }
  214. starpurm_initialize();
  215. init_rm_infos();
  216. if (rm_nb_cpu_units > 1)
  217. {
  218. const int half_nb_cpus = rm_nb_cpu_units/2;
  219. starpurm_set_drs_enable(NULL);
  220. drs_enabled = starpurm_drs_enabled_p();
  221. assert(drs_enabled != 0);
  222. int repeat;
  223. for (repeat=0; repeat < 20; repeat++)
  224. {
  225. hwloc_cpuset_t cpu_cpuset = starpurm_get_all_cpu_workers_cpuset();
  226. {
  227. int strl1 = hwloc_bitmap_snprintf(NULL, 0, cpu_cpuset);
  228. char str1[strl1+1];
  229. hwloc_bitmap_snprintf(str1, strl1+1, cpu_cpuset);
  230. printf("all cpus cpuset = %s\n", str1);
  231. }
  232. int first_idx = hwloc_bitmap_first(cpu_cpuset);
  233. int last_idx = hwloc_bitmap_last(cpu_cpuset);
  234. hwloc_cpuset_t sel_cpuset = hwloc_bitmap_alloc();
  235. assert(sel_cpuset != NULL);
  236. int count = 0;
  237. int idx = first_idx;
  238. while (idx != -1 && idx <= last_idx && count < half_nb_cpus)
  239. {
  240. if (hwloc_bitmap_isset(cpu_cpuset, idx))
  241. {
  242. hwloc_bitmap_set(sel_cpuset, idx);
  243. count ++;
  244. }
  245. idx = hwloc_bitmap_next(cpu_cpuset, idx);
  246. }
  247. assert(count == half_nb_cpus);
  248. {
  249. int strl1 = hwloc_bitmap_snprintf(NULL, 0, sel_cpuset);
  250. char str1[strl1+1];
  251. hwloc_bitmap_snprintf(str1, strl1+1, sel_cpuset);
  252. printf("spawning a kernel on cpuset = %s\n", str1);
  253. }
  254. _inc_spawn_pending();
  255. starpurm_spawn_kernel_on_cpus_callback(NULL, kernel_to_spawn, &param_N, sel_cpuset, spawn_callback, (void*)(uintptr_t)42);
  256. hwloc_bitmap_free(sel_cpuset);
  257. hwloc_bitmap_free(cpu_cpuset);
  258. }
  259. _wait_pending_spawns();
  260. printf("withdrawing %d cpus from StarPU\n", half_nb_cpus);
  261. starpurm_withdraw_cpus_from_starpu(NULL, half_nb_cpus);
  262. test1(param_N);
  263. test2(param_N, 1);
  264. test2(param_N, 10);
  265. test2(param_N, 100);
  266. printf("assigning %d cpus to StarPU\n", half_nb_cpus);
  267. starpurm_assign_cpus_to_starpu(NULL, half_nb_cpus);
  268. test1(param_N);
  269. test2(param_N, 1);
  270. test2(param_N, 10);
  271. test2(param_N, 100);
  272. starpurm_set_drs_disable(NULL);
  273. drs_enabled = starpurm_drs_enabled_p();
  274. assert(drs_enabled == 0);
  275. }
  276. starpurm_shutdown();
  277. pthread_cond_destroy(&spawn_pending_cond);
  278. return 0;
  279. }