broadcast.c 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. /* StarPU --- Runtime system for heterogeneous multicore architectures.
  2. *
  3. * Copyright (C) 2013-2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. *
  5. * StarPU is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU Lesser General Public License as published by
  7. * the Free Software Foundation; either version 2.1 of the License, or (at
  8. * your option) any later version.
  9. *
  10. * StarPU is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. *
  14. * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. */
  16. #include <starpu_mpi.h>
  17. #include "helper.h"
  18. void wait_CPU(void *descr[], void *args)
  19. {
  20. int *var = (int*) STARPU_VARIABLE_GET_PTR(descr[0]);
  21. int val;
  22. starpu_codelet_unpack_args(args, &val);
  23. *var = val;
  24. starpu_sleep(1);
  25. }
  26. static struct starpu_codelet cl =
  27. {
  28. .cpu_funcs = { wait_CPU },
  29. .cpu_funcs_name = { "wait_CPU" },
  30. .nbuffers = 1,
  31. .flags = STARPU_CODELET_SIMGRID_EXECUTE,
  32. .modes = { STARPU_W },
  33. };
  34. int main(int argc, char **argv)
  35. {
  36. int ret, rank, size;
  37. starpu_data_handle_t handle;
  38. int var;
  39. int mpi_init;
  40. MPI_Status status;
  41. MPI_INIT_THREAD(&argc, &argv, MPI_THREAD_SERIALIZED, &mpi_init);
  42. ret = starpu_mpi_init_conf(&argc, &argv, mpi_init, MPI_COMM_WORLD, NULL);
  43. STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");
  44. starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
  45. starpu_mpi_comm_size(MPI_COMM_WORLD, &size);
  46. starpu_variable_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t)&var, sizeof(var));
  47. if (rank == 0)
  48. {
  49. int val, n;
  50. val = 42;
  51. starpu_task_insert(&cl, STARPU_W, handle, STARPU_VALUE, &val, sizeof(val), 0);
  52. for(n = 1 ; n < size ; n++)
  53. {
  54. FPRINTF_MPI(stderr, "sending data to %d\n", n);
  55. starpu_mpi_isend_detached(handle, n, 0, MPI_COMM_WORLD, NULL, NULL);
  56. }
  57. val = 43;
  58. starpu_task_insert(&cl, STARPU_W, handle, STARPU_VALUE, &val, sizeof(val), 0);
  59. for(n = 1 ; n < size ; n++)
  60. {
  61. FPRINTF_MPI(stderr, "sending data to %d\n", n);
  62. starpu_mpi_isend_detached(handle, n, 0, MPI_COMM_WORLD, NULL, NULL);
  63. }
  64. }
  65. else
  66. {
  67. starpu_mpi_recv(handle, 0, 0, MPI_COMM_WORLD, &status);
  68. starpu_data_acquire(handle, STARPU_R);
  69. STARPU_ASSERT(var == 42);
  70. starpu_data_release(handle);
  71. starpu_mpi_recv(handle, 0, 0, MPI_COMM_WORLD, &status);
  72. starpu_data_acquire(handle, STARPU_R);
  73. STARPU_ASSERT(var == 43);
  74. starpu_data_release(handle);
  75. FPRINTF_MPI(stderr, "received data\n");
  76. }
  77. starpu_data_unregister(handle);
  78. starpu_mpi_shutdown();
  79. if (!mpi_init)
  80. MPI_Finalize();
  81. return 0;
  82. }