! nf_mpi_redux.f90 (8.0 KB)
  !> Example of a distributed reduction using the StarPU native Fortran
  !! interface together with StarPU-MPI.
  !!
  !! Rank 0 owns the scalar `a`; every other rank `r` owns the scalar `b(r)`.
  !! For each rank `r >= 1`, `work_coef*nworkers` tasks are inserted with
  !! FSTARPU_EXECUTE_ON_NODE set to `r`; each task accesses `a` with the mode
  !! under test and reads `b(r)`. The experiment is run twice: first with
  !! FSTARPU_MPI_REDUX as the task access mode for `a`, then with FSTARPU_REDUX.
  !! After each run rank 0 prints the value found in `a` next to a reference
  !! value computed locally.
  program nf_mpi_redux
    use iso_c_binding
    use fstarpu_mod
    use fstarpu_mpi_mod
    implicit none

    integer, target :: ret, i, trial
    type(c_ptr) :: work_cl, task_red_cl, task_ini_cl
    character(kind=c_char,len=*), parameter :: name=C_CHAR_"task"//C_NULL_CHAR
    character(kind=c_char,len=*), parameter :: namered=C_CHAR_"task_red"//C_NULL_CHAR
    character(kind=c_char,len=*), parameter :: nameini=C_CHAR_"task_ini"//C_NULL_CHAR
    real(kind(1.d0)), target :: a, tmp
    real(kind(1.d0)), target, allocatable :: b(:)
    integer(kind=8) :: tag, err
    type(c_ptr) :: ahdl
    type(c_ptr), target, allocatable :: bhdl(:)
    type(c_ptr) :: task_mode, codelet_mode
    integer, target :: comm_world, comm_w_rank, comm_size
    integer(c_int), target :: w_node, nworkers, work_coef

    call fstarpu_fxt_autostart_profiling(0)

    ! Do not continue with a runtime that failed to start.
    ret = fstarpu_init(c_null_ptr)
    if (ret /= 0) then
      write(*,'("StarPU initialization failed.")')
      stop 1
    end if
    ret = fstarpu_mpi_init(1)
    if (ret /= 0) then
      write(*,'("StarPU-MPI initialization failed.")')
      call fstarpu_shutdown()
      stop 1
    end if

    comm_world = fstarpu_mpi_world_comm()
    comm_w_rank = fstarpu_mpi_world_rank()
    comm_size = fstarpu_mpi_world_size()
    if (comm_size < 2) then
      write(*,'(" ")')
      write(*,'("This application is meant to run with at least two nodes.")')
      stop 2
    end if

    ! One b value and one handle per non-zero rank; index r belongs to rank r.
    allocate(b(comm_size-1), bhdl(comm_size-1))

    nworkers = fstarpu_worker_get_count()
    if (nworkers < 1) then
      write(*,'(" ")')
      write(*,'("This application is meant to run with at least one worker per node.")')
      stop 2
    end if

    ! Reduction codelet: combines a contribution (buffer 1, read) into the
    ! destination (buffer 0, read-write).
    task_red_cl = fstarpu_codelet_allocate()
    call fstarpu_codelet_set_name(task_red_cl, namered)
    call fstarpu_codelet_add_cpu_func(task_red_cl, C_FUNLOC(cl_cpu_task_red))
    call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_RW)
    call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_R)

    ! Initialisation codelet: writes the starting value of its single buffer.
    task_ini_cl = fstarpu_codelet_allocate()
    call fstarpu_codelet_set_name(task_ini_cl, nameini)
    call fstarpu_codelet_add_cpu_func(task_ini_cl, C_FUNLOC(cl_cpu_task_ini))
    call fstarpu_codelet_add_buffer(task_ini_cl, FSTARPU_W)

    work_coef = 2

    do trial = 1, 2
      if (trial == 1) then
        write(*,*) "Using STARPU_MPI_REDUX"
        codelet_mode = FSTARPU_RW.ior.FSTARPU_COMMUTE
        task_mode = FSTARPU_MPI_REDUX
      else if (trial == 2) then
        write(*,*) "Using STARPU_REDUX"
        codelet_mode = FSTARPU_REDUX
        task_mode = FSTARPU_REDUX
      end if

      ! The work codelet is rebuilt for every trial because the access mode of
      ! its first buffer depends on the trial.
      work_cl = fstarpu_codelet_allocate()
      call fstarpu_codelet_set_name(work_cl, name)
      call fstarpu_codelet_add_cpu_func(work_cl, C_FUNLOC(cl_cpu_task))
      call fstarpu_codelet_add_buffer(work_cl, codelet_mode)
      call fstarpu_codelet_add_buffer(work_cl, FSTARPU_R)

      err = fstarpu_mpi_barrier(comm_world)

      ! Each rank sets the value it owns, in double precision throughout.
      if (comm_w_rank == 0) then
        write(*,'(" ")')
        a = 1.0d0
        write(*,*) "init a = ", a
      else
        b(comm_w_rank) = 1.0d0 / (comm_w_rank + 1.0d0)
        ! A c_ptr cannot portably appear in an output list, so the address is
        ! converted to an integer of pointer width before printing.
        write(*,*) "init b_",comm_w_rank,"=", b(comm_w_rank), " AT ", &
          transfer(c_loc(bhdl(comm_w_rank)), 0_c_intptr_t) ! This is not really meaningful
      end if

      err = fstarpu_mpi_barrier(comm_world)

      ! Register the data: a handle is given home node 0 and a real address on
      ! the owning rank, and home node -1 with a null address everywhere else.
      tag = 0
      if (comm_w_rank == 0) then
        call fstarpu_variable_data_register(ahdl, 0, c_loc(a), c_sizeof(a))
        do i = 1, comm_size-1
          call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr, c_sizeof(b(i)))
        end do
      else
        call fstarpu_variable_data_register(ahdl, -1, c_null_ptr, c_sizeof(a))
        do i = 1, comm_size-1
          if (i == comm_w_rank) then
            call fstarpu_variable_data_register(bhdl(i), 0, c_loc(b(i)), c_sizeof(b(i)))
          else
            call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr, c_sizeof(b(i)))
          end if
        end do
      end if

      ! MPI-level registration: a uses tag 0 on rank 0, b(i) uses tag i on rank i.
      call fstarpu_mpi_data_register(ahdl, tag, 0)
      do i = 1, comm_size-1
        call fstarpu_mpi_data_register(bhdl(i), tag+i, i)
      end do
      ! Advance past the tags just used (tag is reset at the top of each trial).
      tag = tag + comm_size

      call fstarpu_data_set_reduction_methods(ahdl, task_red_cl, task_ini_cl)

      err = fstarpu_mpi_barrier(comm_world)

      ! NOTE(review): profiling is started on every trial but stopped only once
      ! after the loop - confirm this is intended.
      call fstarpu_fxt_start_profiling()

      ! Insert work_coef*nworkers tasks on every non-zero rank.
      do w_node = 1, comm_size-1
        do i = 1, work_coef*nworkers
          call fstarpu_mpi_task_insert( [ c_loc(comm_world), &
            work_cl, &
            task_mode, ahdl, &
            FSTARPU_R, bhdl(w_node), &
            FSTARPU_EXECUTE_ON_NODE, c_loc(w_node), &
            C_NULL_PTR ] )
        end do
      end do

      call fstarpu_mpi_redux_data(comm_world, ahdl)
      err = fstarpu_mpi_wait_for_all(comm_world)

      if (comm_w_rank == 0) then
        ! tmp = sum of the b values, recomputed locally with the same formula
        ! the owning ranks used.
        tmp = 0.0d0
        do w_node = 1, comm_size-1
          tmp = tmp + 1.0d0 / (w_node + 1.0d0)
        end do
        ! Reference value: the initial a (1), plus the sum of the ranks
        ! 1..comm_size-1 (the value the init codelet stores; presumably one
        ! initialised copy per node - TODO confirm for the REDUX trial), plus
        ! (3 + b) for every task inserted.
        write(*,*) 'computed result ---> ',a, "expected =",&
          1.0d0 + (comm_size-1.0d0)*(comm_size)/2.0d0 + work_coef*nworkers*((comm_size-1.0d0)*3.0d0 + tmp)
      end if

      err = fstarpu_mpi_barrier(comm_world)

      ! Release this trial's handles and codelet before the next trial.
      call fstarpu_data_unregister(ahdl)
      do w_node = 1, comm_size-1
        call fstarpu_data_unregister(bhdl(w_node))
      end do
      call fstarpu_codelet_free(work_cl)
    end do

    call fstarpu_fxt_stop_profiling()
    call fstarpu_codelet_free(task_red_cl)
    call fstarpu_codelet_free(task_ini_cl)
    err = fstarpu_mpi_shutdown()
    call fstarpu_shutdown()
    deallocate(b, bhdl)
    stop
  133. contains
  !> CPU implementation of the "task" codelet.
  !!
  !! Views buffer 0 as a double-precision scalar `a` and buffer 1 as a
  !! double-precision scalar `b`, calls `sleep(1.d0)`, then performs
  !! `a = a + 3 + b` and prints the value of `a` before and after the update
  !! together with the MPI rank and the worker id.
  !!
  !! @param buffers  opaque buffer descriptor handed to fstarpu_variable_get_ptr
  !! @param cl_args  unused
  recursive subroutine cl_cpu_task (buffers, cl_args) bind(C)
    use iso_c_binding       ! C interfacing module
    use fstarpu_mod         ! StarPU interfacing module
    implicit none
    type(c_ptr), value, intent(in) :: buffers, cl_args ! cl_args is unused
    integer(c_int) :: worker_id
    integer :: comm_rank
    real(kind(1.d0)), pointer :: a, b
    real(kind(1.d0)) :: old_a

    worker_id = fstarpu_worker_get_id()
    comm_rank = fstarpu_mpi_world_rank()

    ! Map the two codelet buffers onto Fortran scalars.
    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), a)
    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 1), b)

    call sleep(1.d0)

    ! Keep the previous value only so that it can be reported below.
    old_a = a
    a = old_a + 3.0d0 + b
    write(*,*) "task (c_w_rank:",comm_rank," worker_id:",worker_id,") from ",old_a,"to",a
  end subroutine cl_cpu_task
  !> CPU implementation of the reduction codelet "task_red".
  !!
  !! Views buffer 0 as the destination scalar `ad` and buffer 1 as the source
  !! scalar `as`, adds the source into the destination (`ad = ad + as`), calls
  !! `sleep(1.d0)`, and prints the source, the previous destination value and
  !! the new destination value together with the MPI rank and the worker id.
  !!
  !! @param buffers  opaque buffer descriptor handed to fstarpu_variable_get_ptr
  !! @param cl_args  unused
  recursive subroutine cl_cpu_task_red (buffers, cl_args) bind(C)
    use iso_c_binding       ! C interfacing module
    use fstarpu_mod         ! StarPU interfacing module
    implicit none
    type(c_ptr), value, intent(in) :: buffers, cl_args ! cl_args is unused
    integer(c_int) :: worker_id
    integer :: comm_rank
    real(kind(1.d0)), pointer :: as, ad
    real(kind(1.d0)) :: old_ad

    worker_id = fstarpu_worker_get_id()
    comm_rank = fstarpu_mpi_world_rank()

    ! Buffer 0 is the accumulation target, buffer 1 the contribution.
    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), ad)
    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 1), as)

    ! Keep the previous destination value only so that it can be reported.
    old_ad = ad
    ad = ad + as

    call sleep(1.d0)
    write(*,*) "red_cl (c_w_rank:",comm_rank,"worker_id:",worker_id,")",as, old_ad, ' ---> ',ad
  end subroutine cl_cpu_task_red
  !> CPU implementation of the initialisation codelet "task_ini".
  !!
  !! Views buffer 0 as a double-precision scalar `a`, calls `sleep(0.5d0)`,
  !! prints the value currently found in the buffer, and then overwrites it
  !! with the MPI rank of the calling process.
  !!
  !! @param buffers  opaque buffer descriptor handed to fstarpu_variable_get_ptr
  !! @param cl_args  unused
  recursive subroutine cl_cpu_task_ini (buffers, cl_args) bind(C)
    use iso_c_binding       ! C interfacing module
    use fstarpu_mod         ! StarPU interfacing module
    implicit none
    type(c_ptr), value, intent(in) :: buffers, cl_args ! cl_args is unused
    integer(c_int) :: worker_id
    integer :: comm_rank
    real(kind(1.d0)), pointer :: a

    worker_id = fstarpu_worker_get_id()
    comm_rank = fstarpu_mpi_world_rank()
    call c_f_pointer(fstarpu_variable_get_ptr(buffers, 0), a)

    call sleep(0.5d0)

    ! As this codelet is run by each worker in the REDUX mode case,
    ! this initialization makes salient the number of copies spawned.
    ! The previous content of the buffer is printed before it is overwritten.
    write(*,*) "ini_cl (c_w_rank:",comm_rank,"worker_id:",worker_id,") set to", comm_rank, "(was",a,")"
    a = real(comm_rank, kind(1.d0))
  end subroutine cl_cpu_task_ini
  !> Busy-wait until more than `t` seconds of wall-clock time have elapsed.
  !!
  !! The wait spins on the `system_clock` intrinsic. Wide (64-bit capable)
  !! counters are requested, a wrap of the counter past its maximum is
  !! compensated, and the routine returns immediately when the processor
  !! reports no usable clock (count rate of zero) instead of spinning forever.
  !!
  !! @param t  minimum duration to wait, in seconds; a negative value returns
  !!           on the first clock reading
  subroutine sleep(t)
    implicit none
    integer, parameter :: i8 = selected_int_kind(18)
    real(kind(1.d0)), intent(in) :: t
    integer(i8) :: t_start, t_now, t_rate, t_max, ticks
    real(kind(1.d0)) :: elapsed

    call system_clock(t_start, t_rate, t_max)

    ! No clock available: elapsed time cannot be measured, so do not wait.
    if (t_rate <= 0) return

    do
      call system_clock(t_now)
      ticks = t_now - t_start
      ! The counter restarts at zero after reaching t_max; undo that wrap.
      ! The parentheses keep the intermediate sum within range.
      if (ticks < 0) ticks = (ticks + t_max) + 1
      elapsed = real(ticks, kind(1.d0)) / real(t_rate, kind(1.d0))
      if (elapsed > t) return
    end do
  end subroutine sleep
  203. end program