/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <stdlib.h>
#include <string.h>
#include <starpu_mpi.h>
#include "helper.h"
#include "burst_helper.h"

#if defined(STARPU_SIMGRID) || defined(STARPU_QUICK_CHECK)
#define NB_REQUESTS 10
#else
#define NB_REQUESTS 50
#endif

#define NX_ARRAY (320 * 320)
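/* 320*320 4-byte floats: every vector exchanged below weighs 400 KiB. */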

static starpu_data_handle_t* recv_handles;
static starpu_data_handle_t* send_handles;
static float** recv_buffers;
static float** send_buffers;
static starpu_mpi_req* recv_reqs;
static starpu_mpi_req* send_reqs;

int burst_nb_requests = NB_REQUESTS;
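
/* Allocate and register the send/receive buffers and the request arrays on
 * ranks 0 and 1, the only ranks taking part in the bursts. */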
void burst_init_data(int rank)
{
	if (rank == 0 || rank == 1)
	{
		recv_handles = malloc(burst_nb_requests * sizeof(starpu_data_handle_t));
		send_handles = malloc(burst_nb_requests * sizeof(starpu_data_handle_t));
		recv_buffers = malloc(burst_nb_requests * sizeof(float*));
		send_buffers = malloc(burst_nb_requests * sizeof(float*));
		recv_reqs = malloc(burst_nb_requests * sizeof(starpu_mpi_req));
		send_reqs = malloc(burst_nb_requests * sizeof(starpu_mpi_req));

		for (int i = 0; i < burst_nb_requests; i++)
		{
			send_buffers[i] = malloc(NX_ARRAY * sizeof(float));
			memset(send_buffers[i], 0, NX_ARRAY * sizeof(float));
			starpu_vector_data_register(&send_handles[i], STARPU_MAIN_RAM, (uintptr_t) send_buffers[i], NX_ARRAY, sizeof(float));

			recv_buffers[i] = malloc(NX_ARRAY * sizeof(float));
			memset(recv_buffers[i], 0, NX_ARRAY * sizeof(float));
			starpu_vector_data_register(&recv_handles[i], STARPU_MAIN_RAM, (uintptr_t) recv_buffers[i], NX_ARRAY, sizeof(float));
		}
	}
}
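
/* Unregister the data handles and free everything allocated by
 * burst_init_data(). */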
void burst_free_data(int rank)
{
	if (rank == 0 || rank == 1)
	{
		for (int i = 0; i < burst_nb_requests; i++)
		{
			starpu_data_unregister(send_handles[i]);
			free(send_buffers[i]);

			starpu_data_unregister(recv_handles[i]);
			free(recv_buffers[i]);
		}

		free(recv_handles);
		free(send_handles);
		free(recv_buffers);
		free(send_buffers);
		free(recv_reqs);
		free(send_reqs);
	}
}

/* Burst simultaneously from both nodes: 0 and 1 post all the recvs, synchronise, and then post all the sends */
void burst_bidir(int rank)
{
	int other_rank = (rank == 0) ? 1 : 0;

	FPRINTF(stderr, "Simultaneous....start (rank %d)\n", rank);

	if (rank == 0 || rank == 1)
	{
		for (int i = 0; i < burst_nb_requests; i++)
		{
			recv_reqs[i] = NULL;
			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
		}
	}
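
	/* Both ranks have now posted all their receives, so each send issued
	 * after the barrier finds a matching pre-posted receive. */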
	starpu_mpi_barrier(MPI_COMM_WORLD);

	if (rank == 0 || rank == 1)
	{
		for (int i = 0; i < burst_nb_requests; i++)
		{
			send_reqs[i] = NULL;
			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
		}

		for (int i = 0; i < burst_nb_requests; i++)
		{
			if (recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
			if (send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
		}
	}

	FPRINTF(stderr, "Simultaneous....end (rank %d)\n", rank);
	starpu_mpi_barrier(MPI_COMM_WORLD);
}
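
/* One-way burst: the receiver posts all the recvs, a barrier synchronises both
 * ranks, then the sender posts all the sends. */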
void burst_unidir(int sender, int rank)
{
	int other_rank = (rank == 0) ? 1 : 0;
	int receiver = (sender == 0) ? 1 : 0;

	FPRINTF(stderr, "%d -> %d... start (rank %d)\n", sender, receiver, rank);

	if (rank == receiver)
	{
		for (int i = 0; i < burst_nb_requests; i++)
		{
			recv_reqs[i] = NULL;
			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
		}
	}

	starpu_mpi_barrier(MPI_COMM_WORLD);

	if (rank == sender)
	{
		for (int i = 0; i < burst_nb_requests; i++)
		{
			send_reqs[i] = NULL;
			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
		}
	}

	if (rank == 0 || rank == 1)
	{
		for (int i = 0; i < burst_nb_requests; i++)
		{
			if (rank == receiver && recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
			if (rank == sender && send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
		}
	}

	FPRINTF(stderr, "%d -> %d... end (rank %d)\n", sender, receiver, rank);
	starpu_mpi_barrier(MPI_COMM_WORLD);
}

/* Half burst from both nodes: the second half of the sends is posted only
 * after some requests have completed. */
void burst_bidir_half_postponed(int rank)
{
	int other_rank = (rank == 0) ? 1 : 0;

	FPRINTF(stderr, "Half/half burst...start (rank %d)\n", rank);

	if (rank == 0 || rank == 1)
	{
		for (int i = 0; i < burst_nb_requests; i++)
		{
			recv_reqs[i] = NULL;
			starpu_mpi_irecv(recv_handles[i], &recv_reqs[i], other_rank, i, MPI_COMM_WORLD);
		}
	}

	starpu_mpi_barrier(MPI_COMM_WORLD);

	if (rank == 0 || rank == 1)
	{
		for (int i = 0; i < (burst_nb_requests / 2); i++)
		{
			send_reqs[i] = NULL;
			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
		}
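
		/* Wait for one of the pending receives to complete before posting
		 * the second half of the sends. */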
		if (recv_reqs[burst_nb_requests / 4]) starpu_mpi_wait(&recv_reqs[burst_nb_requests / 4], MPI_STATUS_IGNORE);

		for (int i = (burst_nb_requests / 2); i < burst_nb_requests; i++)
		{
			send_reqs[i] = NULL;
			starpu_mpi_isend_prio(send_handles[i], &send_reqs[i], other_rank, i, i, MPI_COMM_WORLD);
		}

		for (int i = 0; i < burst_nb_requests; i++)
		{
			if (recv_reqs[i]) starpu_mpi_wait(&recv_reqs[i], MPI_STATUS_IGNORE);
			if (send_reqs[i]) starpu_mpi_wait(&send_reqs[i], MPI_STATUS_IGNORE);
		}
	}

	FPRINTF(stderr, "Half/half burst...done (rank %d)\n", rank);
	starpu_mpi_barrier(MPI_COMM_WORLD);
}
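
/* Run every burst pattern in sequence and report the total wall-clock time. */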
void burst_all(int rank)
{
	double start, end;

	start = starpu_timing_now();

	/* Burst simultaneously from both nodes: 0 and 1 post all the recvs, synchronise, and then post all the sends */
	burst_bidir(rank);

	/* Burst from 0 to 1: rank 1 posts all the recvs, barrier, then rank 0 posts all the sends */
	burst_unidir(0, rank);

	/* Burst from 1 to 0: rank 0 posts all the recvs, barrier, then rank 1 posts all the sends */
	burst_unidir(1, rank);

	/* Half burst from both nodes, second half posted after some requests have completed */
	burst_bidir_half_postponed(rank);

	end = starpu_timing_now();
	FPRINTF(stderr, "All bursts took %.0f ms\n", (end - start) / 1000.0);
}
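
/* Minimal usage sketch, for reference only: it is not part of this helper, and
 * the real drivers live in the burst test programs. It assumes a plain
 * StarPU-MPI program run with two ranks:
 *
 *	int main(int argc, char **argv)
 *	{
 *		int rank;
 *		int ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
 *		STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
 *
 *		starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
 *
 *		burst_init_data(rank);
 *		burst_all(rank);
 *		burst_free_data(rank);
 *
 *		starpu_mpi_shutdown();
 *		return 0;
 *	}
 */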