瀏覽代碼

starpupy: add cloudpickle to dump and to load arguments (TODO: if the input of submitted function is a Future)

HE Kun 4 年之前
父節點
當前提交
c0f21b3e6e
共有 4 個文件被更改,包括 107 次插入17 次删除
  1. 1 1
      include/starpu_task_util.h
  2. 2 1
      src/util/starpu_task_insert_utils.c
  3. 5 5
      starpupy/examples/starpu_py.py
  4. 99 10
      starpupy/src/starpu_task_wrapper.c

+ 1 - 1
include/starpu_task_util.h

@@ -520,7 +520,7 @@ void starpu_codelet_unpack_arg_init(struct starpu_codelet_pack_arg_data *state,
    Unpack one argument from struct starpu_codelet_pack_arg \p state into ptr.
    That structure has to be initialized before with starpu_codelet_unpack_arg_init().
 */
-void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void *ptr);
+void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void *ptr, size_t size);
 
 void starpu_codelet_unpack_arg_fini(struct starpu_codelet_pack_arg_data *state);
 

+ 2 - 1
src/util/starpu_task_insert_utils.c

@@ -71,10 +71,11 @@ void starpu_codelet_unpack_arg_init(struct starpu_codelet_pack_arg_data *state,
 	state->nargs = 0;
 }
 
