/* bcast_bench.c */

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
/*
 * Basic broadcast benchmark with synchronized clocks.
 * Largely inspired by the NewMadeleine example examples/mcast/nm_mcast_bench.c
 *
 * Synchronized clocks (mpi_sync_clocks) are available here:
 * https://gitlab.inria.fr/pm2/pm2/-/tree/master/mpi_sync_clocks
 * and are detected during StarPU's configure.
 */
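/*
 * For reference, the mpi_sync_clocks entry points this benchmark relies on;
 * this is a sketch inferred from their use in this file, not the library's
 * full API:
 *
 *   clocks = mpi_sync_clocks_init(MPI_COMM_WORLD);         // start a global clock
 *   mpi_sync_clocks_synchronize(clocks);                   // compute clock offsets
 *   t = mpi_sync_clocks_get_time_usec(clocks);             // local timestamp, in us
 *   g = mpi_sync_clocks_remote_to_global(clocks, rank, t); // map a time measured on
 *                                                          // rank onto the global timeline
 *   mpi_sync_clocks_shutdown(clocks);                      // release the clock
 */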
#include <starpu_mpi.h>
#include <mpi_sync_clocks.h>
#include "helper.h"
#include "bench_helper.h"

#define SERVER_PRINTF(fmt, ...) do { if (rank == 0) { printf(fmt, ## __VA_ARGS__); fflush(stdout); } } while(0)

typedef void (*algorithm_t)(int nb_dest_nodes, starpu_data_handle_t handle, int nb_nodes_id, int size_id, int bench_id);

static void dummy_loop(int nb_dest_nodes, starpu_data_handle_t handle, int nb_nodes_id, int size_id, int bench_id);

static algorithm_t algorithms[] = { dummy_loop };

#undef NX_MAX
#undef NX_MIN
#define NX_MIN 1 // kB
#ifdef STARPU_QUICK_CHECK
#define NB_BENCH 2
#define NX_MAX 100 // kB
#else
#define NB_BENCH 10
#define NX_MAX 240196 // kB
#endif
#define NX_STEP 1.4 // multiplicative step between message sizes
#define NB_NODES_STEP 2 // additive step between numbers of nodes
#define NB_NODES_START 3
#define NB_METHODS (sizeof(algorithms)/sizeof(algorithm_t))
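/*
 * Message sizes (in kB) follow s = s * NX_STEP + 1 from NX_MIN up to NX_MAX
 * (i.e. 1, 2, 3, 5, 8, 12, ...), and the number of destination nodes grows
 * from NB_NODES_START by steps of NB_NODES_STEP (3, 5, 7, ...) up to
 * worldsize - 1.
 */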
struct statistics
{
	double min;
	double med;
	double avg;
	double max;
};

static int times_nb_nodes;
static int times_size;
static int worldsize;
static int rank;
static double* times;
static mpi_sync_clocks_t clocks;

static const starpu_mpi_tag_t data_tag = 0x12;
static const starpu_mpi_tag_t time_tag = 0x13;
static double find_max(double* array, int size)
{
	/* array[i] holds the end time measured by rank i+1 in its local clock:
	 * convert everything to the global timeline before comparing. */
	double t_max = mpi_sync_clocks_remote_to_global(clocks, 1, array[0]);
	double t_value;
	int i;

	for (i = 1; i < size; i++)
	{
		t_value = mpi_sync_clocks_remote_to_global(clocks, i+1, array[i]);
		if (t_value > t_max)
		{
			t_max = t_value;
		}
	}

	return t_max;
}
static struct statistics compute_statistics(double* array, int size)
{
	struct statistics stat;
	int i;

	qsort(array, size, sizeof(double), &comp_double);

	double avg = 0;
	for (i = 0; i < size; i++)
	{
		avg += array[i];
	}
	stat.avg = avg / size;

	stat.min = array[0];
	stat.med = array[size / 2]; // the array is sorted, integer division picks the median slot
	stat.max = array[size - 1];

	return stat;
}
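/*
 * times is a flattened [size_id][bench_id][node] array with worldsize+1 slots
 * per bench: slot 0 holds the server's send timestamp, and slot i (i >= 1)
 * holds the arrival timestamp reported by destination rank i, still expressed
 * in that rank's local clock.
 */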
static int time_index(int size, int bench, int node)
{
	assert(size < times_size);
	assert(bench < NB_BENCH);
	assert(node < worldsize);

	/* Warning: with bench < 0 (warmup run) this still returns an index, but it
	 * falls in the previous size's slots (or before the array for the first
	 * size), so callers must not store anything there: they check
	 * bench_id >= 0 before writing. */
	return size * (NB_BENCH * (worldsize + 1))
		+ bench * (worldsize + 1)
		+ node;
}
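/*
 * Baseline broadcast: the server (rank 0) records its start time, posts one
 * isend per destination, then collects each destination's arrival timestamp.
 * Each destination receives the payload, timestamps it, and sends the
 * timestamp back; find_max() later converts these local times to the global
 * clock to get the completion time of the whole broadcast.
 */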
static void dummy_loop(int nb_dest_nodes, starpu_data_handle_t data_handle, int nb_nodes_id, int size_id, int bench_id)
{
	double t_end;
	int i;
	starpu_data_handle_t time_handle;

	(void) nb_nodes_id; // unused by this algorithm

	if (rank == 0) // server
	{
		int t_index = time_index(size_id, bench_id, 0);
		if (bench_id >= 0) // don't record warmup runs
		{
			times[t_index] = mpi_sync_clocks_get_time_usec(clocks);
		}

		starpu_mpi_req* reqs = malloc(nb_dest_nodes * sizeof(starpu_mpi_req));
		for (i = 1; i <= nb_dest_nodes; i++)
		{
			starpu_mpi_isend(data_handle, &reqs[i-1], i, data_tag, MPI_COMM_WORLD);
		}

		for (i = 1; i <= nb_dest_nodes; i++)
		{
			starpu_variable_data_register(&time_handle, STARPU_MAIN_RAM, (uintptr_t) &t_end, sizeof(double));
			starpu_mpi_recv(time_handle, i, time_tag, MPI_COMM_WORLD, NULL);
			starpu_data_unregister(time_handle);
			if (bench_id >= 0)
			{
				times[t_index+i] = t_end;
			}
		}

		/* The time replies guarantee the payloads arrived, but the send
		 * requests still have to be released before freeing them: */
		for (i = 1; i <= nb_dest_nodes; i++)
		{
			starpu_mpi_wait(&reqs[i-1], MPI_STATUS_IGNORE);
		}
		free(reqs);
	}
	else // not the server: receive the data, report the arrival time back
	{
		starpu_mpi_recv(data_handle, 0, data_tag, MPI_COMM_WORLD, NULL);
		t_end = mpi_sync_clocks_get_time_usec(clocks);

		starpu_variable_data_register(&time_handle, STARPU_MAIN_RAM, (uintptr_t) &t_end, sizeof(double));
		starpu_mpi_send(time_handle, 0, time_tag, MPI_COMM_WORLD);
		starpu_data_unregister(time_handle);
	}
}
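/*
 * One broadcast duration = (latest global arrival time among the receivers)
 * - (server send timestamp); min/med/avg/max are then taken over the
 * NB_BENCH repetitions of each (size, nb_nodes) point.
 */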
static void compute_display_times(const int method, const int nb_nodes_id, const int nb_dest_nodes)
{
	int size_id = 0;
	double times_bench[NB_BENCH];
	int s, b;

	(void) nb_nodes_id; // only the caller's loop counter, not needed here

	SERVER_PRINTF("Computing clock offsets... ");
	mpi_sync_clocks_synchronize(clocks);

	if (rank == 0)
	{
		printf("done\n");

		/* Compute and display the durations */
		for (s = NX_MIN; s < NX_MAX; s = (s * NX_STEP) + 1)
		{
			for (b = 0; b < NB_BENCH; b++)
			{
				double t_begin = times[time_index(size_id, b, 0)];
				double t_end = find_max(times + time_index(size_id, b, 1), nb_dest_nodes);
				assert(t_begin < t_end);
				times_bench[b] = t_end - t_begin;
			}

			struct statistics stat_main_task = compute_statistics(times_bench, NB_BENCH);
			printf(" %d | %3d \t| %5d\t\t| ", method, nb_dest_nodes+1, s);
			printf("%10.3lf\t%10.3lf\t%10.3lf\t%10.3lf\n", stat_main_task.min, stat_main_task.med, stat_main_task.avg, stat_main_task.max);
			fflush(stdout);

			size_id++;
		}
	}
}
static inline void man(void)
{
	fprintf(stderr, "Options:\n");
	fprintf(stderr, "\t-h --help   display this help\n");
	fprintf(stderr, "\t-p          pause workers during the benchmark\n");
	exit(EXIT_SUCCESS);
}
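/*
 * Typical invocation (the launcher and binary name depend on your setup;
 * "bcast_bench" here is just this file's name), for example:
 *
 *   $ mpirun -np 8 ./bcast_bench -p
 */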
int main(int argc, char **argv)
{
	int pause_workers = 0;
	int nb_nodes_id = 0;
	int size_id = 0;
	int ret, method, nb_dest_nodes, s, b, i, array_size;
	starpu_data_handle_t data_handle;
	float* msg;

	for (i = 1; i < argc; i++)
	{
		if (strcmp(argv[i], "-p") == 0)
		{
			pause_workers = 1;
		}
		else if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0)
		{
			man();
		}
		else
		{
			fprintf(stderr, "Unrecognized option %s\n", argv[i]);
			man();
		}
	}

	ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);

	if (worldsize < 4)
	{
		if (rank == 0)
			FPRINTF(stderr, "We need at least 4 processes.\n");
		starpu_mpi_shutdown();
		return STARPU_TEST_SKIPPED;
	}

	if (pause_workers)
	{
		SERVER_PRINTF("Workers will be paused during the benchmark.\n");
		/* Pause workers for this bench: having all workers polling for tasks has a strong impact on performance */
		starpu_pause();
	}

	times_nb_nodes = ((worldsize - NB_NODES_START) / NB_NODES_STEP) + 1;
	times_size = (int) (logf((float) NX_MAX / (float) NX_MIN) / logf(NX_STEP)) + 1;
	assert(times_size > 0);
	times = malloc(times_size * NB_BENCH * (worldsize + 1) * sizeof(double));

	SERVER_PRINTF("#0: dummy loop\n");
	SERVER_PRINTF(" | Nodes \t| \t| \tMain task lasted (us):\n");
	SERVER_PRINTF(" Algo | in comm \t| Size (KB)\t| min\tmed\tavg\tmax\n");
	SERVER_PRINTF("-----------------------------------------------------------------------\n");
	for (method = 0; method < (int) NB_METHODS; method++)
	{
		nb_nodes_id = 0;
		for (nb_dest_nodes = NB_NODES_START; nb_dest_nodes < worldsize; nb_dest_nodes += NB_NODES_STEP)
		{
			starpu_mpi_barrier(MPI_COMM_WORLD);

			SERVER_PRINTF("Starting global clock... ");
			clocks = mpi_sync_clocks_init(MPI_COMM_WORLD);
			SERVER_PRINTF("done\n");

			size_id = 0;
			for (s = NX_MIN; s < NX_MAX; s = (s * NX_STEP) + 1)
			{
				SERVER_PRINTF(" %d | %3d \t| %5d\t\t| ", method, nb_dest_nodes+1, s);

				array_size = s * 1000 / sizeof(float); // s is in kB
				msg = malloc(array_size * sizeof(float));
				for (i = 0; i < array_size; i++)
				{
					msg[i] = 3.14f;
				}
				starpu_vector_data_register(&data_handle, STARPU_MAIN_RAM, (uintptr_t) msg, array_size, sizeof(float));

				for (b = -1; b < NB_BENCH; b++) // b == -1 is a warmup run and isn't recorded
				{
					if (rank <= nb_dest_nodes) // ranks beyond nb_dest_nodes sit this round out
					{
						algorithms[method](nb_dest_nodes, data_handle, nb_nodes_id, size_id, b);
					}
					SERVER_PRINTF(".");
				}
				SERVER_PRINTF("\n");

				starpu_data_unregister(data_handle);
				free(msg);
				size_id++;
			}

			/* Synchronize the clocks one last time, then display the statistics
			 * gathered for this number of nodes: */
			compute_display_times(method, nb_nodes_id, nb_dest_nodes);
			mpi_sync_clocks_shutdown(clocks);
			nb_nodes_id++;
		}
	}

	if (pause_workers)
	{
		starpu_resume();
	}

	starpu_mpi_shutdown();
	free(times);

	return 0;
}