/* StarPU --- Runtime system for heterogeneous multicore architectures.
 *
 * Copyright (C) 2016-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
 *
 * StarPU is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or (at
 * your option) any later version.
 *
 * StarPU is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
 */
/*
 * This example illustrates how to use the STARPU_MPI_REDUX mode
 * and compares it with the standard STARPU_REDUX.
 *
 * To make the comparison salient, the init codelet does not set the
 * handle to a neutral element; instead, the initial value depends on
 * the working node.
 * This is not the proper way to use a reduction pattern, but it can
 * be seen as modelling the cost/weight of each contribution.
 */
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <math.h>
#include <starpu.h>
#include <starpu_mpi.h>
#include "helper.h"
#include <unistd.h>
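
/* Work kernel shared by both variants: add 3.0 plus the contribution b
 * of the executing node to the (copy of the) accumulator a. The sleep()
 * lengthens the tasks so that several workers run them concurrently. */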
static void cl_cpu_work(void *handles[], void *arg)
{
	(void)arg;
	double *a = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
	double *b = (double *)STARPU_VARIABLE_GET_PTR(handles[1]);
	sleep(2);
	printf("work_cl (rank:%d,worker:%d) %f =>", starpu_mpi_world_rank(), starpu_worker_get_id(), *a);
	*a = 3.0 + *a + *b;
	printf("%f\n", *a);
}
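
/* With STARPU_REDUX, the access mode is declared in the codelet itself:
 * each contributing worker gets a private copy of a, initialized by the
 * init codelet and combined afterwards by the reduction codelet. */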
static struct starpu_codelet work_cl =
{
	.cpu_funcs = { cl_cpu_work },
	.nbuffers = 2,
	.modes = { STARPU_REDUX, STARPU_R },
	.name = "task_work"
};
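
/* With STARPU_MPI_REDUX, the access mode is supplied at task-insertion
 * time (see the task_mode argument below); the codelet itself only
 * declares how the node-local copy of a is accessed: read-write, in
 * any order (STARPU_COMMUTE). */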
static struct starpu_codelet mpi_work_cl =
{
	.cpu_funcs = { cl_cpu_work },
	.nbuffers = 2,
	.modes = { STARPU_RW | STARPU_COMMUTE, STARPU_R },
	.name = "task_work-mpi"
};
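
/* Init kernel: as announced in the header comment, each fresh copy of
 * a is deliberately initialized to the MPI rank instead of the neutral
 * element 0.0. */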
static void cl_cpu_task_init(void *handles[], void *arg)
{
	(void)arg;
	double *a = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
	sleep(1);
	printf("init_cl (rank:%d,worker:%d) %d (was %f)\n", starpu_mpi_world_rank(), starpu_worker_get_id(), starpu_mpi_world_rank(), *a);
	*a = starpu_mpi_world_rank();
}
static struct starpu_codelet task_init_cl =
{
	.cpu_funcs = { cl_cpu_task_init },
	.nbuffers = 1,
	.modes = { STARPU_W },
	.name = "task_init"
};
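
/* Reduction kernel: fold the source contribution as into the
 * destination ad. The operation must be associative and commutative,
 * as the copies may be combined in any order. */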
static void cl_cpu_task_red(void *handles[], void *arg)
{
	(void)arg;
	double *ad = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
	double *as = (double *)STARPU_VARIABLE_GET_PTR(handles[1]);
	sleep(2);
	printf("red_cl (rank:%d,worker:%d) %f ; %f --> %f\n", starpu_mpi_world_rank(), starpu_worker_get_id(), *as, *ad, *as + *ad);
	*ad = *ad + *as;
}
static struct starpu_codelet task_red_cl =
{
	.cpu_funcs = { cl_cpu_task_red },
	.nbuffers = 2,
	.modes = { STARPU_RW | STARPU_COMMUTE, STARPU_R },
	.name = "task_red"
};
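
/* Rank 0 owns the accumulator a; every rank j owns one contribution
 * b[j]. Worker nodes (every rank but 0) submit tasks accumulating
 * into a, and the result is reduced back onto rank 0. */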
int main(int argc, char *argv[])
{
	int comm_rank, comm_size;
	/* Initialize StarPU and the StarPU-MPI layer */
	starpu_fxt_autostart_profiling(0);
	int ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
	int nworkers = starpu_cpu_worker_get_count();
	if (nworkers < 2)
	{
		FPRINTF(stderr, "We need at least 2 CPU workers per node.\n");
		starpu_mpi_shutdown();
		return STARPU_TEST_SKIPPED;
	}
	starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
	if (comm_size < 2)
	{
		FPRINTF(stderr, "We need at least 2 nodes.\n");
		starpu_mpi_shutdown();
		return STARPU_TEST_SKIPPED;
	}
	starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);
	double a, b[comm_size];
	starpu_data_handle_t a_h, b_h[comm_size];
	double work_coef = 2;
	enum starpu_data_access_mode task_mode;
	int i, j, work_node;
	starpu_mpi_tag_t tag = 0;
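	/* Run the computation twice on freshly registered data: first
	 * (i == 0) with STARPU_MPI_REDUX, then with STARPU_REDUX. */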
	for (i = 0; i < 2; i++)
	{
		starpu_mpi_barrier(MPI_COMM_WORLD);
		if (i == 0)
			task_mode = STARPU_MPI_REDUX;
		else
			task_mode = STARPU_REDUX;
		if (comm_rank == 0)
		{
			a = 1.0;
			printf("init a = %f\n", a);
			starpu_variable_data_register(&a_h, STARPU_MAIN_RAM, (uintptr_t)&a, sizeof(double));
			for (j = 0; j < comm_size; j++)
				starpu_variable_data_register(&b_h[j], -1, 0, sizeof(double));
		}
		else
		{
			b[comm_rank] = 1.0 / (comm_rank + 1.0);
			printf("init b_%d = %f\n", comm_rank, b[comm_rank]);
			starpu_variable_data_register(&a_h, -1, 0, sizeof(double));
			for (j = 0; j < comm_size; j++)
			{
				if (j == comm_rank)
					starpu_variable_data_register(&b_h[j], STARPU_MAIN_RAM, (uintptr_t)&b[j], sizeof(double));
				else
					starpu_variable_data_register(&b_h[j], -1, 0, sizeof(double));
			}
		}
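		/* a_h belongs to rank 0 and b_h[j] to rank j; the reduction
		 * methods attached to a_h are used by both variants. */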
		starpu_mpi_data_register(a_h, tag++, 0);
		for (j = 0; j < comm_size; j++)
			starpu_mpi_data_register(b_h[j], tag++, j);
		starpu_data_set_reduction_methods(a_h, &task_red_cl, &task_init_cl);
		starpu_fxt_start_profiling();
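		/* Every node but rank 0 submits work_coef*nworkers tasks, each
		 * reading that node's b and accumulating into a. */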
		for (work_node = 1; work_node < comm_size; work_node++)
		{
			for (j = 1; j <= work_coef*nworkers; j++)
			{
				if (i == 0)
					starpu_mpi_task_insert(MPI_COMM_WORLD,
							       &mpi_work_cl,
							       task_mode, a_h,
							       STARPU_R, b_h[work_node],
							       STARPU_EXECUTE_ON_NODE, work_node,
							       0);
				else
					starpu_mpi_task_insert(MPI_COMM_WORLD,
							       &work_cl,
							       task_mode, a_h,
							       STARPU_R, b_h[work_node],
							       STARPU_EXECUTE_ON_NODE, work_node,
							       0);
			}
		}
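		/* Reduce the per-node contributions of a_h back onto its owner
		 * (rank 0), then wait for the whole task graph to complete. */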
		starpu_mpi_redux_data(MPI_COMM_WORLD, a_h);
		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
		starpu_mpi_barrier(MPI_COMM_WORLD);
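		/* Check on rank 0: expect the initial 1.0, plus one rank-valued
		 * initialization per contributing node, plus 3.0 + b[work_node]
		 * for each of the work_coef*nworkers tasks of each node. */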
		if (comm_rank == 0)
		{
			double tmp = 0.0;
			for (work_node = 1; work_node < comm_size; work_node++)
				tmp += 1.0 / (work_node + 1.0);
			printf("computed result ---> %f expected %f\n", a, 1.0 + (comm_size - 1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1)*3.0 + tmp));
		}
		starpu_data_unregister(a_h);
		for (work_node = 0; work_node < comm_size; work_node++)
			starpu_data_unregister(b_h[work_node]);
		starpu_mpi_barrier(MPI_COMM_WORLD);
	}
	starpu_mpi_shutdown();
	return 0;
}