task.jl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. # StarPU --- Runtime system for heterogeneous multicore architectures.
  2. #
  3. # Copyright (C) 2020 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
  4. #
  5. # StarPU is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU Lesser General Public License as published by
  7. # the Free Software Foundation; either version 2.1 of the License, or (at
  8. # your option) any later version.
  9. #
  10. # StarPU is distributed in the hope that it will be useful, but
  11. # WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  13. #
  14. # See the GNU Lesser General Public License in COPYING.LGPL for more details.
  15. #
  16. using ThreadPools
  17. mutable struct jl_starpu_codelet
  18. c_codelet :: starpu_codelet
  19. perfmodel :: starpu_perfmodel
  20. cpu_func :: Union{String, STARPU_BLAS}
  21. cuda_func :: Union{String, STARPU_BLAS}
  22. opencl_func :: String
  23. modes
  24. end
  25. global codelet_list = Vector{jl_starpu_codelet}()
  26. function starpu_codelet(;
  27. cpu_func :: Union{String, STARPU_BLAS} = "",
  28. cuda_func :: Union{String, STARPU_BLAS} = "",
  29. opencl_func :: String = "",
  30. modes = [],
  31. perfmodel :: starpu_perfmodel,
  32. where_to_execute :: Union{Cvoid, UInt32} = nothing,
  33. color :: UInt32 = 0x00000000
  34. )
  35. if (length(modes) > STARPU_NMAXBUFS)
  36. error("Codelet has too much buffers ($(length(modes)) but only $STARPU_NMAXBUFS are allowed)")
  37. end
  38. if (where_to_execute == nothing)
  39. real_where = ((cpu_func != "") * STARPU_CPU) | ((cuda_func != "") * STARPU_CUDA)
  40. else
  41. real_where = where_to_execute
  42. end
  43. output = jl_starpu_codelet(starpu_codelet(zero), perfmodel, cpu_func, cuda_func, opencl_func, modes)
  44. ## TODO: starpu_codelet_init
  45. output.c_codelet.where = real_where
  46. for i in 1:length(modes)
  47. output.c_codelet.modes[i] = modes[i]
  48. end
  49. output.c_codelet.nbuffers = length(modes)
  50. output.c_codelet.model = pointer_from_objref(perfmodel)
  51. output.c_codelet.color = color
  52. if typeof(cpu_func) == STARPU_BLAS
  53. output.cpu_func = cpu_blas_codelets[cpu_func]
  54. output.c_codelet.cpu_func = load_wrapper_function_pointer(output.cpu_func)
  55. else
  56. output.c_codelet.cpu_func = load_starpu_function_pointer(cpu_func)
  57. end
  58. if typeof(cuda_func) == STARPU_BLAS
  59. output.cuda_func = cuda_blas_codelets[cuda_func]
  60. output.c_codelet.cuda_func = load_wrapper_function_pointer(output.cuda_func)
  61. output.c_codelet.cuda_flags[1] = STARPU_CUDA_ASYNC
  62. else
  63. output.c_codelet.cuda_func = load_starpu_function_pointer(cuda_func)
  64. end
  65. output.c_codelet.opencl_func = load_starpu_function_pointer(opencl_func)
  66. # Codelets must not be garbage collected before starpu shutdown is called.
  67. lock(mutex)
  68. push!(codelet_list, output)
  69. unlock(mutex)
  70. return output
  71. end
  72. mutable struct jl_starpu_task
  73. cl :: jl_starpu_codelet
  74. handles :: Vector{StarpuDataHandle}
  75. handle_pointers :: Vector{StarpuDataHandlePointer}
  76. synchronous :: Bool
  77. cl_arg # type depends on codelet
  78. callback_signal :: Vector{Cint}
  79. callback_function :: Union{Cvoid, Function}
  80. callback_arg
  81. c_task :: starpu_task
  82. end
  83. task_list = Vector{jl_starpu_task}()
  84. """
  85. starpu_task(; cl :: jl_starpu_codelet, handles :: Vector{StarpuDataHandle}, cl_arg :: Ref)
  86. Creates a new task which will run the specified codelet on handle buffers and cl_args data
  87. """
  88. function starpu_task(; cl :: Union{Cvoid, jl_starpu_codelet} = nothing, handles :: Vector{StarpuDataHandle} = StarpuDataHandle[], cl_arg = (),
  89. callback :: Union{Cvoid, Function} = nothing, callback_arg = nothing, tag :: Union{Cvoid, starpu_tag_t} = nothing,
  90. sequential_consistency = true, detach = 1)
  91. if (cl == nothing)
  92. error("\"cl\" field can't be empty when creating a StarpuTask")
  93. end
  94. output = jl_starpu_task(cl, handles, map((x -> x.object), handles), false, nothing, Vector{Cint}(undef, 1), callback, callback_arg, starpu_task(zero))
  95. # handle scalar_parameters
  96. codelet_name = cl.cpu_func
  97. if isempty(codelet_name)
  98. codelet_name = cl.cuda_func
  99. end
  100. if isempty(codelet_name)
  101. codelet_name = cl.opencl_func
  102. end
  103. if isempty(codelet_name)
  104. error("No function provided with codelet.")
  105. end
  106. scalar_parameters = get(CODELETS_SCALARS, codelet_name, nothing)
  107. if scalar_parameters != nothing
  108. nb_scalar_required = length(scalar_parameters)
  109. nb_scalar_provided = tuple_len(cl_arg)
  110. if (nb_scalar_provided != nb_scalar_required)
  111. error("$nb_scalar_provided scalar parameters provided but $nb_scalar_required are required by $codelet_name.")
  112. end
  113. output.cl_arg = create_param_struct_from_clarg(codelet_name, cl_arg)
  114. else
  115. output.cl_arg = cl_arg
  116. end
  117. starpu_task_init(Ref(output.c_task))
  118. output.c_task.cl = pointer_from_objref(cl.c_codelet)
  119. output.c_task.synchronous = false
  120. output.c_task.sequential_consistency = sequential_consistency
  121. output.c_task.detach = detach
  122. ## TODO: check num handles equals num codelet buffers
  123. for i in 1:length(handles)
  124. output.c_task.handles[i] = output.handle_pointers[i]
  125. end
  126. if tuple_len(cl_arg) > 0
  127. output.c_task.cl_arg = Base.unsafe_convert(Ptr{Cvoid}, Ref(output.cl_arg))
  128. output.c_task.cl_arg_size = sizeof(output.cl_arg)
  129. end
  130. # callback
  131. if output.callback_function != nothing
  132. output.callback_signal[1] = 0
  133. output.c_task.callback_arg = Base.unsafe_convert(Ptr{Cvoid}, output.callback_signal)
  134. output.c_task.callback_func = load_wrapper_function_pointer("julia_callback_func")
  135. end
  136. if tag != nothing
  137. output.c_task.tag_id = tag
  138. output.c_task.use_tag = 1
  139. end
  140. # Tasks must not be garbage collected before starpu_task_wait_for_all is called.
  141. # This is necessary in particular for tasks created inside callback functions.
  142. lock(mutex)
  143. push!(task_list, output)
  144. unlock(mutex)
  145. return output
  146. end
  147. function create_param_struct_from_clarg(name, cl_arg)
  148. struct_params_name = CODELETS_PARAMS_STRUCT[name]
  149. if struct_params_name == false
  150. error("structure name not found in CODELET_PARAMS_STRUCT")
  151. end
  152. nb_scalar_provided = length(cl_arg)
  153. create_struct_param_str = "output = $struct_params_name("
  154. for i in 1:nb_scalar_provided-1
  155. arg = cl_arg[i]
  156. create_struct_param_str *= "$arg, "
  157. end
  158. if (nb_scalar_provided > 0)
  159. arg = cl_arg[nb_scalar_provided]
  160. create_struct_param_str *= "$arg"
  161. end
  162. create_struct_param_str *= ")"
  163. eval(Meta.parse(create_struct_param_str))
  164. return output
  165. end
  166. """
  167. Launches task execution, if "synchronous" task field is set to "false", call
  168. returns immediately
  169. """
  170. function starpu_task_submit(task :: jl_starpu_task)
  171. if (length(task.handles) != length(task.cl.modes))
  172. error("Invalid number of handles for task : $(length(task.handles)) where given while codelet has $(task.cl.modes) modes")
  173. end
  174. starpu_task_submit(Ref(task.c_task))
  175. if task.callback_function != nothing
  176. callback_arg = task.callback_arg
  177. callback_signal = task.callback_signal
  178. callback_function = task.callback_function
  179. lock(mutex)
  180. put!(task_pool) do
  181. # Active waiting loop
  182. @starpucall(julia_wait_signal, Cvoid, (Ptr{Cvoid},), Base.unsafe_convert(Ptr{Cvoid}, callback_signal))
  183. # We've received the signal from the pthread, now execute the callback.
  184. callback_function(callback_arg)
  185. # Tell the pthread that the callback is done.
  186. callback_signal[1] = 0
  187. end
  188. unlock(mutex)
  189. end
  190. end
  191. function starpu_modes(x :: Symbol)
  192. if (x == Symbol("STARPU_RW"))
  193. return STARPU_RW
  194. elseif (x == Symbol("STARPU_R"))
  195. return STARPU_R
  196. else return STARPU_W
  197. end
  198. end
  199. """
  200. Creates and submits an asynchronous task running cl Codelet function.
  201. Ex : @starpu_async_cl cl(handle1, handle2)
  202. """
  203. macro starpu_async_cl(expr, modes, cl_arg=(), color ::UInt32=0x00000000)
  204. if (!isa(expr, Expr) || expr.head != :call)
  205. error("Invalid task submit syntax")
  206. end
  207. if (!isa(expr, Expr)||modes.head != :vect)
  208. error("Invalid task submit syntax")
  209. end
  210. perfmodel = starpu_perfmodel(
  211. perf_type = starpu_perfmodel_type(STARPU_HISTORY_BASED),
  212. symbol = "history_perf"
  213. )
  214. println(CPU_CODELETS[string(expr.args[1])])
  215. cl = starpu_codelet(
  216. cpu_func = CPU_CODELETS[string(expr.args[1])],
  217. # cuda_func = CUDA_CODELETS[string(expr.args[1])],
  218. #opencl_func="ocl_matrix_mult",
  219. ### TODO: CORRECT !
  220. modes = map((x -> starpu_modes(x)),modes.args),
  221. perfmodel = perfmodel,
  222. color = color
  223. )
  224. handles = Expr(:vect, expr.args[2:end]...)
  225. #dump(handles)
  226. quote
  227. task = starpu_task(cl = $(esc(cl)), handles = $(esc(handles)), cl_arg=$(esc(cl_arg)))
  228. starpu_task_submit(task)
  229. end
  230. end
  231. function starpu_task_wait(task :: jl_starpu_task)
  232. @threadcall(@starpufunc(:starpu_task_wait),
  233. Cint, (Ptr{Cvoid},), Ref(task.c_task))
  234. # starpu_task_wait(Ref(task.c_task))
  235. end
  236. """
  237. Blocks until every submitted task has finished.
  238. """
  239. function starpu_task_wait_for_all()
  240. @threadcall(@starpufunc(:starpu_task_wait_for_all),
  241. Cint, ())
  242. lock(mutex)
  243. empty!(task_list)
  244. unlock(mutex)
  245. end
  246. """
  247. Blocks until every submitted task has finished.
  248. Ex : @starpu_sync_tasks begin
  249. [...]
  250. starpu_task_submit(task)
  251. [...]
  252. end
  253. TODO : Make the macro only wait for tasks declared inside the following expression.
  254. (similar mechanism as @starpu_block)
  255. """
  256. macro starpu_sync_tasks(expr)
  257. quote
  258. $(esc(expr))
  259. starpu_task_wait_for_all()
  260. end
  261. end
  262. function starpu_task_destroy(task :: jl_starpu_task)
  263. starpu_task_destroy(Ref(task.c_task))
  264. end