! nf_mpi_redux_tree.f90
!
! StarPU --- Runtime system for heterogeneous multicore architectures.
!
! Copyright (C) 2016-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
!
! StarPU is free software; you can redistribute it and/or modify
! it under the terms of the GNU Lesser General Public License as published by
! the Free Software Foundation; either version 2.1 of the License, or (at
! your option) any later version.
!
! StarPU is distributed in the hope that it will be useful, but
! WITHOUT ANY WARRANTY; without even the implied warranty of
! MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
!
! See the GNU Lesser General Public License in COPYING.LGPL for more details.
!
  16. program nf_mpi_redux
  17. use iso_c_binding
  18. use fstarpu_mod
  19. use fstarpu_mpi_mod
  20. implicit none
  21. integer, target :: ret, np, i, j, arity
  22. type(c_ptr) :: work_cl, task_rw_cl,task_red_cl, task_ini_cl
  23. character(kind=c_char,len=*), parameter :: name=C_CHAR_"task"//C_NULL_CHAR
  24. character(kind=c_char,len=*), parameter :: namered=C_CHAR_"task_red"//C_NULL_CHAR
  25. character(kind=c_char,len=*), parameter :: nameini=C_CHAR_"task_ini"//C_NULL_CHAR
  26. real(kind(1.d0)), target :: a,tmp
  27. real(kind(1.d0)), target, allocatable :: b(:)
  28. integer(kind=8) :: tag, err
  29. type(c_ptr), target :: ahdl
  30. type(c_ptr), target, allocatable :: bhdl(:)
  31. type(c_ptr) :: task_mode, codelet_mode
  32. integer, target :: comm_world,comm_w_rank, comm_size
  33. integer(c_int), target :: w_node, nworkers, work_coef
  34. !call fstarpu_fxt_autostart_profiling(0)
  35. ret = fstarpu_init(c_null_ptr)
  36. ret = fstarpu_mpi_init(1)
  37. comm_world = fstarpu_mpi_world_comm()
  38. comm_w_rank = fstarpu_mpi_world_rank()
  39. comm_size = fstarpu_mpi_world_size()
  40. allocate(b(comm_size-1), bhdl(comm_size-1))
  41. nworkers = fstarpu_worker_get_count()
  42. if (nworkers.lt.1) then
  43. write(*,'(" ")')
  44. write(*,'("This application is meant to run with at least one worker per node.")')
  45. stop 2
  46. end if
  47. ! allocate and reduction codelets
  48. task_red_cl = fstarpu_codelet_allocate()
  49. call fstarpu_codelet_set_name(task_red_cl, namered)
  50. call fstarpu_codelet_add_cpu_func(task_red_cl,C_FUNLOC(cl_cpu_task_red))
  51. call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_RW.ior.FSTARPU_COMMUTE)
  52. call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_R)
  53. task_ini_cl = fstarpu_codelet_allocate()
  54. call fstarpu_codelet_set_name(task_ini_cl, nameini)
  55. call fstarpu_codelet_add_cpu_func(task_ini_cl,C_FUNLOC(cl_cpu_task_ini))
  56. call fstarpu_codelet_add_buffer(task_ini_cl, FSTARPU_W)
  57. work_coef=2
  58. codelet_mode = FSTARPU_RW.ior.FSTARPU_COMMUTE
  59. task_mode = FSTARPU_MPI_REDUX
  60. ! allocate and fill codelet structs
  61. work_cl = fstarpu_codelet_allocate()
  62. call fstarpu_codelet_set_name(work_cl, name)
  63. call fstarpu_codelet_add_cpu_func(work_cl, C_FUNLOC(cl_cpu_task))
  64. call fstarpu_codelet_add_buffer(work_cl, codelet_mode)
  65. call fstarpu_codelet_add_buffer(work_cl, FSTARPU_R)
  66. err = fstarpu_mpi_barrier(comm_world)
  67. do arity=2,comm_size
  68. if(comm_w_rank.eq.0) then
  69. write(*,'(" ")')
  70. a = 1.0
  71. write(*,*) "init a = ", a
  72. else
  73. b(comm_w_rank) = 1.0 / (comm_w_rank + 1.0)
  74. write(*,*) "init b_",comm_w_rank,"=", b(comm_w_rank)
  75. end if
  76. err = fstarpu_mpi_barrier(comm_world)
  77. tag = 0
  78. if(comm_w_rank.eq.0) then
  79. call fstarpu_variable_data_register(ahdl, 0, c_loc(a),c_sizeof(a))
  80. do i=1,comm_size-1
  81. call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
  82. end do
  83. else
  84. call fstarpu_variable_data_register(ahdl, -1, c_null_ptr,c_sizeof(a))
  85. do i=1,comm_size-1
  86. if (i.eq.comm_w_rank) then
  87. call fstarpu_variable_data_register(bhdl(i), 0, c_loc(b(i)),c_sizeof(b(i)))
  88. else
  89. call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
  90. end if
  91. end do
  92. end if
  93. call fstarpu_mpi_data_register(ahdl, tag, 0)
  94. do i=1,comm_size-1
  95. call fstarpu_mpi_data_register(bhdl(i), tag+i,i)
  96. end do
  97. tag = tag + comm_size
  98. call fstarpu_data_set_reduction_methods(ahdl,task_red_cl,task_ini_cl)
  99. err = fstarpu_mpi_barrier(comm_world)
  100. call fstarpu_fxt_start_profiling()
  101. do w_node=1,comm_size-1
  102. do i=1,work_coef*nworkers
  103. call fstarpu_mpi_task_insert( (/ c_loc(comm_world), &
  104. work_cl, &
  105. task_mode, ahdl, &
  106. FSTARPU_R, bhdl(w_node), &
  107. FSTARPU_EXECUTE_ON_NODE, c_loc(w_node), &
  108. C_NULL_PTR /))
  109. end do
  110. end do
  111. call fstarpu_mpi_redux_data_tree(comm_world, ahdl, arity)
  112. err = fstarpu_mpi_wait_for_all(comm_world)
  113. if(comm_w_rank.eq.0) then
  114. tmp = 0
  115. do w_node=1,comm_size-1
  116. tmp = tmp + 1.0 / (w_node+1.0)
  117. end do
  118. write(*,*) 'computed result ---> ',a, "expected =",&
  119. 1.0 + (comm_size-1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1.0)*3.0 + tmp)
  120. end if
  121. err = fstarpu_mpi_barrier(comm_world)
  122. call fstarpu_data_unregister(ahdl)
  123. do w_node=1,comm_size-1
  124. call fstarpu_data_unregister(bhdl(w_node))
  125. end do
  126. call fstarpu_fxt_stop_profiling()
  127. end do
  128. call fstarpu_codelet_free(work_cl)
  129. call fstarpu_codelet_free(task_red_cl)
  130. call fstarpu_codelet_free(task_ini_cl)
  131. err = fstarpu_mpi_shutdown()
  132. call fstarpu_shutdown()
  133. deallocate(b, bhdl)
  134. stop 0
  135. contains
  136. recursive subroutine cl_cpu_task (buffers, cl_args) bind(C)
  137. use iso_c_binding ! C interfacing module
  138. use fstarpu_mod ! StarPU interfacing module
  139. implicit none
  140. type(c_ptr), value, intent(in) :: buffers, cl_args ! cl_args is unused
  141. integer(c_int) :: ret, worker_id
  142. integer :: comm_rank
  143. integer, target :: i
  144. real(kind(1.d0)), pointer :: a, b
  145. real(kind(1.d0)) :: old_a
  146. worker_id = fstarpu_worker_get_id()
  147. comm_rank = fstarpu_mpi_world_rank()
  148. call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), a)
  149. call c_f_pointer(fstarpu_variable_get_ptr(buffers, 1), b)
  150. call nf_sleep(1.d0)
  151. old_a = a
  152. a = old_a + 3.0 + b
  153. write(*,*) "task (c_w_rank:",comm_rank," worker_id:",worker_id,") from ",old_a,"to",a
  154. return
  155. end subroutine cl_cpu_task
  156. recursive subroutine cl_cpu_task_red (buffers, cl_args) bind(C)
  157. use iso_c_binding ! C interfacing module
  158. use fstarpu_mod ! StarPU interfacing module
  159. implicit none
  160. type(c_ptr), value, intent(in) :: buffers, cl_args ! cl_args is unused
  161. integer(c_int) :: ret, worker_id
  162. integer, target :: comm_rank
  163. real(kind(1.d0)), pointer :: as, ad
  164. real(kind(1.d0)) :: old_ad
  165. worker_id = fstarpu_worker_get_id()
  166. comm_rank = fstarpu_mpi_world_rank()
  167. call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), ad)
  168. call c_f_pointer(fstarpu_variable_get_ptr(buffers, 1), as)
  169. old_ad = ad
  170. ad = ad + as
  171. call nf_sleep(1.d0)
  172. write(*,*) "red_cl (c_w_rank:",comm_rank,"worker_id:",worker_id,")",as, old_ad, ' ---> ',ad
  173. return
  174. end subroutine cl_cpu_task_red
  175. recursive subroutine cl_cpu_task_ini (buffers, cl_args) bind(C)
  176. use iso_c_binding ! C interfacing module
  177. use fstarpu_mod ! StarPU interfacing module
  178. implicit none
  179. type(c_ptr), value, intent(in) :: buffers, cl_args
  180. ! cl_args is unused
  181. integer(c_int) :: ret, worker_id
  182. integer, target :: comm_rank
  183. real(kind(1.d0)), pointer :: a
  184. worker_id = fstarpu_worker_get_id()
  185. comm_rank = fstarpu_mpi_world_rank()
  186. call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), a)
  187. call nf_sleep(0.5d0)
  188. ! As this codelet is run by each worker in the REDUX mode case
  189. ! this initialization makes salient the number of copies spawned
  190. write(*,*) "ini_cl (c_w_rank:",comm_rank,"worker_id:",worker_id,") set to", comm_rank, "(was",a,")"
  191. a = comm_rank
  192. return
  193. end subroutine cl_cpu_task_ini
  194. subroutine nf_sleep(t)
  195. implicit none
  196. integer :: t_start, t_end, t_rate
  197. real(kind(1.d0)) :: ta, t
  198. call system_clock(t_start)
  199. do
  200. call system_clock(t_end, t_rate)
  201. ta = real(t_end-t_start)/real(t_rate)
  202. if(ta.gt.t) return
  203. end do
  204. end subroutine nf_sleep
  205. end program