Browse Source

julia: Add support for callbacks.

Pierre Huchant 5 years ago
parent
commit
c2ec009d4d

+ 23 - 0
julia/Manifest.toml

@@ -55,6 +55,10 @@ uuid = "76f85450-5226-5b5a-8eaa-529ad045b433"
 [[Libdl]]
 uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
 
+[[LinearAlgebra]]
+deps = ["Libdl"]
+uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
+
 [[Logging]]
 uuid = "56ddb016-857b-54e1-b83d-db4d58db5568"
 
@@ -83,6 +87,11 @@ uuid = "3fa0cd96-eef1-5676-8a61-b3b8758bbffb"
 deps = ["Serialization"]
 uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
 
+[[RecipesBase]]
+git-tree-sha1 = "54f8ceb165a0f6d083f0d12cb4996f5367c6edbc"
+uuid = "3cdcf5f2-1ef4-517c-9805-6587b60abb01"
+version = "1.0.1"
+
 [[SHA]]
 uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce"
 
@@ -92,10 +101,24 @@ uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
 [[Sockets]]
 uuid = "6462fe0b-24de-5631-8697-dd941f90decc"
 
+[[SparseArrays]]
+deps = ["LinearAlgebra", "Random"]
+uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
+
+[[Statistics]]
+deps = ["LinearAlgebra", "SparseArrays"]
+uuid = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
+
 [[Test]]
 deps = ["Distributed", "InteractiveUtils", "Logging", "Random"]
 uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 
+[[ThreadPools]]
+deps = ["Printf", "RecipesBase", "Statistics"]
+git-tree-sha1 = "48e35097fdc6d1706a9b90c5eee62f54402aa62c"
+uuid = "b189fb0b-2eb5-4ed4-bc0c-d34c51242431"
+version = "1.1.0"
+
 [[UUIDs]]
 deps = ["Random", "SHA"]
 uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

+ 1 - 0
julia/Project.toml

@@ -7,3 +7,4 @@ version = "0.1.0"
 CBinding = "d43a6710-96b8-4a2d-833c-c424785e5374"
 Clang = "40e3b903-d033-50b4-a0cc-940c62c95e31"
 Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
+ThreadPools = "b189fb0b-2eb5-4ed4-bc0c-d34c51242431"

+ 4 - 0
julia/examples/Makefile.am

@@ -92,3 +92,7 @@ SHELL_TESTS	+=	vector_scal/vector_scal.sh
 STARPU_JULIA_EXAMPLES		+=	mandelbrot/mandelbrot
 mandelbrot_mandelbrot_SOURCES	=	mandelbrot/mandelbrot.c mandelbrot/cpu_mandelbrot.c mandelbrot/cpu_mandelbrot.h
 SHELL_TESTS			+=	mandelbrot/mandelbrot.sh
+
+STARPU_JULIA_EXAMPLES		+= callback/callback
+callback_callback_SOURCES	=	callback/callback.c
+SHELL_TESTS			+=	callback/callback.sh

+ 93 - 0
julia/examples/callback/callback.c

@@ -0,0 +1,93 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2009-2020  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+/*
+ * This is an example of using a callback. We submit a task, whose callback
+ * submits another task (without any callback).
+ */
+
+#include <starpu.h>
+
+#define FPRINTF(ofile, fmt, ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ## __VA_ARGS__); }} while(0)
+
+starpu_data_handle_t handle;
+
+void cpu_codelet(void *descr[], void *_args)
+{
+	(void)_args;
+	int *val = (int *)STARPU_VARIABLE_GET_PTR(descr[0]);
+
+	*val += 1;
+}
+
+struct starpu_codelet cl =
+{
+	.modes = { STARPU_RW },
+	.cpu_funcs = {cpu_codelet},
+	.cpu_funcs_name = {"cpu_codelet"},
+	.nbuffers = 1,
+	.name = "callback"
+};
+
+void callback_func(void *callback_arg)
+{
+	int ret;
+
+	(void)callback_arg;
+
+	struct starpu_task *task = starpu_task_create();
+	task->cl = &cl;
+	task->handles[0] = handle;
+
+	ret = starpu_task_submit(task);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+}
+
+int main(void)
+{
+	int v=40;
+	int ret;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_variable_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t)&v, sizeof(int));
+
+	struct starpu_task *task = starpu_task_create();
+	task->cl = &cl;
+	task->callback_func = callback_func;
+	task->callback_arg = NULL;
+	task->handles[0] = handle;
+
+	ret = starpu_task_submit(task);
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+
+	starpu_task_wait_for_all();
+	starpu_data_unregister(handle);
+
+	FPRINTF(stderr, "v -> %d\n", v);
+
+	starpu_shutdown();
+
+	return (v == 42) ? 0 : 1;
+
+enodev:
+	starpu_shutdown();
+	return 77;
+}