-void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void *ptr)
+void starpu_codelet_unpack_arg(struct starpu_codelet_pack_arg_data *state, void *ptr, size_t size)
 {
 	size_t ptr_size;
 	memcpy((void *)&ptr_size, state->arg_buffer+state->current_offset, sizeof(ptr_size));
+	assert(ptr_size==size);
 	state->current_offset += sizeof(ptr_size);
 
 	memcpy(ptr, state->arg_buffer+state->current_offset, ptr_size);

+ 5 - 5
starpupy/examples/starpu_py.py

@@ -129,11 +129,11 @@ async def main():
     res6 = await fut6
     print("The result of function is", res6)
 
-    #apply starpu.delayed(sub_deco)
-    fut7 = sub_deco(fut6, 1)
-    res7 = await fut7
-    print("The first argument of this function is the result of Example 8")
-    print("The result of function is", res7)
+ #    #apply starpu.delayed(sub_deco)
+ #    fut7 = sub_deco(fut6, 1)
+ #    res7 = await fut7
+ #    print("The first argument of this function is the result of Example 8")
+ #    print("The result of function is", res7)
 
 asyncio.run(main())
 

+ 99 - 10
starpupy/src/starpu_task_wrapper.c

@@ -54,11 +54,34 @@ static PyObject *cloudpickle_module; /*cloudpickle module*/
 // 	PyObject *fut; /*asyncio.Future*/
 // 	PyObject *lp; /*asyncio.Eventloop*/
 // };
+static char* starpu_cloudpickle_dumps(PyObject *obj, Py_ssize_t* obj_data_size)
+{
+	PyObject *dumps = PyObject_GetAttrString(cloudpickle_module, "dumps");
+	PyObject *obj_bytes= PyObject_CallFunctionObjArgs(dumps, obj, NULL);
+
+	char* obj_data;
+    PyBytes_AsStringAndSize(obj_bytes, &obj_data, obj_data_size);
+    
+	return obj_data;
+}
 
+static PyObject* starpu_cloudpickle_loads(char* pyString, Py_ssize_t pyString_size)
+{
+
+	PyObject *loads = PyObject_GetAttrString(cloudpickle_module, "loads");
+	PyObject *obj_bytes_str = PyBytes_FromStringAndSize(pyString, pyString_size);
+	PyObject *obj = PyObject_CallFunctionObjArgs(loads, obj_bytes_str, NULL);
+
+	return obj;
+}
 /*function passed to starpu_codelet.cpu_func*/
 void starpupy_codelet_func(void *buffers[], void *cl_arg)
 {
+	char * func_data;
+    Py_ssize_t func_data_size;
 	PyObject *func_py; /*the python function passed in*/
+	char * arg_data;
+    Py_ssize_t arg_data_size;
 	PyObject *argList; /*argument list of python function passed in*/
 
 	//struct codelet_args *cst = (struct codelet_args*) cl_arg;
@@ -68,8 +91,16 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 	struct starpu_codelet_pack_arg_data data;
 	starpu_codelet_unpack_arg_init(&data, &task->cl_arg, &task->cl_arg_size);
 
-	starpu_codelet_unpack_arg(&data, &func_py);
-	starpu_codelet_unpack_arg(&data, &argList);
+	/*get func_py char*/
+	starpu_codelet_unpack_arg(&data, &func_data_size, sizeof(func_data_size));
+    func_data = (char *)malloc(func_data_size);
+    starpu_codelet_unpack_arg(&data, &func_data, sizeof(func_data));
+	//starpu_codelet_unpack_arg(&data, &func_py, sizeof(func_py));
+	/*get argList char*/
+	starpu_codelet_unpack_arg(&data, &arg_data_size, sizeof(arg_data_size));
+    arg_data = (char *)malloc(arg_data_size);
+    starpu_codelet_unpack_arg(&data, &arg_data, sizeof(arg_data));
+	//starpu_codelet_unpack_arg(&data, &argList, sizeof(argList));
 	/*skip fut*/
 	starpu_codelet_unpack_discard_arg(&data);
 	/*skip loop*/
@@ -80,6 +111,10 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 	/*make sure we own the GIL*/
 	PyGILState_STATE state = PyGILState_Ensure();
 
+	/*use cloudpickle to load func_py and argList*/
+	func_py=starpu_cloudpickle_loads(func_data, func_data_size);
+	argList=starpu_cloudpickle_loads(arg_data, arg_data_size);
+
 	/*verify that the function is a proper callable*/
 	if (!PyCallable_Check(func_py))
 	{
@@ -110,12 +145,31 @@ void starpupy_codelet_func(void *buffers[], void *cl_arg)
 	//PyObject *pRetVal = PyObject_CallObject(cst->f, cst->argList);
 	//cst->rv = pRetVal;
 	PyObject *rv = PyObject_CallObject(func_py, argList);
-	//if rv==NULL
-	starpu_codelet_pack_arg(&data, &rv, sizeof(rv));
+	/*if the result is None type, pack NULL without using cloudpickle*/
+	if (rv==Py_None)
+	{
+		char* rv_data=NULL;
+		Py_ssize_t rv_data_size=0;
+		starpu_codelet_pack_arg(&data, &rv_data_size, sizeof(rv_data_size));
+	    starpu_codelet_pack_arg(&data, &rv_data, sizeof(rv_data));
+	}
+	/*else use cloudpickle to dump rv*/
+	else
+	{
+		Py_ssize_t rv_data_size;
+		char* rv_data = starpu_cloudpickle_dumps(rv, &rv_data_size);
+		starpu_codelet_pack_arg(&data, &rv_data_size, sizeof(rv_data_size));
+	    starpu_codelet_pack_arg(&data, &rv_data, sizeof(rv_data));
+	}
+	
+	//starpu_codelet_pack_arg(&data, &rv, sizeof(rv));
     starpu_codelet_pack_arg_fini(&data, &task->cl_arg, &task->cl_arg_size);
 
 	//Py_DECREF(cst->f);
 
+    Py_DECREF(func_py);
+    Py_DECREF(argList);
+
 	/*restore previous GIL state*/
 	PyGILState_Release(state);
 }
@@ -125,6 +179,8 @@ void cb_func(void *v)
 {
 	PyObject *fut; /*asyncio.Future*/
 	PyObject *loop; /*asyncio.Eventloop*/
+	char * rv_data;
+    Py_ssize_t rv_data_size;
 	PyObject *rv; /*return value when using PyObject_CallObject call the function f*/
 
 	struct starpu_task *task = starpu_task_get_current();
@@ -135,17 +191,36 @@ void cb_func(void *v)
 
 	/*skip func_py*/
 	starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_discard_arg(&data);
 	/*skip argList*/
 	starpu_codelet_unpack_discard_arg(&data);
-	starpu_codelet_unpack_arg(&data, &fut);
-	starpu_codelet_unpack_arg(&data, &loop);
+	starpu_codelet_unpack_discard_arg(&data);
+	/*get fut*/
+	starpu_codelet_unpack_arg(&data, &fut, sizeof(fut));
+	/*get loop*/
+	starpu_codelet_unpack_arg(&data, &loop, sizeof(loop));
 	/*skip sb*/
 	starpu_codelet_unpack_discard_arg(&data);
-	starpu_codelet_unpack_arg(&data, &rv);
+	/*get rv char*/
+	starpu_codelet_unpack_arg(&data, &rv_data_size, sizeof(rv_data_size));
+    rv_data = (char *)malloc(rv_data_size);
+    starpu_codelet_unpack_arg(&data, &rv_data, sizeof(rv_data));
+	//starpu_codelet_unpack_arg(&data, &rv, sizeof(rv));
 
 	/*make sure we own the GIL*/
 	PyGILState_STATE state = PyGILState_Ensure();
 
+	/*if the rv_data_size is 0, the result is None type*/
+	if (rv_data_size==0)
+	{
+		rv=Py_None;
+	}
+	/*else use cloudpickle to load rv*/
+	else
+	{
+		rv=starpu_cloudpickle_loads(rv_data, rv_data_size);
+	}
+
 	/*set the Future result and mark the Future as done*/
 	PyObject *set_result = PyObject_GetAttrString(fut, "set_result");
 	PyObject *loop_callback = PyObject_CallMethod(loop, "call_soon_threadsafe", "(O,O)", set_result, rv);
@@ -205,15 +280,19 @@ static size_t sizebase (struct starpu_task *task, unsigned nimpl)
 
 	/*skip func_py*/
 	starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_discard_arg(&data);
 	/*skip argList*/
 	starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_discard_arg(&data);
 	/*skip fut*/
 	starpu_codelet_unpack_discard_arg(&data);
 	/*skip loop*/
 	starpu_codelet_unpack_discard_arg(&data);
-	starpu_codelet_unpack_arg(&data, &sb);
+	/*get sb*/
+	starpu_codelet_unpack_arg(&data, &sb, sizeof(sb));
 	/*skip rv*/
 	starpu_codelet_unpack_discard_arg(&data);
+	starpu_codelet_unpack_discard_arg(&data);
 	//starpu_codelet_unpack_args(task_submit->cl_arg, &func_py, &argList, &fut, &loop, &sb, &rv);
 
 	return sb;
@@ -372,8 +451,17 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
 		}
 	}
 
-	starpu_codelet_pack_arg(&data, &func_py, sizeof(func_py));
-	starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
+	/*use cloudpickle to dump func_py and argList*/
+	Py_ssize_t func_data_size;
+	char* func_data = starpu_cloudpickle_dumps(func_py, &func_data_size);
+	starpu_codelet_pack_arg(&data, &func_data_size, sizeof(func_data_size));
+    starpu_codelet_pack_arg(&data, &func_data, sizeof(func_data));
+	//starpu_codelet_pack_arg(&data, &func_py, sizeof(func_py));
+	Py_ssize_t arg_data_size;
+	char* arg_data = starpu_cloudpickle_dumps(argList, &arg_data_size);
+	starpu_codelet_pack_arg(&data, &arg_data_size, sizeof(arg_data_size));
+    starpu_codelet_pack_arg(&data, &arg_data, sizeof(arg_data));
+	//starpu_codelet_pack_arg(&data, &argList, sizeof(argList));
 	starpu_codelet_pack_arg(&data, &fut, sizeof(fut));
 	starpu_codelet_pack_arg(&data, &loop, sizeof(loop));
 
@@ -577,6 +665,7 @@ PyInit_starpupy(void)
 	asyncio_module = PyImport_ImportModule("asyncio");
 	/*cloudpickle import*/
 	cloudpickle_module = PyImport_ImportModule("cloudpickle");
+
 #ifdef STARPU_PYTHON_HAVE_NUMPY
 	/*numpy import array*/
 	import_array();