bcast_bench.c

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */

/*
 * Basic broadcast benchmark with synchronized clocks.
 * Heavily inspired by the NewMadeleine example examples/mcast/nm_mcast_bench.c
 *
 * Synchronized clocks (mpi_sync_clocks) are available here:
 * https://gitlab.inria.fr/pm2/pm2/-/tree/master/mpi_sync_clocks
 * and are detected during StarPU's configure.
 */
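
/*
 * Measurement protocol: for a given number of destination nodes and message
 * size, rank 0 records a start timestamp, sends the data to every destination
 * node and collects the arrival timestamp each receiver measured with its
 * local synchronized clock. Once all rounds are done, the clock offsets are
 * resolved by mpi_sync_clocks and the duration of a broadcast is the latest
 * arrival time (expressed in the global clock) minus the start time.
 */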

#include <starpu_mpi.h>
#include <mpi_sync_clocks.h>
#include <math.h> // for logf() and floor(), in case the helper headers do not already pull it in
#include "helper.h"
#include "bench_helper.h"

#define SERVER_PRINTF(fmt, ...) do { if (rank == 0) { printf(fmt, ## __VA_ARGS__); fflush(stdout); } } while (0)

typedef void (*algorithm_t)(int nb_dest_nodes, starpu_data_handle_t handle, int nb_nodes_id, int size_id, int bench_id);

static void dummy_loop(int nb_dest_nodes, starpu_data_handle_t handle, int nb_nodes_id, int size_id, int bench_id);

static algorithm_t algorithms[] = { dummy_loop };

#undef NX_MAX
#undef NX_MIN
#define NX_MIN 1
#ifdef STARPU_QUICK_CHECK
#define NB_BENCH 2
#define NX_MAX 100 // kB
#else
#define NB_BENCH 10
#define NX_MAX 240196 // kB
#endif
#define NX_STEP 1.4 // multiplication
#define NB_NODES_STEP 2 // addition
#define NB_NODES_START 3

#define NB_METHODS (sizeof(algorithms)/sizeof(algorithm_t))

struct statistics
{
	double min;
	double med;
	double avg;
	double max;
};

static int times_nb_nodes;
static int times_size;
static int worldsize;
static int rank;
static double* times;
static mpi_sync_clocks_t clocks;

static const starpu_mpi_tag_t data_tag = 0x12;
static const starpu_mpi_tag_t time_tag = 0x13;

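/* Convert the arrival timestamps of the destination nodes (each expressed in
 * that node's local clock) to the global clock and return the latest one.
 * array[i] holds the timestamp of node i+1, since node 0 is the sender. */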
static double find_max(double* array, int size)
{
	double t_max = mpi_sync_clocks_remote_to_global(clocks, 1, array[0]);
	double t_value;
	int i;

	for (i = 1; i < size; i++)
	{
		t_value = mpi_sync_clocks_remote_to_global(clocks, i+1, array[i]);
		if (t_value > t_max)
		{
			t_max = t_value;
		}
	}

	return t_max;
}

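/* Compute min, median, average and max of the measured durations.
 * Note that qsort() reorders the array in place. */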
static struct statistics compute_statistics(double* array, int size)
{
	struct statistics stat;
	int i;

	qsort(array, size, sizeof(double), &comp_double);

	double avg = 0;
	for (i = 0; i < size; i++)
	{
		avg += array[i];
	}
	stat.avg = avg / size;

	stat.min = array[0];
	stat.med = array[size / 2];
	stat.max = array[size - 1];

	return stat;
}

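/* The times array is a flattened 3-D array indexed by [size][bench][node]:
 * within each bench, slot 0 stores the sender's start time and slots
 * 1..nb_dest_nodes store the arrival time measured on each destination. */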
static int time_index(int size, int bench, int node)
{
	assert(size < times_size);
	assert(bench < NB_BENCH);
	assert(node < worldsize);

	// Warning: if bench < 0 (warmup case), this function still returns an index; the caller has to check whether it makes sense.
	return size * (NB_BENCH * (worldsize + 1)) + bench * (worldsize + 1) + node;
}

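/* Trivial broadcast: the server (rank 0) posts an isend to every destination
 * node, waits for all of them to complete, then gathers the arrival timestamp
 * measured by each receiver. Receivers time the reception with their local
 * synchronized clock and send that timestamp back to the server. */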
static void dummy_loop(int nb_dest_nodes, starpu_data_handle_t data_handle, int nb_nodes_id, int size_id, int bench_id)
{
	double t_end;
	int i;
	starpu_data_handle_t time_handle;

	if (rank == 0)
	{
		int t_index = time_index(size_id, bench_id, 0);
		if (bench_id >= 0)
		{
			times[t_index] = mpi_sync_clocks_get_time_usec(clocks);
		}

		starpu_mpi_req* reqs = malloc(nb_dest_nodes*sizeof(starpu_mpi_req));
		for (i = 1; i <= nb_dest_nodes; i++)
		{
			starpu_mpi_isend(data_handle, &reqs[i-1], i, data_tag, MPI_COMM_WORLD);
		}

		for (i = 0; i < nb_dest_nodes; i++)
		{
			starpu_mpi_wait(&reqs[i], MPI_STATUS_IGNORE);
		}

		for (i = 1; i <= nb_dest_nodes; i++)
		{
			starpu_variable_data_register(&time_handle, STARPU_MAIN_RAM, (uintptr_t) &t_end, sizeof(double));
			starpu_mpi_recv(time_handle, i, time_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
			starpu_data_unregister(time_handle);
			if (bench_id >= 0)
			{
				times[t_index+i] = t_end;
			}
		}
		free(reqs);
	}
	else // not the server
	{
		starpu_mpi_recv(data_handle, 0, data_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
		t_end = mpi_sync_clocks_get_time_usec(clocks);

		starpu_variable_data_register(&time_handle, STARPU_MAIN_RAM, (uintptr_t) &t_end, sizeof(double));
		starpu_mpi_send(time_handle, 0, time_tag, MPI_COMM_WORLD);
		starpu_data_unregister(time_handle);
	}
}

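/* Once all benches for the current number of nodes are done, synchronize the
 * clocks to compute the offsets between node clocks, then, for each message
 * size, take the duration of a broadcast as the difference between the
 * sender's start time and the latest arrival time in the global clock. */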
static void compute_display_times(const int method, const int nb_nodes_id, const int nb_dest_nodes)
{
	int size_id = 0;
	double times_bench[NB_BENCH];
	int s, b;

	SERVER_PRINTF("Computing clock offsets... ");
	mpi_sync_clocks_synchronize(clocks);

	if (rank == 0)
	{
		printf("done\n");

		/* Computing times */
		for (s = NX_MIN; s < NX_MAX; s = (s * NX_STEP) + 1)
		{
			for (b = 0; b < NB_BENCH; b++)
			{
				double t_begin = times[time_index(size_id, b, 0)];
				double t_end = find_max(times + time_index(size_id, b, 1), nb_dest_nodes);
				assert(t_begin < t_end);
				times_bench[b] = t_end - t_begin;
			}

			struct statistics stat_main_task = compute_statistics(times_bench, NB_BENCH);
			printf(" %d | %3d \t| %5d\t\t| ", method, nb_dest_nodes+1, s);
			printf("%10.3lf\t%10.3lf\t%10.3lf\t%10.3lf\n", stat_main_task.min, stat_main_task.med, stat_main_task.avg, stat_main_task.max);
			fflush(stdout);

			size_id++;
		}
	}
}

static inline void man()
{
	fprintf(stderr, "Options:\n");
	fprintf(stderr, "\t-h --help   display this help\n");
	fprintf(stderr, "\t-p          pause workers during benchmark\n");
	exit(EXIT_SUCCESS);
}

int main(int argc, char **argv)
{
	int pause_workers = 0;
	int nb_nodes_id;
	int size_id;
	int ret, method, nb_dest_nodes, s, b, i, array_size;
	starpu_data_handle_t data_handle;
	float* msg;

	for (i = 1; i < argc; i++)
	{
		if (strcmp(argv[i], "-p") == 0)
		{
			pause_workers = 1;
		}
		else if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0)
		{
			man();
		}
		else
		{
			fprintf(stderr, "Unrecognized option %s\n", argv[i]);
			man();
		}
	}

	ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");

	starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
	starpu_mpi_comm_size(MPI_COMM_WORLD, &worldsize);

	if (worldsize < 4)
	{
		if (rank == 0)
			FPRINTF(stderr, "We need at least 4 processes.\n");

		starpu_mpi_shutdown();
		return STARPU_TEST_SKIPPED;
	}

	if (pause_workers)
	{
		SERVER_PRINTF("Workers will be paused during the benchmark.\n");
		/* Pause workers for this bench: having all workers polling for tasks has a strong impact on performance */
		starpu_pause();
	}

	times_nb_nodes = ((worldsize - NB_NODES_START) / NB_NODES_STEP) + 1;
	// number of message sizes in the geometric progression from NX_MIN to NX_MAX with ratio NX_STEP
	times_size = (int) (logf((float) NX_MAX / (float) NX_MIN) / logf(NX_STEP)) + 1;
	assert(times_size > 0);
	times = malloc(times_size * NB_BENCH * (worldsize + 1) * sizeof(double));

	SERVER_PRINTF("#0: dummy loop\n");
	SERVER_PRINTF(" | Nodes \t| \t| \tMain task lasted (us):\n");
	SERVER_PRINTF(" Algo | in comm \t| Size (KB)\t| min\tmed\tavg\tmax\n");
	SERVER_PRINTF("-----------------------------------------------------------------------\n");

	for (method = 0; method < (int) NB_METHODS; method++)
	{
		nb_nodes_id = 0;
		for (nb_dest_nodes = NB_NODES_START; nb_dest_nodes < worldsize; nb_dest_nodes += NB_NODES_STEP)
		{
			starpu_mpi_barrier(MPI_COMM_WORLD);

			SERVER_PRINTF("Starting global clock... ");
			clocks = mpi_sync_clocks_init(MPI_COMM_WORLD);
			SERVER_PRINTF("done\n");

			size_id = 0;
			for (s = NX_MIN; s < NX_MAX; s = (s * NX_STEP) + 1)
			{
				SERVER_PRINTF(" %d | %3d \t| %5d\t\t| ", method, nb_dest_nodes+1, s);

				array_size = s * 1000 / sizeof(float);
				msg = malloc(array_size * sizeof(float));
				for (i = 0; i < array_size; i++)
				{
					msg[i] = 3.14;
				}
				starpu_vector_data_register(&data_handle, STARPU_MAIN_RAM, (uintptr_t) msg, array_size, sizeof(float));

				// b == -1 is a warmup round: it is executed, but its timings are not recorded
				for (b = -1; b < NB_BENCH; b++)
				{
					if (rank <= nb_dest_nodes) // only ranks taking part in this broadcast participate
					{
						algorithms[method](nb_dest_nodes, data_handle, nb_nodes_id, size_id, b);
					}
					SERVER_PRINTF(".");
				}
				SERVER_PRINTF("\n");

				starpu_data_unregister(data_handle);
				free(msg);

				size_id++;
			}

			// flush clocks
			compute_display_times(method, nb_nodes_id, nb_dest_nodes);
			mpi_sync_clocks_shutdown(clocks);

			nb_nodes_id++;
		}
	}

	if (pause_workers)
	{
		starpu_resume();
	}

	starpu_mpi_shutdown();

	free(times);

	return 0;
}