+ 74 - 0
julia/examples/callback/callback.jl

@@ -0,0 +1,74 @@
+# StarPU --- Runtime system for heterogeneous multicore architectures.
+#
+# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+#
+# StarPU is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or (at
+# your option) any later version.
+#
+# StarPU is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the GNU Lesser General Public License in COPYING.LGPL for more details.
+#
+using StarPU
+
+@target STARPU_CPU
+@codelet function variable(val ::Ref{Int32}) :: Nothing
+    val[] = val[] + 1
+
+    return
+end
+
+starpu_init()
+
+function callback(args)
+    cl = args[1]
+    handles = args[2]
+
+    task = starpu_task(cl = cl, handles=handles)
+    starpu_task_submit(task)
+end
+
+function variable_with_starpu(val ::Ref{Int32})
+    perfmodel = starpu_perfmodel(
+        perf_type = starpu_perfmodel_type(STARPU_HISTORY_BASED),
+        symbol = "history_perf"
+    )
+
+    cl = starpu_codelet(
+        cpu_func = CPU_CODELETS["variable"],
+        # cuda_func = CUDA_CODELETS["matrix_mult"],
+        #opencl_func="ocl_matrix_mult",
+        modes = [STARPU_RW],
+        perfmodel = perfmodel
+    )
+
+    @starpu_block let
+	hVal = starpu_data_register(val)
+
+        task = starpu_task(cl = cl, handles = [hVal], callback=callback, callback_arg=(cl, [hVal]))
+        starpu_task_submit(task)
+
+        starpu_task_wait_for_all()
+    end
+end
+
+function display()
+    v = Ref(Int32(40))
+
+    variable_with_starpu(v)
+
+    println("variable -> ", v[])
+    if v[] == 42
+        println("result is correct")
+    else
+        println("result is incorret")
+    end
+end
+
+display()
+
+starpu_shutdown()

+ 19 - 0
julia/examples/callback/callback.sh

@@ -0,0 +1,19 @@
+#!/bin/bash
+# StarPU --- Runtime system for heterogeneous multicore architectures.
+#
+# Copyright (C) 2020       Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+#
+# StarPU is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or (at
+# your option) any later version.
+#
+# StarPU is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the GNU Lesser General Public License in COPYING.LGPL for more details.
+#
+
+$(dirname $0)/../execute.sh callback/callback.jl
+

+ 2 - 0
julia/examples/check_deps/check_deps.jl

@@ -3,9 +3,11 @@ import Pkg
 try
     using CBinding
     using Clang
+    using ThreadPools
 catch
     Pkg.activate((@__DIR__)*"/../..")
     Pkg.instantiate()
     using Clang
     using CBinding
+    using ThreadPools
 end

+ 1 - 0
julia/examples/execute.sh.in

@@ -20,6 +20,7 @@ export JULIA_LOAD_PATH=@STARPU_SRC_DIR@/julia:$JULIA_LOAD_PATH
 export STARPU_INCLUDE_DIR=@STARPU_BUILD_DIR@/include
 export STARPU_JULIA_LIB=@STARPU_BUILD_DIR@/julia/src/.libs/libstarpujulia-1.3.so
 export STARPU_JULIA_BUILD=@STARPU_BUILD_DIR@/julia
+export JULIA_NUM_THREADS=8
 srcdir=@STARPU_SRC_DIR@/julia/examples
 
 if test "$1" == "-calllib"

+ 1 - 1
julia/src/Makefile.am

@@ -33,4 +33,4 @@ libstarpujulia_@STARPU_EFFECTIVE_VERSION@_la_LDFLAGS = $(ldflags) -no-undefined
   -version-info $(LIBSTARPUJULIA_INTERFACE_CURRENT):$(LIBSTARPUJULIA_INTERFACE_REVISION):$(LIBSTARPUJULIA_INTERFACE_AGE)
 
 libstarpujulia_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
-	dummy.c
+	callback_wrapper.c

+ 10 - 0
julia/src/dummy.c

@@ -22,3 +22,13 @@ void  *dummy_function_list[] = {
 				starpu_init,
 };
 
+void julia_callback_func(void *user_data)
+{
+  volatile int *signal = (int *) user_data;
+
+  // Wakeup callback
+  *(signal) = 1;
+
+  // Wait for callback to end.
+  while ((*signal) != 0);
+}

