mpi_scatter_gather.c
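A StarPU-MPI example: rank 0 allocates and initializes a block-partitioned matrix, scatters the blocks to their owning nodes with detached sends, each node scales its own blocks through a CPU codelet, and the updated blocks are gathered back and printed on rank 0. The code uses the 2011-era StarPU API (starpu_codelet, starpu_insert_task, starpu_unpack_cl_args).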

/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2011 Centre National de la Recherche Scientifique
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
#include <stdio.h>
#include <stdlib.h>
#include <starpu_mpi.h>

/* Returns the rank of the MPI node that owns the block at index (x,y) */
int my_distrib(int x, int y, int nb_nodes)
{
    return (x+y) % nb_nodes;
}
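/* e.g. with nb_nodes = 2 the blocks form a checkerboard:
 * (0,0)->0, (0,1)->1, (1,0)->1, (1,1)->0, ...
 */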
void cpu_codelet(void *descr[], void *_args)
{
    float *block;
    /* Blocks are square (block_size x block_size), so NX == NY */
    unsigned nx = STARPU_MATRIX_GET_NY(descr[0]);
    unsigned ld = STARPU_MATRIX_GET_LD(descr[0]);
    unsigned i,j;
    int rank;
    float factor;

    block = (float *)STARPU_MATRIX_GET_PTR(descr[0]);
    starpu_unpack_cl_args(_args, &rank, &factor);
    fprintf(stderr, "rank %d factor %f\n", rank, factor);

    /* Scale every element of the block in place */
    for (j = 0; j < nx; j++)
    {
        for (i = 0; i < nx; i++)
        {
            block[j+i*ld] *= factor;
        }
    }
}
/* Codelet: one CPU implementation, operating on a single matrix buffer */
static starpu_codelet cl =
{
    .where = STARPU_CPU,
    .cpu_func = cpu_codelet,
    .nbuffers = 1
};
int main(int argc, char **argv)
{
    int rank, nodes;
    float ***bmat = NULL;
    starpu_data_handle **data_handles;
    unsigned i,j,x,y;
    unsigned nblocks=4;
    unsigned block_size=1;
    unsigned size = nblocks*block_size;
    unsigned ld = size / nblocks;

    starpu_init(NULL);
    starpu_mpi_initialize_extended(&rank, &nodes);

    if (rank == 0)
    {
        /* Allocate and initialize the matrix, block by block */
        int block_number=100;
        bmat = malloc(nblocks * sizeof(float **));
        for(x=0 ; x<nblocks ; x++)
        {
            bmat[x] = malloc(nblocks * sizeof(float *));
            for(y=0 ; y<nblocks ; y++)
            {
                float value=1.0;
                starpu_malloc((void **)&bmat[x][y], block_size*block_size*sizeof(float));
                for (i = 0; i < block_size; i++)
                {
                    for (j = 0; j < block_size; j++)
                    {
                        bmat[x][y][j + i*block_size] = block_number + value;
                        value++;
                    }
                }
                block_number += 100;
            }
        }
    }
#if 0
    // Print matrix
    if (rank == 0)
    {
        for(x=0 ; x<nblocks ; x++)
        {
            for(y=0 ; y<nblocks ; y++)
            {
                for (j = 0; j < block_size; j++)
                {
                    for (i = 0; i < block_size; i++)
                    {
                        fprintf(stderr, "%2.2f\t", bmat[x][y][j+i*block_size]);
                    }
                    fprintf(stderr,"\n");
                }
                fprintf(stderr,"\n");
            }
        }
    }
#endif
    /* Allocate the data handles and register the data with StarPU */
    data_handles = malloc(nblocks*sizeof(starpu_data_handle *));
    for(x = 0; x < nblocks ; x++)
    {
        data_handles[x] = malloc(nblocks*sizeof(starpu_data_handle));
        for (y = 0; y < nblocks; y++)
        {
            int mpi_rank = my_distrib(x, y, nodes);
            if (rank == 0)
            {
                /* Rank 0 holds the actual data, so register the user buffer */
                starpu_matrix_data_register(&data_handles[x][y], 0, (uintptr_t)bmat[x][y],
                                            ld, size/nblocks, size/nblocks, sizeof(float));
            }
            else
            {
                if ((mpi_rank == rank) || (rank == mpi_rank+1 || rank == mpi_rank-1))
                {
                    /* I own this block, or I will need it for my computations */
                    fprintf(stderr, "[%d] Owning or neighbor of data[%d][%d]\n", rank, x, y);
                    /* Register a handle with no local buffer yet (home node -1) */
                    starpu_matrix_data_register(&data_handles[x][y], -1, (uintptr_t)NULL,
                                                ld, size/nblocks, size/nblocks, sizeof(float));
                }
                else
                {
                    /* This node will never touch this block, no need to allocate anything */
                    data_handles[x][y] = NULL;
                }
            }
            if (data_handles[x][y])
            {
                /* Tell StarPU-MPI which node owns this block */
                starpu_data_set_rank(data_handles[x][y], mpi_rank);
            }
        }
    }
    /* Scatter the matrix among the nodes */
    if (rank == 0)
    {
        /* Rank 0 sends each block to its owner */
        for(x = 0; x < nblocks ; x++)
        {
            for (y = 0; y < nblocks; y++)
            {
                if (data_handles[x][y])
                {
                    int owner = starpu_data_get_rank(data_handles[x][y]);
                    if (owner != 0)
                    {
                        fprintf(stderr, "[%d] Sending data[%d][%d] to %d\n", rank, x, y, owner);
                        starpu_mpi_isend_detached(data_handles[x][y], owner, owner, MPI_COMM_WORLD, NULL, NULL);
                    }
                }
            }
        }
    }
    else
    {
        /* The other nodes receive the blocks they own from rank 0 */
        for(x = 0; x < nblocks ; x++)
        {
            for (y = 0; y < nblocks; y++)
            {
                if (data_handles[x][y])
                {
                    int owner = starpu_data_get_rank(data_handles[x][y]);
                    if (owner == rank)
                    {
                        fprintf(stderr, "[%d] Receiving data[%d][%d] from %d\n", rank, x, y, 0);
                        starpu_mpi_irecv_detached(data_handles[x][y], 0, rank, MPI_COMM_WORLD, NULL, NULL);
                    }
                }
            }
        }
    }
    /* Computation: each node scales the blocks it owns */
    float factor=10.0;
    for(x = 0; x < nblocks ; x++)
    {
        for (y = 0; y < nblocks; y++)
        {
            int mpi_rank = my_distrib(x, y, nodes);
            if (mpi_rank == rank)
            {
                fprintf(stderr,"[%d] Computing on data[%d][%d]\n", rank, x, y);
                starpu_insert_task(&cl,
                                   STARPU_VALUE, &rank, sizeof(rank),
                                   STARPU_VALUE, &factor, sizeof(factor),
                                   STARPU_RW, data_handles[x][y],
                                   0);
            }
            /* The factor is incremented for every block, owned or not, so
             * every node computes the same factor for a given block */
            factor+=10.0;
        }
    }
    /* Wait for all locally submitted tasks to complete */
    starpu_task_wait_for_all();
    /* Gather the matrix back on the main node */
    if (rank == 0)
    {
        /* Rank 0 receives the updated blocks from their owners */
        for(x = 0; x < nblocks ; x++)
        {
            for (y = 0; y < nblocks; y++)
            {
                if (data_handles[x][y])
                {
                    int owner = starpu_data_get_rank(data_handles[x][y]);
                    if (owner != 0)
                    {
                        fprintf(stderr, "[%d] Receiving data[%d][%d] from %d\n", rank, x, y, owner);
                        starpu_mpi_irecv_detached(data_handles[x][y], owner, owner, MPI_COMM_WORLD, NULL, NULL);
                    }
                }
            }
        }
    }
    else
    {
        /* The other nodes send back the blocks they own */
        for(x = 0; x < nblocks ; x++)
        {
            for (y = 0; y < nblocks; y++)
            {
                if (data_handles[x][y])
                {
                    int owner = starpu_data_get_rank(data_handles[x][y]);
                    if (owner == rank)
                    {
                        fprintf(stderr, "[%d] Sending data[%d][%d] to %d\n", rank, x, y, 0);
                        starpu_mpi_isend_detached(data_handles[x][y], 0, rank, MPI_COMM_WORLD, NULL, NULL);
                    }
                }
            }
        }
    }
    // Print the resulting matrix on rank 0
    if (rank == 0)
    {
        for(x=0 ; x<nblocks ; x++)
        {
            for(y=0 ; y<nblocks ; y++)
            {
                /* Unregistering waits for the pending transfer and puts the
                 * block back into its home buffer bmat[x][y] */
                starpu_data_unregister(data_handles[x][y]);
                for (j = 0; j < block_size; j++)
                {
                    for (i = 0; i < block_size; i++)
                    {
                        fprintf(stderr, "%2.2f\t", bmat[x][y][j+i*block_size]);
                    }
                    fprintf(stderr,"\n");
                }
                fprintf(stderr,"\n");
            }
        }
    }
    // Free memory
    for(x = 0; x < nblocks ; x++)
    {
        free(data_handles[x]);
    }
    free(data_handles);
    if (rank == 0)
    {
        for(x=0 ; x<nblocks ; x++)
        {
            for(y=0 ; y<nblocks ; y++)
            {
                starpu_free((void *)bmat[x][y]);
            }
            free(bmat[x]);
        }
        free(bmat);
    }

    starpu_mpi_shutdown();
    starpu_shutdown();
    return 0;
}
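/* Build/run sketch (an assumption, not part of the original file: the
 * pkg-config module name varies across StarPU releases, so adjust it to
 * your installation):
 *
 *   mpicc mpi_scatter_gather.c -o mpi_scatter_gather \
 *         $(pkg-config --cflags --libs starpumpi-1.0)
 *   mpirun -np 4 ./mpi_scatter_gather
 */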