+ 44 - 4
julia/src/task.jl

@@ -13,6 +13,8 @@
 #
 # See the GNU Lesser General Public License in COPYING.LGPL for more details.
 #
+using ThreadPools
+
 struct jl_starpu_codelet
     c_codelet :: starpu_codelet
     perfmodel :: starpu_perfmodel
@@ -68,6 +70,9 @@ mutable struct jl_starpu_task
     handle_pointers :: Vector{StarpuDataHandlePointer}
     synchronous :: Bool
     cl_arg # type depends on codelet
+    callback_signal :: Vector{Cint}
+    callback_function :: Union{Cvoid, Function}
+    callback_arg
     c_task :: starpu_task
 end
 
@@ -76,13 +81,13 @@ end
 
             Creates a new task which will run the specified codelet on handle buffers and cl_args data
         """
-function starpu_task(; cl :: Union{Cvoid, jl_starpu_codelet} = nothing, handles :: Vector{StarpuDataHandle} = StarpuDataHandle[], cl_arg = ())
-
+function starpu_task(; cl :: Union{Cvoid, jl_starpu_codelet} = nothing, handles :: Vector{StarpuDataHandle} = StarpuDataHandle[], cl_arg = (),
+                     callback :: Union{Cvoid, Function} = nothing, callback_arg = nothing)
     if (cl == nothing)
         error("\"cl\" field can't be empty when creating a StarpuTask")
     end
 
-    output = jl_starpu_task(cl, handles, map((x -> x.object), handles), false, nothing, starpu_task(zero))
+    output = jl_starpu_task(cl, handles, map((x -> x.object), handles), false, nothing, Vector{Cint}(undef, 1), callback, callback_arg, starpu_task(zero))
 
     # handle scalar_parameters
     codelet_name = cl.cpu_func
@@ -119,6 +124,14 @@ function starpu_task(; cl :: Union{Cvoid, jl_starpu_codelet} = nothing, handles
         output.c_task.cl_arg = Base.unsafe_convert(Ptr{Cvoid}, Ref(output.cl_arg))
         output.c_task.cl_arg_size = sizeof(output.cl_arg)
     end
+
+    # callback
+    if output.callback_function != nothing
+        output.callback_signal[1] = 0
+        output.c_task.callback_arg = Base.unsafe_convert(Ptr{Cvoid}, output.callback_signal)
+        output.c_task.callback_func = load_wrapper_function_pointer("julia_callback_func")
+    end
+
     return output
 end
 
@@ -154,7 +167,34 @@ function starpu_task_submit(task :: jl_starpu_task)
         error("Invalid number of handles for task : $(length(task.handles)) where given while codelet has $(task.cl.modes) modes")
     end
 
+    # Prevent task from being garbage collected. This is necessary for tasks created
+    # inside callbacks.
     starpu_task_submit(Ref(task.c_task))
+
+    if task.callback_function != nothing
+        callback_arg = task.callback_arg
+        callback_signal = task.callback_signal
+        callback_function = task.callback_function
+
+        @qbthreads for x in 1:1
+            begin
+                # Active waiting loop
+                # We're doing a fake computation on tmp to prevent optimization.
+                tmp = 0
+                while task.callback_signal[1] == 0
+                    tmp += 1
+                end
+
+                # We've received the signal from the pthread, now execute the callback.
+                callback_function(callback_arg)
+
+                # Tell the pthread that the callback is done.
+                callback_signal[1] = 0
+
+                return callback_signal[1]
+            end
+        end
+    end
 end
 
 function starpu_modes(x :: Symbol)
@@ -205,7 +245,7 @@ end
 """
 function starpu_task_wait_for_all()
     @threadcall(@starpufunc(:starpu_task_wait_for_all),
-                          Cint, ())
+                Cint, ())
 end
 
 """

+ 15 - 0
julia/src/utils.jl

@@ -76,6 +76,21 @@ function load_starpu_function_pointer(func_name :: String)
     return func_pointer
 end
 
+function load_wrapper_function_pointer(func_name :: String)
+    if (isempty(func_name))
+        return C_NULL
+    end
+
+    func_pointer=Libdl.dlsym(starpu_wrapper_library_handle, func_name)
+
+    if (func_pointer == C_NULL)
+        error("Couldn't find function symbol $func_name into extern library file $starpu_tasks_library")
+    end
+
+    return func_pointer
+end
+
+
 """
     Declares a Julia function which is just calling the StarPU function
     having the same name.